From c17559545cfbc685e6ab1f10fc342957f3344123 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Thu, 21 May 2026 10:59:16 +0800 Subject: [PATCH 1/6] [flink][spark] supports adding blob columns through `ALTER TABLE` statements --- .../apache/paimon/schema/SchemaManager.java | 78 +++++++++++++++++ .../paimon/schema/SchemaValidation.java | 52 ++++++----- .../paimon/table/SchemaEvolutionTest.java | 87 +++++++++++++++++++ .../org/apache/paimon/flink/FlinkCatalog.java | 26 ++++-- .../paimon/flink/SchemaChangeITCase.java | 20 +++++ .../org/apache/paimon/spark/SparkCatalog.java | 64 +++++++++++--- .../spark/SparkSchemaEvolutionITCase.java | 29 +++++++ 7 files changed, 313 insertions(+), 43 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 670960889d5c..876c78fefddb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -317,6 +317,8 @@ public static TableSchema generateTableSchema( for (SchemaChange change : changes) { if (change instanceof SetOption) { SetOption setOption = (SetOption) change; + checkAlterBlobFieldOption( + oldTableSchema, oldOptions, setOption.key(), setOption.value()); if (hasSnapshots.get()) { checkAlterTableOption( oldOptions, @@ -327,6 +329,10 @@ public static TableSchema generateTableSchema( newOptions.put(setOption.key(), setOption.value()); } else if (change instanceof RemoveOption) { RemoveOption removeOption = (RemoveOption) change; + if (isBlobFieldOption(removeOption.key())) { + throw new UnsupportedOperationException( + "Cannot remove blob field option: " + removeOption.key()); + } if (hasSnapshots.get()) { checkResetTableOption(oldOptions, removeOption.key()); } @@ -1283,6 +1289,78 @@ public static void checkResetTableOption(Map options, String key } } + /** + * Check alter blob field option. 
Now we only allow adding non-existing new fields as BLOB + * fields, and removing existing BLOB fields is not allowed. + * + * @param oldTableSchema table schema + * @param oldOptions old table options + * @param key altering key + * @param value altering value + */ + private static void checkAlterBlobFieldOption( + TableSchema oldTableSchema, Map oldOptions, String key, String value) { + if (!isBlobFieldOption(key)) { + return; + } + + Map newOptions = new HashMap<>(oldOptions); + if (value == null) { + newOptions.remove(key); + } else { + newOptions.put(key, value); + } + + Set oldFields = getBlobFields(oldOptions, key); + Set newFields = getBlobFields(newOptions, key); + + // 1. do not allow removing existing blob fields + Set removals = new HashSet<>(oldFields); + removals.removeAll(newFields); + for (String fieldName : oldTableSchema.fieldNames()) { + if (removals.contains(fieldName)) { + throw new UnsupportedOperationException( + String.format( + "Cannot remove an existing field '%s' from 'blob-field'.", + fieldName)); + } + } + + // 2. do not allow adding existing fields as BLOB fields + Set additions = new HashSet<>(newFields); + additions.removeAll(oldFields); + Set existingFields = new HashSet<>(oldTableSchema.fieldNames()); + for (String field : additions) { + if (existingFields.contains(field)) { + throw new UnsupportedOperationException( + String.format( + "Cannot configure existing field '%s' as a BLOB field. 
" + + "BLOB fields can only be added by adding new columns.", + field)); + } + } + } + + private static boolean isBlobFieldOption(String key) { + return CoreOptions.BLOB_FIELD.key().equals(key) + || CoreOptions.BLOB_DESCRIPTOR_FIELD.key().equals(key) + || CoreOptions.BLOB_VIEW_FIELD.key().equals(key) + || CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key().equals(key); + } + + private static Set getBlobFields(Map options, String key) { + if (CoreOptions.BLOB_FIELD.key().equals(key)) { + return new HashSet<>(CoreOptions.blobField(options)); + } else if (CoreOptions.BLOB_DESCRIPTOR_FIELD.key().equals(key)) { + return new CoreOptions(options).blobDescriptorField(); + } else if (CoreOptions.BLOB_VIEW_FIELD.key().equals(key)) { + return new CoreOptions(options).blobViewField(); + } else if (CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key().equals(key)) { + return new CoreOptions(options).blobExternalStorageField(); + } + throw new IllegalArgumentException("Unknown blob field option: " + key); + } + public static void checkAlterTablePath(String key) { if (CoreOptions.PATH.key().equalsIgnoreCase(key)) { throw new UnsupportedOperationException("Change path is not supported yet."); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 4ffc3ec0259e..ae02871b9259 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -166,6 +166,9 @@ public static void validateTableSchema(TableSchema schema) { FileFormat fileFormat = FileFormat.fromIdentifier(options.formatType(), new Options(schema.options())); RowType tableRowType = new RowType(schema.fields()); + + // Validate blob-related fields + // Now we support configuring blob fields for future columns. 
validateBlobFields(tableRowType, options); Set blobDescriptorFields = validateBlobDescriptorFields(tableRowType, options); Set blobViewFields = @@ -795,17 +798,14 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options) } private static void validateBlobFields(RowType rowType, CoreOptions options) { - Set blobFieldNames = - rowType.getFields().stream() - .filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB) - .map(DataField::name) - .collect(Collectors.toCollection(HashSet::new)); + Set fieldNames = fieldNames(rowType); + Set blobFieldNames = blobFieldNames(rowType); Set configured = CoreOptions.blobField(options.toMap()).stream() .collect(Collectors.toCollection(HashSet::new)); for (String field : configured) { checkArgument( - blobFieldNames.contains(field), + !fieldNames.contains(field) || blobFieldNames.contains(field), "Field '%s' in '%s' must be a BLOB field in table schema.", field, CoreOptions.BLOB_FIELD.key()); @@ -813,15 +813,12 @@ private static void validateBlobFields(RowType rowType, CoreOptions options) { } private static Set validateBlobDescriptorFields(RowType rowType, CoreOptions options) { - Set blobFieldNames = - rowType.getFields().stream() - .filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB) - .map(DataField::name) - .collect(Collectors.toCollection(HashSet::new)); + Set fieldNames = fieldNames(rowType); + Set blobFieldNames = blobFieldNames(rowType); Set configured = options.blobDescriptorField(); for (String field : configured) { checkArgument( - blobFieldNames.contains(field), + !fieldNames.contains(field) || blobFieldNames.contains(field), "Field '%s' in '%s' must be a BLOB field in table schema.", field, CoreOptions.BLOB_DESCRIPTOR_FIELD.key()); @@ -831,15 +828,12 @@ private static Set validateBlobDescriptorFields(RowType rowType, CoreOpt private static Set validateBlobViewFields( RowType rowType, CoreOptions options, Set blobDescriptorFields) { - Set blobFieldNames = - 
rowType.getFields().stream() - .filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB) - .map(DataField::name) - .collect(Collectors.toCollection(HashSet::new)); + Set fieldNames = fieldNames(rowType); + Set blobFieldNames = blobFieldNames(rowType); Set configured = options.blobViewField(); for (String field : configured) { checkArgument( - blobFieldNames.contains(field), + !fieldNames.contains(field) || blobFieldNames.contains(field), "Field '%s' in '%s' must be a BLOB field in table schema.", field, CoreOptions.BLOB_VIEW_FIELD.key()); @@ -855,15 +849,12 @@ private static Set validateBlobViewFields( private static void validateBlobExternalStorageFields( RowType rowType, CoreOptions options, Set blobDescriptorFields) { - Set blobFieldNames = - rowType.getFields().stream() - .filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB) - .map(DataField::name) - .collect(Collectors.toCollection(HashSet::new)); + Set fieldNames = fieldNames(rowType); + Set blobFieldNames = blobFieldNames(rowType); Set configured = options.blobExternalStorageField(); for (String field : configured) { checkArgument( - blobFieldNames.contains(field), + !fieldNames.contains(field) || blobFieldNames.contains(field), "Field '%s' in '%s' must be a BLOB field in table schema.", field, CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key()); @@ -884,6 +875,19 @@ private static void validateBlobExternalStorageFields( } } + private static Set fieldNames(RowType rowType) { + return rowType.getFields().stream() + .map(DataField::name) + .collect(Collectors.toCollection(HashSet::new)); + } + + private static Set blobFieldNames(RowType rowType) { + return rowType.getFields().stream() + .filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB) + .map(DataField::name) + .collect(Collectors.toCollection(HashSet::new)); + } + private static void validateIncrementalClustering(TableSchema schema, CoreOptions options) { if (options.clusteringIncrementalEnabled()) { checkArgument( diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java index 8b118e05621d..8418baaea9c7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java @@ -608,6 +608,93 @@ public void testRenameSeqGroupFields() throws Exception { assertThat(newSchema.options()).contains(entries); } + @Test + public void testAlterAddBlobFieldOptionBeforeAddingColumn() throws Exception { + Schema schema = + new Schema( + RowType.of(DataTypes.INT(), DataTypes.STRING()).getFields(), + Collections.emptyList(), + Collections.emptyList(), + optionsForBlobTable(), + ""); + schemaManager.createTable(schema); + + TableSchema schemaWithOption = + schemaManager.commitChanges( + SchemaChange.setOption(CoreOptions.BLOB_FIELD.key(), "picture")); + assertThat(schemaWithOption.fieldNames()).doesNotContain("picture"); + + TableSchema schemaWithBlobColumn = + schemaManager.commitChanges(SchemaChange.addColumn("picture", DataTypes.BLOB())); + assertThat(schemaWithBlobColumn.fields().get(2).type()).isEqualTo(DataTypes.BLOB()); + } + + @Test + public void testAlterBlobFieldOptionCannotConvertExistingColumn() throws Exception { + Schema schema = + new Schema( + RowType.of(DataTypes.INT(), DataTypes.BYTES()).getFields(), + Collections.emptyList(), + Collections.emptyList(), + optionsForBlobTable(), + ""); + schemaManager.createTable(schema); + + assertThatThrownBy( + () -> + schemaManager.commitChanges( + SchemaChange.setOption(CoreOptions.BLOB_FIELD.key(), "f1"))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Cannot configure existing field 'f1' as a BLOB field."); + } + + @Test + public void testAlterBlobFieldOptionCanRemoveNonExistingField() throws Exception { + Map options = optionsForBlobTable(); + options.put(CoreOptions.BLOB_FIELD.key(), "picture"); + Schema schema 
= + new Schema( + RowType.of(DataTypes.INT(), DataTypes.STRING()).getFields(), + Collections.emptyList(), + Collections.emptyList(), + options, + ""); + schemaManager.createTable(schema); + + TableSchema newSchema = + schemaManager.commitChanges( + SchemaChange.setOption(CoreOptions.BLOB_FIELD.key(), "avatar")); + assertThat(CoreOptions.blobField(newSchema.options())).containsExactly("avatar"); + } + + @Test + public void testAlterBlobFieldOptionCannotRemoveExistingField() throws Exception { + Map options = optionsForBlobTable(); + options.put(CoreOptions.BLOB_FIELD.key(), "f1"); + Schema schema = + new Schema( + RowType.of(DataTypes.INT(), DataTypes.BLOB()).getFields(), + Collections.emptyList(), + Collections.emptyList(), + options, + ""); + schemaManager.createTable(schema); + + assertThatThrownBy( + () -> + schemaManager.commitChanges( + SchemaChange.setOption(CoreOptions.BLOB_FIELD.key(), ""))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Cannot remove an existing field 'f1' from 'blob-field'."); + } + + private Map optionsForBlobTable() { + Map options = new HashMap<>(); + options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + options.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + return options; + } + private List readRecords(FileStoreTable table, Predicate filter) throws IOException { List results = new ArrayList<>(); forEachRemaining( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index cae85c238621..04ed72202c9b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -47,6 +47,7 @@ import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.source.DataTableScan; import 
org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.DataType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.InternalRowPartitionComputer; @@ -566,20 +567,23 @@ protected Schema buildPaimonSchema(CatalogBaseTable catalogTable, Map toSchemaChange( TableChange change, Map oldTableNonPhysicalColumnIndex, - @Nullable String primaryKeyConstraintName) { + @Nullable String primaryKeyConstraintName, + Set blobTypeFields, + Map options) { List schemaChanges = new ArrayList<>(); if (change instanceof AddColumn) { if (((AddColumn) change).getColumn().isPhysical()) { AddColumn add = (AddColumn) change; String comment = add.getColumn().getComment().orElse(null); SchemaChange.Move move = getMove(add.getPosition(), add.getColumn().getName()); - schemaChanges.add( - SchemaChange.addColumn( + DataType type = + resolveDataType( add.getColumn().getName(), - LogicalTypeConversion.toDataType( - add.getColumn().getDataType().getLogicalType()), - comment, - move)); + add.getColumn().getDataType().getLogicalType(), + options, + blobTypeFields); + schemaChanges.add( + SchemaChange.addColumn(add.getColumn().getName(), type, comment, move)); } return schemaChanges; } else if (change instanceof AddWatermark) { @@ -847,7 +851,9 @@ public void alterTable( toSchemaChange( tableChange, oldTableNonPhysicalColumnIndex, - primaryKeyConstraintName) + primaryKeyConstraintName, + blobTypeFields(newTable.getOptions()), + newTable.getOptions()) .stream()) .collect(Collectors.toList()); changes.addAll(schemaChanges); @@ -1133,8 +1139,10 @@ public static Schema fromCatalogTable(CatalogBaseTable catalogTable) { private static Set blobTypeFields(Map options) { Set blobTypeFields = new HashSet<>(CoreOptions.blobField(options)); - blobTypeFields.addAll(new CoreOptions(options).blobDescriptorField()); + CoreOptions coreOptions = new CoreOptions(options); + blobTypeFields.addAll(coreOptions.blobDescriptorField()); 
blobTypeFields.addAll(CoreOptions.blobViewField(options)); + blobTypeFields.addAll(coreOptions.blobExternalStorageField()); return blobTypeFields; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index d51c1d006ac2..975537597c7e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.utils.DateTimeUtils; import org.apache.flink.table.api.config.ExecutionConfigOptions; @@ -110,6 +111,25 @@ public void testAddColumn() { + " +I[ggg, hhh, 23.43, 6, 2.34, 34, 23, true]]"); } + @Test + public void testAlterAddBlobColumn() throws Exception { + sql( + "CREATE TABLE T_BLOB (id INT, data STRING) WITH (" + + "'row-tracking.enabled'='true', " + + "'data-evolution.enabled'='true')"); + sql("INSERT INTO T_BLOB VALUES (1, 'old')"); + + sql("ALTER TABLE T_BLOB SET ('blob-field'='picture')"); + sql("ALTER TABLE T_BLOB ADD picture BYTES"); + sql("INSERT INTO T_BLOB VALUES (2, 'new', X'4869')"); + + assertThat(paimonTable("T_BLOB").rowType().getField("picture").type().is(DataTypeRoot.BLOB)) + .isTrue(); + List result = sql("SELECT * FROM T_BLOB ORDER BY id"); + assertThat(result.get(0).getField(2)).isNull(); + assertThat((byte[]) result.get(1).getField(2)).containsExactly((byte) 72, (byte) 105); + } + @Test public void testDropColumn() { sql( diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 451a5fae26ad..0e1f5ba246dc 100644 --- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -38,6 +38,7 @@ import org.apache.paimon.spark.catalog.functions.V1FunctionConverter; import org.apache.paimon.spark.utils.CatalogUtils; import org.apache.paimon.table.FormatTable; +import org.apache.paimon.table.Table; import org.apache.paimon.table.iceberg.IcebergTable; import org.apache.paimon.table.lance.LanceTable; import org.apache.paimon.table.object.ObjectTable; @@ -85,6 +86,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -346,9 +348,15 @@ public SparkTable loadTable(Identifier ident, long timestamp) throws NoSuchTable @Override public org.apache.spark.sql.connector.catalog.Table alterTable( Identifier ident, TableChange... changes) throws NoSuchTableException { - List schemaChanges = - Arrays.stream(changes).map(this::toSchemaChange).collect(Collectors.toList()); try { + Table table = catalog.getTable(toIdentifier(ident, catalogName)); + // We have to get the new table options by merging current options with changes, + // in case that altering blob fields and adding blob columns come together + Map newOptions = newTableOptions(table.options(), changes); + List schemaChanges = new ArrayList<>(); + for (TableChange change : changes) { + schemaChanges.add(toSchemaChange(change, newOptions)); + } catalog.alterTable(toIdentifier(ident, catalogName), schemaChanges, false); return loadTable(ident); } catch (Catalog.TableNotExistException e) { @@ -471,7 +479,7 @@ private StagedTable stageReplaceByDropAndCreate( return new RollbackStagedTable(loadTable(ident), () -> {}); } - private SchemaChange toSchemaChange(TableChange change) { + private SchemaChange toSchemaChange(TableChange change, Map newOptions) { if (change instanceof 
TableChange.SetProperty) { TableChange.SetProperty set = (TableChange.SetProperty) change; validateAlterProperty(set.property()); @@ -494,7 +502,7 @@ private SchemaChange toSchemaChange(TableChange change) { checkNoDefaultValue(add); return SchemaChange.addColumn( add.fieldNames(), - toPaimonType(add.dataType()).copy(add.isNullable()), + resolveDataType(add, newOptions).copy(add.isNullable()), add.comment(), move); } else if (change instanceof TableChange.RenameColumn) { @@ -526,6 +534,46 @@ private SchemaChange toSchemaChange(TableChange change) { } } + private static Map newTableOptions( + Map currentOptions, TableChange[] changes) { + Map options = new HashMap<>(currentOptions); + for (TableChange change : changes) { + if (change instanceof TableChange.SetProperty) { + TableChange.SetProperty set = (TableChange.SetProperty) change; + if (!set.property().equals(TableCatalog.PROP_COMMENT)) { + options.put(set.property(), set.value()); + } + } else if (change instanceof TableChange.RemoveProperty) { + TableChange.RemoveProperty remove = (TableChange.RemoveProperty) change; + if (!remove.property().equals(TableCatalog.PROP_COMMENT)) { + options.remove(remove.property()); + } + } + } + return options; + } + + private static DataType resolveDataType( + TableChange.AddColumn add, Map finalOptions) { + if (add.fieldNames().length == 1 + && blobTypeFields(finalOptions).contains(add.fieldNames()[0])) { + checkArgument( + add.dataType() instanceof org.apache.spark.sql.types.BinaryType, + "The type of blob field must be binary"); + return new BlobType().copy(add.isNullable()); + } + return toPaimonType(add.dataType()).copy(add.isNullable()); + } + + private static Set blobTypeFields(Map options) { + Set blobTypeFields = new HashSet<>(CoreOptions.blobField(options)); + CoreOptions coreOptions = new CoreOptions(options); + blobTypeFields.addAll(coreOptions.blobDescriptorField()); + blobTypeFields.addAll(coreOptions.blobViewField()); + 
blobTypeFields.addAll(coreOptions.blobExternalStorageField()); + return blobTypeFields; + } + private static SchemaChange.Move getMove( TableChange.ColumnPosition columnPosition, String[] fieldNames) { SchemaChange.Move move = null; @@ -560,9 +608,7 @@ private StagedTable stageCreateDirectly( private Schema toInitialSchema( StructType schema, Transform[] partitions, Map properties) { Map normalizedProperties = new HashMap<>(properties); - List blobFields = CoreOptions.blobField(properties); - Set blobDescriptorFields = new CoreOptions(properties).blobDescriptorField(); - List blobViewFields = CoreOptions.blobViewField(properties); + Set blobTypeFields = blobTypeFields(properties); String provider = properties.get(TableCatalog.PROP_PROVIDER); if (!usePaimon(provider)) { if (isFormatTable(provider)) { @@ -596,9 +642,7 @@ private Schema toInitialSchema( for (StructField field : schema.fields()) { String name = field.name(); DataType type; - if (blobFields.contains(name) - || blobDescriptorFields.contains(name) - || blobViewFields.contains(name)) { + if (blobTypeFields.contains(name)) { checkArgument( field.dataType() instanceof org.apache.spark.sql.types.BinaryType, "The type of blob field must be binary"); diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java index fc8b4adb6ebd..557e7a1de3c1 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java @@ -18,6 +18,7 @@ package org.apache.paimon.spark; +import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.utils.StringUtils; import org.apache.spark.sql.AnalysisException; @@ -95,6 +96,34 @@ public void testAddColumn() { .isEqualTo("[[1,2,1,null], [5,6,3,null]]"); } + @Test + public void 
testAlterAddBlobColumn() { + spark.sql( + "CREATE TABLE testAlterAddBlobColumn (id INT, data STRING) " + + "TBLPROPERTIES (" + + "'row-tracking.enabled'='true', " + + "'data-evolution.enabled'='true')"); + spark.sql("INSERT INTO testAlterAddBlobColumn VALUES (1, 'old')"); + + spark.sql( + "ALTER TABLE testAlterAddBlobColumn " + + "SET TBLPROPERTIES ('blob-field'='picture')"); + spark.sql("ALTER TABLE testAlterAddBlobColumn ADD COLUMN picture BINARY"); + spark.sql("INSERT INTO testAlterAddBlobColumn VALUES (2, 'new', X'4869')"); + + assertThat( + getTable("testAlterAddBlobColumn") + .rowType() + .getField("picture") + .type() + .is(DataTypeRoot.BLOB)) + .isTrue(); + List rows = + spark.sql("SELECT * FROM testAlterAddBlobColumn ORDER BY id").collectAsList(); + assertThat(rows.get(0).get(2)).isNull(); + assertThat((byte[]) rows.get(1).get(2)).containsExactly((byte) 72, (byte) 105); + } + @Test public void testAddNotNullColumn() { createTable("testAddNotNullColumn"); From 56b152ebd758db55b2893cb1a54345640c9b7e3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Thu, 21 May 2026 11:51:54 +0800 Subject: [PATCH 2/6] add tests for blob-descriptor-field --- .../java/org/apache/paimon/CoreOptions.java | 1 - .../paimon/flink/SchemaChangeITCase.java | 21 +++++++++++++++++- .../spark/SparkSchemaEvolutionITCase.java | 22 +++++++++++++++++++ 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index d5a1bb7fb067..63c4715c39d9 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2299,7 +2299,6 @@ public InlineElement getDescription() { + "Fields listed in blob-descriptor-field or blob-view-field " + "are also treated as BLOB fields."); - @Immutable public static final ConfigOption BLOB_DESCRIPTOR_FIELD = key("blob-descriptor-field") 
.stringType() diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index 975537597c7e..9d2b475a08cf 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -121,15 +121,34 @@ public void testAlterAddBlobColumn() throws Exception { sql("ALTER TABLE T_BLOB SET ('blob-field'='picture')"); sql("ALTER TABLE T_BLOB ADD picture BYTES"); - sql("INSERT INTO T_BLOB VALUES (2, 'new', X'4869')"); assertThat(paimonTable("T_BLOB").rowType().getField("picture").type().is(DataTypeRoot.BLOB)) .isTrue(); + sql("INSERT INTO T_BLOB VALUES (2, 'new', X'4869')"); List result = sql("SELECT * FROM T_BLOB ORDER BY id"); assertThat(result.get(0).getField(2)).isNull(); assertThat((byte[]) result.get(1).getField(2)).containsExactly((byte) 72, (byte) 105); } + @Test + public void testAlterAddBlobDescriptorColumn() throws Exception { + sql( + "CREATE TABLE T_BLOB_DESCRIPTOR (id INT, data STRING) WITH (" + + "'row-tracking.enabled'='true', " + + "'data-evolution.enabled'='true')"); + + sql("ALTER TABLE T_BLOB_DESCRIPTOR SET ('blob-descriptor-field'='picture')"); + sql("ALTER TABLE T_BLOB_DESCRIPTOR ADD picture BYTES"); + + assertThat( + paimonTable("T_BLOB_DESCRIPTOR") + .rowType() + .getField("picture") + .type() + .is(DataTypeRoot.BLOB)) + .isTrue(); + } + @Test public void testDropColumn() { sql( diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java index 557e7a1de3c1..3af2a484454d 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java +++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java @@ -124,6 +124,28 @@ public void testAlterAddBlobColumn() { assertThat((byte[]) rows.get(1).get(2)).containsExactly((byte) 72, (byte) 105); } + @Test + public void testAlterAddBlobDescriptorColumn() { + spark.sql( + "CREATE TABLE testAlterAddBlobDescriptorColumn (id INT, data STRING) " + + "TBLPROPERTIES (" + + "'row-tracking.enabled'='true', " + + "'data-evolution.enabled'='true')"); + + spark.sql( + "ALTER TABLE testAlterAddBlobDescriptorColumn " + + "SET TBLPROPERTIES ('blob-descriptor-field'='picture')"); + spark.sql("ALTER TABLE testAlterAddBlobDescriptorColumn ADD COLUMN picture BINARY"); + + assertThat( + getTable("testAlterAddBlobDescriptorColumn") + .rowType() + .getField("picture") + .type() + .is(DataTypeRoot.BLOB)) + .isTrue(); + } + @Test public void testAddNotNullColumn() { createTable("testAddNotNullColumn"); From c89c434d282ebe2cd85b9de64738c2d81132f64b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Fri, 22 May 2026 11:31:15 +0800 Subject: [PATCH 3/6] fix comments --- docs/docs/append-table/blob.mdx | 69 ++++++++++++++++++- .../apache/paimon/schema/SchemaManager.java | 6 +- .../paimon/table/SchemaEvolutionTest.java | 3 +- .../paimon/flink/SchemaChangeITCase.java | 46 +++++++++++++ .../spark/SparkSchemaEvolutionITCase.java | 25 +++++++ 5 files changed, 145 insertions(+), 4 deletions(-) diff --git a/docs/docs/append-table/blob.mdx b/docs/docs/append-table/blob.mdx index 4f0ce604acea..0cbbaa821c3c 100644 --- a/docs/docs/append-table/blob.mdx +++ b/docs/docs/append-table/blob.mdx @@ -106,7 +106,7 @@ This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields, No (none) String - Specifies column names that should be stored as blob type. This is used when you want to treat a BYTES column as a BLOB. Fields listed in blob-descriptor-field or blob-view-field are also treated as BLOB fields. 
+ Specifies column names that should be stored as blob type. This is used when you want to treat a BYTES column as a BLOB. Fields listed in blob-descriptor-field, blob-view-field, or blob-external-storage-field are also treated as BLOB fields.
blob-as-descriptor
@@ -256,6 +256,73 @@ CREATE TABLE image_table ( +### Adding a Blob Column to an Existing Table + +Flink SQL and Spark SQL do not expose Paimon's `BLOB` type directly. To add a BLOB column with SQL, add a binary +column (`BYTES` in Flink SQL, `BINARY` in Spark SQL) and configure the blob field option before adding the physical +column. Currently, `ALTER TABLE` supports adding BLOB columns through `blob-field` or `blob-descriptor-field`. +`blob-view-field` and `blob-external-storage-field` are immutable and should be configured when creating the table. + +{{< tabs "blob-add-column" >}} + +{{< tab "Flink SQL" >}} +```sql +CREATE TABLE image_table ( + id INT, + name STRING +) WITH ( + 'row-tracking.enabled' = 'true', + 'data-evolution.enabled' = 'true' +); + +ALTER TABLE image_table SET ('blob-field' = 'image'); +ALTER TABLE image_table ADD image BYTES; +``` +{{< /tab >}} + +{{< tab "Spark SQL" >}} +```sql +CREATE TABLE image_table ( + id INT, + name STRING +) TBLPROPERTIES ( + 'row-tracking.enabled' = 'true', + 'data-evolution.enabled' = 'true' +); + +ALTER TABLE image_table SET TBLPROPERTIES ('blob-field' = 'image'); +ALTER TABLE image_table ADD COLUMN image BINARY; +``` +{{< /tab >}} + +{{< /tabs >}} + +Do not add the binary column first and set `blob-field` later. For example, the following order creates a normal +`BYTES` / `BINARY` column. The later option change will not convert the existing column to BLOB, and may be rejected +because existing fields cannot be newly configured as blob fields: + +```sql +ALTER TABLE image_table ADD image BYTES; -- or ADD COLUMN image BINARY in Spark SQL +ALTER TABLE image_table SET ('blob-field' = 'image'); +``` + +For descriptor BLOB columns, set `blob-descriptor-field` before adding the column. The following example uses Flink +SQL; in Spark SQL, use `SET TBLPROPERTIES` and `ADD COLUMN image BINARY`. 
+ +```sql +ALTER TABLE image_table SET ('blob-descriptor-field' = 'image'); +ALTER TABLE image_table ADD image BYTES; +``` + +When using the Paimon API, you can add a BLOB column directly because Paimon has a native `BLOB` type: + +```java +catalog.alterTable( + identifier, + Collections.singletonList(SchemaChange.addColumn("image", DataTypes.BLOB())), + false); +``` + ### Inserting Blob Data diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 876c78fefddb..66d47a27ebc8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -1335,8 +1335,10 @@ private static void checkAlterBlobFieldOption( throw new UnsupportedOperationException( String.format( "Cannot configure existing field '%s' as a BLOB field. " - + "BLOB fields can only be added by adding new columns.", - field)); + + "BLOB fields can only be added as new columns. 
" + + "If you are using SQL to ADD a blob column, set '%s' " + + "before adding the binary column.", + field, key)); } } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java index 8418baaea9c7..28e677108c8d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java @@ -645,7 +645,8 @@ public void testAlterBlobFieldOptionCannotConvertExistingColumn() throws Excepti schemaManager.commitChanges( SchemaChange.setOption(CoreOptions.BLOB_FIELD.key(), "f1"))) .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining("Cannot configure existing field 'f1' as a BLOB field."); + .hasMessageContaining("Cannot configure existing field 'f1' as a BLOB field.") + .hasMessageContaining("set 'blob-field' before adding the binary column"); } @Test diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index 9d2b475a08cf..092c22e5226f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -22,13 +22,21 @@ import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.utils.DateTimeUtils; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; import 
org.apache.flink.types.Row; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import java.time.format.DateTimeFormatter; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -149,6 +157,44 @@ public void testAlterAddBlobDescriptorColumn() throws Exception { .isTrue(); } + @Test + public void testAlterAddBlobColumnWithCombinedTableChanges() throws Exception { + sql( + "CREATE TABLE T_BLOB_COMBINED (id INT, data STRING) WITH (" + + "'row-tracking.enabled'='true', " + + "'data-evolution.enabled'='true')"); + + CatalogTable table = table("T_BLOB_COMBINED"); + Map<String, String> newOptions = new HashMap<>(table.getOptions()); + newOptions.put("blob-field", "picture"); + + CatalogTable newTable = + new ResolvedCatalogTable( + table.copy(newOptions), + ResolvedSchema.physical( + new String[] {"id", "data", "picture"}, + new org.apache.flink.table.types.DataType[] { + DataTypes.INT(), DataTypes.STRING(), DataTypes.BYTES() + })); + + flinkCatalog() + .alterTable( + new ObjectPath(tEnv.getCurrentDatabase(), "T_BLOB_COMBINED"), + newTable, + Arrays.asList( + TableChange.set("blob-field", "picture"), + TableChange.add(Column.physical("picture", DataTypes.BYTES()))), + false); + + assertThat( + paimonTable("T_BLOB_COMBINED") + .rowType() + .getField("picture") + .type() + .is(DataTypeRoot.BLOB)) + .isTrue(); + } + @Test public void testDropColumn() { sql( diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java index 3af2a484454d..73d1f2ec6792 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java @@ -24,6 +24,8 @@ import
org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableChange; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -146,6 +148,29 @@ public void testAlterAddBlobDescriptorColumn() { .isTrue(); } + @Test + public void testAlterAddBlobColumnWithCombinedTableChanges() throws Exception { + String tableName = "testAlterAddBlobColumnWithCombinedTableChanges"; + spark.sql( + "CREATE TABLE " + + tableName + + " (id INT, data STRING) " + + "TBLPROPERTIES (" + + "'row-tracking.enabled'='true', " + + "'data-evolution.enabled'='true')"); + + ((SparkCatalog) spark.sessionState().catalogManager().currentCatalog()) + .alterTable( + Identifier.of(new String[] {"default"}, tableName), + TableChange.setProperty("blob-field", "picture"), + TableChange.addColumn( + new String[] {"picture"}, + org.apache.spark.sql.types.DataTypes.BinaryType)); + + assertThat(getTable(tableName).rowType().getField("picture").type().is(DataTypeRoot.BLOB)) + .isTrue(); + } + @Test public void testAddNotNullColumn() { createTable("testAddNotNullColumn"); From 5f559ec8bf306f0e12385c562ae837b5647d5b9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Fri, 22 May 2026 11:42:29 +0800 Subject: [PATCH 4/6] optim docs --- docs/docs/append-table/blob.mdx | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/docs/docs/append-table/blob.mdx b/docs/docs/append-table/blob.mdx index 0cbbaa821c3c..1c0197916257 100644 --- a/docs/docs/append-table/blob.mdx +++ b/docs/docs/append-table/blob.mdx @@ -106,7 +106,7 @@ This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields, No (none) String - Specifies column names that should be stored as blob type. 
This is used when you want to treat a BYTES column as a BLOB. Fields listed in blob-descriptor-field, blob-view-field, or blob-external-storage-field are also treated as BLOB fields. + Specifies column names that should be stored as blob type. This is used when you want to treat a BYTES column as a BLOB. Fields listed in blob-descriptor-field, blob-view-field are also treated as BLOB fields.
blob-as-descriptor
@@ -303,15 +303,7 @@ because existing fields cannot be newly configured as blob fields: ```sql ALTER TABLE image_table ADD image BYTES; -- or ADD COLUMN image BINARY in Spark SQL -ALTER TABLE image_table SET ('blob-field' = 'image'); -``` - -For descriptor BLOB columns, set `blob-descriptor-field` before adding the column. The following example uses Flink -SQL; in Spark SQL, use `SET TBLPROPERTIES` and `ADD COLUMN image BINARY`. - -```sql -ALTER TABLE image_table SET ('blob-descriptor-field' = 'image'); -ALTER TABLE image_table ADD image BYTES; +ALTER TABLE image_table SET ('blob-field' = 'image'); -- error, existing column cannot be configured as blob ``` When using the Paimon API, you can add a BLOB column directly because Paimon has a native `BLOB` type: From c6feff54ab72f7bc976165a1d64eec2063e14bb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Fri, 22 May 2026 11:43:54 +0800 Subject: [PATCH 5/6] fix docs --- docs/docs/append-table/blob.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/docs/append-table/blob.mdx b/docs/docs/append-table/blob.mdx index 1c0197916257..2f95dfb8a450 100644 --- a/docs/docs/append-table/blob.mdx +++ b/docs/docs/append-table/blob.mdx @@ -106,7 +106,7 @@ This allows one table to mix raw-data BLOB fields, descriptor-only BLOB fields, No (none) String - Specifies column names that should be stored as blob type. This is used when you want to treat a BYTES column as a BLOB. Fields listed in blob-descriptor-field, blob-view-field are also treated as BLOB fields. + Specifies column names that should be stored as blob type. This is used when you want to treat a BYTES column as a BLOB. Fields listed in blob-descriptor-field or blob-view-field are also treated as BLOB fields.
blob-as-descriptor
From 9e95745da824608369fb4476ad22aa32354994bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Fri, 22 May 2026 11:57:00 +0800 Subject: [PATCH 6/6] fix docs --- docs/docs/append-table/blob.mdx | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/docs/docs/append-table/blob.mdx b/docs/docs/append-table/blob.mdx index 2f95dfb8a450..ab938c2a1ad0 100644 --- a/docs/docs/append-table/blob.mdx +++ b/docs/docs/append-table/blob.mdx @@ -263,9 +263,10 @@ column (`BYTES` in Flink SQL, `BINARY` in Spark SQL) and configure the blob fiel column. Currently, `ALTER TABLE` supports adding BLOB columns through `blob-field` or `blob-descriptor-field`. `blob-view-field` and `blob-external-storage-field` are immutable and should be configured when creating the table. -{{< tabs "blob-add-column" >}} + + + -{{< tab "Flink SQL" >}} ```sql CREATE TABLE image_table ( id INT, @@ -278,9 +279,11 @@ CREATE TABLE image_table ( ALTER TABLE image_table SET ('blob-field' = 'image'); ALTER TABLE image_table ADD image BYTES; ``` -{{< /tab >}} -{{< tab "Spark SQL" >}} + + + + ```sql CREATE TABLE image_table ( id INT, @@ -293,9 +296,10 @@ CREATE TABLE image_table ( ALTER TABLE image_table SET TBLPROPERTIES ('blob-field' = 'image'); ALTER TABLE image_table ADD COLUMN image BINARY; ``` -{{< /tab >}} -{{< /tabs >}} + + + Do not add the binary column first and set `blob-field` later. For example, the following order creates a normal `BYTES` / `BINARY` column. The later option change will not convert the existing column to BLOB, and may be rejected