diff --git a/docs/docs/append-table/blob.mdx b/docs/docs/append-table/blob.mdx
index 4f0ce604acea..ab938c2a1ad0 100644
--- a/docs/docs/append-table/blob.mdx
+++ b/docs/docs/append-table/blob.mdx
@@ -256,6 +256,69 @@ CREATE TABLE image_table (
+### Adding a Blob Column to an Existing Table
+
+Flink SQL and Spark SQL do not expose Paimon's `BLOB` type directly. To add a BLOB column with SQL, add a binary
+column (`BYTES` in Flink SQL, `BINARY` in Spark SQL) and configure the blob field option before adding the physical
+column. Currently, `ALTER TABLE` supports adding BLOB columns through `blob-field` or `blob-descriptor-field`.
+`blob-view-field` and `blob-external-storage-field` are immutable and should be configured when creating the table.
+
+
+
+
+
+```sql
+CREATE TABLE image_table (
+ id INT,
+ name STRING
+) WITH (
+ 'row-tracking.enabled' = 'true',
+ 'data-evolution.enabled' = 'true'
+);
+
+ALTER TABLE image_table SET ('blob-field' = 'image');
+ALTER TABLE image_table ADD image BYTES;
+```
+
+
+
+
+
+```sql
+CREATE TABLE image_table (
+ id INT,
+ name STRING
+) TBLPROPERTIES (
+ 'row-tracking.enabled' = 'true',
+ 'data-evolution.enabled' = 'true'
+);
+
+ALTER TABLE image_table SET TBLPROPERTIES ('blob-field' = 'image');
+ALTER TABLE image_table ADD COLUMN image BINARY;
+```
+
+
+
+
+
+Do not add the binary column first and set `blob-field` later. For example, the following order creates a normal
+`BYTES` / `BINARY` column. The later option change will not convert the existing column to BLOB, and may be rejected
+because existing fields cannot be newly configured as blob fields:
+
+```sql
+ALTER TABLE image_table ADD image BYTES; -- or ADD COLUMN image BINARY in Spark SQL
+ALTER TABLE image_table SET ('blob-field' = 'image'); -- error, existing column cannot be configured as blob
+```
+
+When using the Paimon API, you can add a BLOB column directly because Paimon has a native `BLOB` type:
+
+```java
+catalog.alterTable(
+ identifier,
+ Collections.singletonList(SchemaChange.addColumn("image", DataTypes.BLOB())),
+ false);
+```
+
### Inserting Blob Data
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index d5a1bb7fb067..63c4715c39d9 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2299,7 +2299,6 @@ public InlineElement getDescription() {
+ "Fields listed in blob-descriptor-field or blob-view-field "
+ "are also treated as BLOB fields.");
- @Immutable
public static final ConfigOption BLOB_DESCRIPTOR_FIELD =
key("blob-descriptor-field")
.stringType()
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 670960889d5c..66d47a27ebc8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -317,6 +317,8 @@ public static TableSchema generateTableSchema(
for (SchemaChange change : changes) {
if (change instanceof SetOption) {
SetOption setOption = (SetOption) change;
+ checkAlterBlobFieldOption(
+ oldTableSchema, oldOptions, setOption.key(), setOption.value());
if (hasSnapshots.get()) {
checkAlterTableOption(
oldOptions,
@@ -327,6 +329,10 @@ public static TableSchema generateTableSchema(
newOptions.put(setOption.key(), setOption.value());
} else if (change instanceof RemoveOption) {
RemoveOption removeOption = (RemoveOption) change;
+ if (isBlobFieldOption(removeOption.key())) {
+ throw new UnsupportedOperationException(
+ "Cannot remove blob field option: " + removeOption.key());
+ }
if (hasSnapshots.get()) {
checkResetTableOption(oldOptions, removeOption.key());
}
@@ -1283,6 +1289,80 @@ public static void checkResetTableOption(Map options, String key
}
}
+ /**
+ * Check alter blob field option. Now we only allow adding non-existing new fields as BLOB
+ * fields, and removing existing BLOB fields is not allowed.
+ *
+ * @param oldTableSchema table schema
+ * @param oldOptions old table options
+ * @param key altering key
+ * @param value altering value
+ */
+ private static void checkAlterBlobFieldOption(
+ TableSchema oldTableSchema, Map oldOptions, String key, String value) {
+ if (!isBlobFieldOption(key)) {
+ return;
+ }
+
+ Map newOptions = new HashMap<>(oldOptions);
+ if (value == null) {
+ newOptions.remove(key);
+ } else {
+ newOptions.put(key, value);
+ }
+
+ Set oldFields = getBlobFields(oldOptions, key);
+ Set newFields = getBlobFields(newOptions, key);
+
+ // 1. do not allow removing existing blob fields
+ Set removals = new HashSet<>(oldFields);
+ removals.removeAll(newFields);
+ for (String fieldName : oldTableSchema.fieldNames()) {
+ if (removals.contains(fieldName)) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Cannot remove an existing field '%s' from 'blob-field'.",
+ fieldName));
+ }
+ }
+
+ // 2. do not allow adding existing fields as BLOB fields
+ Set additions = new HashSet<>(newFields);
+ additions.removeAll(oldFields);
+ Set existingFields = new HashSet<>(oldTableSchema.fieldNames());
+ for (String field : additions) {
+ if (existingFields.contains(field)) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Cannot configure existing field '%s' as a BLOB field. "
+ + "BLOB fields can only be added as new columns. "
+ + "If you are using SQL to ADD a blob column, set '%s' "
+ + "before adding the binary column.",
+ field, key));
+ }
+ }
+ }
+
+ private static boolean isBlobFieldOption(String key) {
+ return CoreOptions.BLOB_FIELD.key().equals(key)
+ || CoreOptions.BLOB_DESCRIPTOR_FIELD.key().equals(key)
+ || CoreOptions.BLOB_VIEW_FIELD.key().equals(key)
+ || CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key().equals(key);
+ }
+
+ private static Set getBlobFields(Map options, String key) {
+ if (CoreOptions.BLOB_FIELD.key().equals(key)) {
+ return new HashSet<>(CoreOptions.blobField(options));
+ } else if (CoreOptions.BLOB_DESCRIPTOR_FIELD.key().equals(key)) {
+ return new CoreOptions(options).blobDescriptorField();
+ } else if (CoreOptions.BLOB_VIEW_FIELD.key().equals(key)) {
+ return new CoreOptions(options).blobViewField();
+ } else if (CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key().equals(key)) {
+ return new CoreOptions(options).blobExternalStorageField();
+ }
+ throw new IllegalArgumentException("Unknown blob field option: " + key);
+ }
+
public static void checkAlterTablePath(String key) {
if (CoreOptions.PATH.key().equalsIgnoreCase(key)) {
throw new UnsupportedOperationException("Change path is not supported yet.");
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 4ffc3ec0259e..ae02871b9259 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -166,6 +166,9 @@ public static void validateTableSchema(TableSchema schema) {
FileFormat fileFormat =
FileFormat.fromIdentifier(options.formatType(), new Options(schema.options()));
RowType tableRowType = new RowType(schema.fields());
+
+ // Validate blob-related fields
+ // Now we support configuring blob fields for future columns.
validateBlobFields(tableRowType, options);
Set blobDescriptorFields = validateBlobDescriptorFields(tableRowType, options);
Set blobViewFields =
@@ -795,17 +798,14 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options)
}
private static void validateBlobFields(RowType rowType, CoreOptions options) {
- Set blobFieldNames =
- rowType.getFields().stream()
- .filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB)
- .map(DataField::name)
- .collect(Collectors.toCollection(HashSet::new));
+ Set fieldNames = fieldNames(rowType);
+ Set blobFieldNames = blobFieldNames(rowType);
Set configured =
CoreOptions.blobField(options.toMap()).stream()
.collect(Collectors.toCollection(HashSet::new));
for (String field : configured) {
checkArgument(
- blobFieldNames.contains(field),
+ !fieldNames.contains(field) || blobFieldNames.contains(field),
"Field '%s' in '%s' must be a BLOB field in table schema.",
field,
CoreOptions.BLOB_FIELD.key());
@@ -813,15 +813,12 @@ private static void validateBlobFields(RowType rowType, CoreOptions options) {
}
private static Set validateBlobDescriptorFields(RowType rowType, CoreOptions options) {
- Set blobFieldNames =
- rowType.getFields().stream()
- .filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB)
- .map(DataField::name)
- .collect(Collectors.toCollection(HashSet::new));
+ Set fieldNames = fieldNames(rowType);
+ Set blobFieldNames = blobFieldNames(rowType);
Set configured = options.blobDescriptorField();
for (String field : configured) {
checkArgument(
- blobFieldNames.contains(field),
+ !fieldNames.contains(field) || blobFieldNames.contains(field),
"Field '%s' in '%s' must be a BLOB field in table schema.",
field,
CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
@@ -831,15 +828,12 @@ private static Set validateBlobDescriptorFields(RowType rowType, CoreOpt
private static Set validateBlobViewFields(
RowType rowType, CoreOptions options, Set blobDescriptorFields) {
- Set blobFieldNames =
- rowType.getFields().stream()
- .filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB)
- .map(DataField::name)
- .collect(Collectors.toCollection(HashSet::new));
+ Set fieldNames = fieldNames(rowType);
+ Set blobFieldNames = blobFieldNames(rowType);
Set configured = options.blobViewField();
for (String field : configured) {
checkArgument(
- blobFieldNames.contains(field),
+ !fieldNames.contains(field) || blobFieldNames.contains(field),
"Field '%s' in '%s' must be a BLOB field in table schema.",
field,
CoreOptions.BLOB_VIEW_FIELD.key());
@@ -855,15 +849,12 @@ private static Set validateBlobViewFields(
private static void validateBlobExternalStorageFields(
RowType rowType, CoreOptions options, Set blobDescriptorFields) {
- Set blobFieldNames =
- rowType.getFields().stream()
- .filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB)
- .map(DataField::name)
- .collect(Collectors.toCollection(HashSet::new));
+ Set fieldNames = fieldNames(rowType);
+ Set blobFieldNames = blobFieldNames(rowType);
Set configured = options.blobExternalStorageField();
for (String field : configured) {
checkArgument(
- blobFieldNames.contains(field),
+ !fieldNames.contains(field) || blobFieldNames.contains(field),
"Field '%s' in '%s' must be a BLOB field in table schema.",
field,
CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key());
@@ -884,6 +875,19 @@ private static void validateBlobExternalStorageFields(
}
}
+ private static Set fieldNames(RowType rowType) {
+ return rowType.getFields().stream()
+ .map(DataField::name)
+ .collect(Collectors.toCollection(HashSet::new));
+ }
+
+ private static Set blobFieldNames(RowType rowType) {
+ return rowType.getFields().stream()
+ .filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB)
+ .map(DataField::name)
+ .collect(Collectors.toCollection(HashSet::new));
+ }
+
private static void validateIncrementalClustering(TableSchema schema, CoreOptions options) {
if (options.clusteringIncrementalEnabled()) {
checkArgument(
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 8b118e05621d..28e677108c8d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -608,6 +608,94 @@ public void testRenameSeqGroupFields() throws Exception {
assertThat(newSchema.options()).contains(entries);
}
+ @Test
+ public void testAlterAddBlobFieldOptionBeforeAddingColumn() throws Exception {
+ Schema schema =
+ new Schema(
+ RowType.of(DataTypes.INT(), DataTypes.STRING()).getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ optionsForBlobTable(),
+ "");
+ schemaManager.createTable(schema);
+
+ TableSchema schemaWithOption =
+ schemaManager.commitChanges(
+ SchemaChange.setOption(CoreOptions.BLOB_FIELD.key(), "picture"));
+ assertThat(schemaWithOption.fieldNames()).doesNotContain("picture");
+
+ TableSchema schemaWithBlobColumn =
+ schemaManager.commitChanges(SchemaChange.addColumn("picture", DataTypes.BLOB()));
+ assertThat(schemaWithBlobColumn.fields().get(2).type()).isEqualTo(DataTypes.BLOB());
+ }
+
+ @Test
+ public void testAlterBlobFieldOptionCannotConvertExistingColumn() throws Exception {
+ Schema schema =
+ new Schema(
+ RowType.of(DataTypes.INT(), DataTypes.BYTES()).getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ optionsForBlobTable(),
+ "");
+ schemaManager.createTable(schema);
+
+ assertThatThrownBy(
+ () ->
+ schemaManager.commitChanges(
+ SchemaChange.setOption(CoreOptions.BLOB_FIELD.key(), "f1")))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("Cannot configure existing field 'f1' as a BLOB field.")
+ .hasMessageContaining("set 'blob-field' before adding the binary column");
+ }
+
+ @Test
+ public void testAlterBlobFieldOptionCanRemoveNonExistingField() throws Exception {
+ Map options = optionsForBlobTable();
+ options.put(CoreOptions.BLOB_FIELD.key(), "picture");
+ Schema schema =
+ new Schema(
+ RowType.of(DataTypes.INT(), DataTypes.STRING()).getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ options,
+ "");
+ schemaManager.createTable(schema);
+
+ TableSchema newSchema =
+ schemaManager.commitChanges(
+ SchemaChange.setOption(CoreOptions.BLOB_FIELD.key(), "avatar"));
+ assertThat(CoreOptions.blobField(newSchema.options())).containsExactly("avatar");
+ }
+
+ @Test
+ public void testAlterBlobFieldOptionCannotRemoveExistingField() throws Exception {
+ Map options = optionsForBlobTable();
+ options.put(CoreOptions.BLOB_FIELD.key(), "f1");
+ Schema schema =
+ new Schema(
+ RowType.of(DataTypes.INT(), DataTypes.BLOB()).getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ options,
+ "");
+ schemaManager.createTable(schema);
+
+ assertThatThrownBy(
+ () ->
+ schemaManager.commitChanges(
+ SchemaChange.setOption(CoreOptions.BLOB_FIELD.key(), "")))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("Cannot remove an existing field 'f1' from 'blob-field'.");
+ }
+
+ private Map optionsForBlobTable() {
+ Map options = new HashMap<>();
+ options.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ options.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ return options;
+ }
+
private List readRecords(FileStoreTable table, Predicate filter) throws IOException {
List results = new ArrayList<>();
forEachRemaining(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index cae85c238621..04ed72202c9b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -47,6 +47,7 @@
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.InternalRowPartitionComputer;
@@ -566,20 +567,23 @@ protected Schema buildPaimonSchema(CatalogBaseTable catalogTable, Map toSchemaChange(
TableChange change,
Map oldTableNonPhysicalColumnIndex,
- @Nullable String primaryKeyConstraintName) {
+ @Nullable String primaryKeyConstraintName,
+ Set blobTypeFields,
+ Map options) {
List schemaChanges = new ArrayList<>();
if (change instanceof AddColumn) {
if (((AddColumn) change).getColumn().isPhysical()) {
AddColumn add = (AddColumn) change;
String comment = add.getColumn().getComment().orElse(null);
SchemaChange.Move move = getMove(add.getPosition(), add.getColumn().getName());
- schemaChanges.add(
- SchemaChange.addColumn(
+ DataType type =
+ resolveDataType(
add.getColumn().getName(),
- LogicalTypeConversion.toDataType(
- add.getColumn().getDataType().getLogicalType()),
- comment,
- move));
+ add.getColumn().getDataType().getLogicalType(),
+ options,
+ blobTypeFields);
+ schemaChanges.add(
+ SchemaChange.addColumn(add.getColumn().getName(), type, comment, move));
}
return schemaChanges;
} else if (change instanceof AddWatermark) {
@@ -847,7 +851,9 @@ public void alterTable(
toSchemaChange(
tableChange,
oldTableNonPhysicalColumnIndex,
- primaryKeyConstraintName)
+ primaryKeyConstraintName,
+ blobTypeFields(newTable.getOptions()),
+ newTable.getOptions())
.stream())
.collect(Collectors.toList());
changes.addAll(schemaChanges);
@@ -1133,8 +1139,10 @@ public static Schema fromCatalogTable(CatalogBaseTable catalogTable) {
private static Set blobTypeFields(Map options) {
Set blobTypeFields = new HashSet<>(CoreOptions.blobField(options));
- blobTypeFields.addAll(new CoreOptions(options).blobDescriptorField());
+ CoreOptions coreOptions = new CoreOptions(options);
+ blobTypeFields.addAll(coreOptions.blobDescriptorField());
blobTypeFields.addAll(CoreOptions.blobViewField(options));
+ blobTypeFields.addAll(coreOptions.blobExternalStorageField());
return blobTypeFields;
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index d51c1d006ac2..092c22e5226f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -19,15 +19,24 @@
package org.apache.paimon.flink;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -110,6 +119,82 @@ public void testAddColumn() {
+ " +I[ggg, hhh, 23.43, 6, 2.34, 34, 23, true]]");
}
+ @Test
+ public void testAlterAddBlobColumn() throws Exception {
+ sql(
+ "CREATE TABLE T_BLOB (id INT, data STRING) WITH ("
+ + "'row-tracking.enabled'='true', "
+ + "'data-evolution.enabled'='true')");
+ sql("INSERT INTO T_BLOB VALUES (1, 'old')");
+
+ sql("ALTER TABLE T_BLOB SET ('blob-field'='picture')");
+ sql("ALTER TABLE T_BLOB ADD picture BYTES");
+
+ assertThat(paimonTable("T_BLOB").rowType().getField("picture").type().is(DataTypeRoot.BLOB))
+ .isTrue();
+ sql("INSERT INTO T_BLOB VALUES (2, 'new', X'4869')");
+ List result = sql("SELECT * FROM T_BLOB ORDER BY id");
+ assertThat(result.get(0).getField(2)).isNull();
+ assertThat((byte[]) result.get(1).getField(2)).containsExactly((byte) 72, (byte) 105);
+ }
+
+ @Test
+ public void testAlterAddBlobDescriptorColumn() throws Exception {
+ sql(
+ "CREATE TABLE T_BLOB_DESCRIPTOR (id INT, data STRING) WITH ("
+ + "'row-tracking.enabled'='true', "
+ + "'data-evolution.enabled'='true')");
+
+ sql("ALTER TABLE T_BLOB_DESCRIPTOR SET ('blob-descriptor-field'='picture')");
+ sql("ALTER TABLE T_BLOB_DESCRIPTOR ADD picture BYTES");
+
+ assertThat(
+ paimonTable("T_BLOB_DESCRIPTOR")
+ .rowType()
+ .getField("picture")
+ .type()
+ .is(DataTypeRoot.BLOB))
+ .isTrue();
+ }
+
+ @Test
+ public void testAlterAddBlobColumnWithCombinedTableChanges() throws Exception {
+ sql(
+ "CREATE TABLE T_BLOB_COMBINED (id INT, data STRING) WITH ("
+ + "'row-tracking.enabled'='true', "
+ + "'data-evolution.enabled'='true')");
+
+ CatalogTable table = table("T_BLOB_COMBINED");
+ Map newOptions = new HashMap<>(table.getOptions());
+ newOptions.put("blob-field", "picture");
+
+ CatalogTable newTable =
+ new ResolvedCatalogTable(
+ table.copy(newOptions),
+ ResolvedSchema.physical(
+ new String[] {"id", "data", "picture"},
+ new org.apache.flink.table.types.DataType[] {
+ DataTypes.INT(), DataTypes.STRING(), DataTypes.BYTES()
+ }));
+
+ flinkCatalog()
+ .alterTable(
+ new ObjectPath(tEnv.getCurrentDatabase(), "T_BLOB_COMBINED"),
+ newTable,
+ Arrays.asList(
+ TableChange.set("blob-field", "picture"),
+ TableChange.add(Column.physical("picture", DataTypes.BYTES()))),
+ false);
+
+ assertThat(
+ paimonTable("T_BLOB_COMBINED")
+ .rowType()
+ .getField("picture")
+ .type()
+ .is(DataTypeRoot.BLOB))
+ .isTrue();
+ }
+
@Test
public void testDropColumn() {
sql(
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 451a5fae26ad..0e1f5ba246dc 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -38,6 +38,7 @@
import org.apache.paimon.spark.catalog.functions.V1FunctionConverter;
import org.apache.paimon.spark.utils.CatalogUtils;
import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.iceberg.IcebergTable;
import org.apache.paimon.table.lance.LanceTable;
import org.apache.paimon.table.object.ObjectTable;
@@ -85,6 +86,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -346,9 +348,15 @@ public SparkTable loadTable(Identifier ident, long timestamp) throws NoSuchTable
@Override
public org.apache.spark.sql.connector.catalog.Table alterTable(
Identifier ident, TableChange... changes) throws NoSuchTableException {
- List schemaChanges =
- Arrays.stream(changes).map(this::toSchemaChange).collect(Collectors.toList());
try {
+ Table table = catalog.getTable(toIdentifier(ident, catalogName));
+ // We have to get the new table options by merging current options with changes,
+ // in case that altering blob fields and adding blob columns come together
+ Map newOptions = newTableOptions(table.options(), changes);
+ List schemaChanges = new ArrayList<>();
+ for (TableChange change : changes) {
+ schemaChanges.add(toSchemaChange(change, newOptions));
+ }
catalog.alterTable(toIdentifier(ident, catalogName), schemaChanges, false);
return loadTable(ident);
} catch (Catalog.TableNotExistException e) {
@@ -471,7 +479,7 @@ private StagedTable stageReplaceByDropAndCreate(
return new RollbackStagedTable(loadTable(ident), () -> {});
}
- private SchemaChange toSchemaChange(TableChange change) {
+ private SchemaChange toSchemaChange(TableChange change, Map newOptions) {
if (change instanceof TableChange.SetProperty) {
TableChange.SetProperty set = (TableChange.SetProperty) change;
validateAlterProperty(set.property());
@@ -494,7 +502,7 @@ private SchemaChange toSchemaChange(TableChange change) {
checkNoDefaultValue(add);
return SchemaChange.addColumn(
add.fieldNames(),
- toPaimonType(add.dataType()).copy(add.isNullable()),
+ resolveDataType(add, newOptions).copy(add.isNullable()),
add.comment(),
move);
} else if (change instanceof TableChange.RenameColumn) {
@@ -526,6 +534,46 @@ private SchemaChange toSchemaChange(TableChange change) {
}
}
+ private static Map newTableOptions(
+ Map currentOptions, TableChange[] changes) {
+ Map options = new HashMap<>(currentOptions);
+ for (TableChange change : changes) {
+ if (change instanceof TableChange.SetProperty) {
+ TableChange.SetProperty set = (TableChange.SetProperty) change;
+ if (!set.property().equals(TableCatalog.PROP_COMMENT)) {
+ options.put(set.property(), set.value());
+ }
+ } else if (change instanceof TableChange.RemoveProperty) {
+ TableChange.RemoveProperty remove = (TableChange.RemoveProperty) change;
+ if (!remove.property().equals(TableCatalog.PROP_COMMENT)) {
+ options.remove(remove.property());
+ }
+ }
+ }
+ return options;
+ }
+
+ private static DataType resolveDataType(
+ TableChange.AddColumn add, Map finalOptions) {
+ if (add.fieldNames().length == 1
+ && blobTypeFields(finalOptions).contains(add.fieldNames()[0])) {
+ checkArgument(
+ add.dataType() instanceof org.apache.spark.sql.types.BinaryType,
+ "The type of blob field must be binary");
+ return new BlobType().copy(add.isNullable());
+ }
+ return toPaimonType(add.dataType()).copy(add.isNullable());
+ }
+
+ private static Set blobTypeFields(Map options) {
+ Set blobTypeFields = new HashSet<>(CoreOptions.blobField(options));
+ CoreOptions coreOptions = new CoreOptions(options);
+ blobTypeFields.addAll(coreOptions.blobDescriptorField());
+ blobTypeFields.addAll(coreOptions.blobViewField());
+ blobTypeFields.addAll(coreOptions.blobExternalStorageField());
+ return blobTypeFields;
+ }
+
private static SchemaChange.Move getMove(
TableChange.ColumnPosition columnPosition, String[] fieldNames) {
SchemaChange.Move move = null;
@@ -560,9 +608,7 @@ private StagedTable stageCreateDirectly(
private Schema toInitialSchema(
StructType schema, Transform[] partitions, Map properties) {
Map normalizedProperties = new HashMap<>(properties);
- List blobFields = CoreOptions.blobField(properties);
- Set blobDescriptorFields = new CoreOptions(properties).blobDescriptorField();
- List blobViewFields = CoreOptions.blobViewField(properties);
+ Set blobTypeFields = blobTypeFields(properties);
String provider = properties.get(TableCatalog.PROP_PROVIDER);
if (!usePaimon(provider)) {
if (isFormatTable(provider)) {
@@ -596,9 +642,7 @@ private Schema toInitialSchema(
for (StructField field : schema.fields()) {
String name = field.name();
DataType type;
- if (blobFields.contains(name)
- || blobDescriptorFields.contains(name)
- || blobViewFields.contains(name)) {
+ if (blobTypeFields.contains(name)) {
checkArgument(
field.dataType() instanceof org.apache.spark.sql.types.BinaryType,
"The type of blob field must be binary");
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index fc8b4adb6ebd..73d1f2ec6792 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -18,11 +18,14 @@
package org.apache.paimon.spark;
+import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.utils.StringUtils;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableChange;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -95,6 +98,79 @@ public void testAddColumn() {
.isEqualTo("[[1,2,1,null], [5,6,3,null]]");
}
+ @Test
+ public void testAlterAddBlobColumn() {
+ spark.sql(
+ "CREATE TABLE testAlterAddBlobColumn (id INT, data STRING) "
+ + "TBLPROPERTIES ("
+ + "'row-tracking.enabled'='true', "
+ + "'data-evolution.enabled'='true')");
+ spark.sql("INSERT INTO testAlterAddBlobColumn VALUES (1, 'old')");
+
+ spark.sql(
+ "ALTER TABLE testAlterAddBlobColumn "
+ + "SET TBLPROPERTIES ('blob-field'='picture')");
+ spark.sql("ALTER TABLE testAlterAddBlobColumn ADD COLUMN picture BINARY");
+ spark.sql("INSERT INTO testAlterAddBlobColumn VALUES (2, 'new', X'4869')");
+
+ assertThat(
+ getTable("testAlterAddBlobColumn")
+ .rowType()
+ .getField("picture")
+ .type()
+ .is(DataTypeRoot.BLOB))
+ .isTrue();
+ List rows =
+ spark.sql("SELECT * FROM testAlterAddBlobColumn ORDER BY id").collectAsList();
+ assertThat(rows.get(0).get(2)).isNull();
+ assertThat((byte[]) rows.get(1).get(2)).containsExactly((byte) 72, (byte) 105);
+ }
+
+ @Test
+ public void testAlterAddBlobDescriptorColumn() {
+ spark.sql(
+ "CREATE TABLE testAlterAddBlobDescriptorColumn (id INT, data STRING) "
+ + "TBLPROPERTIES ("
+ + "'row-tracking.enabled'='true', "
+ + "'data-evolution.enabled'='true')");
+
+ spark.sql(
+ "ALTER TABLE testAlterAddBlobDescriptorColumn "
+ + "SET TBLPROPERTIES ('blob-descriptor-field'='picture')");
+ spark.sql("ALTER TABLE testAlterAddBlobDescriptorColumn ADD COLUMN picture BINARY");
+
+ assertThat(
+ getTable("testAlterAddBlobDescriptorColumn")
+ .rowType()
+ .getField("picture")
+ .type()
+ .is(DataTypeRoot.BLOB))
+ .isTrue();
+ }
+
+ @Test
+ public void testAlterAddBlobColumnWithCombinedTableChanges() throws Exception {
+ String tableName = "testAlterAddBlobColumnWithCombinedTableChanges";
+ spark.sql(
+ "CREATE TABLE "
+ + tableName
+ + " (id INT, data STRING) "
+ + "TBLPROPERTIES ("
+ + "'row-tracking.enabled'='true', "
+ + "'data-evolution.enabled'='true')");
+
+ ((SparkCatalog) spark.sessionState().catalogManager().currentCatalog())
+ .alterTable(
+ Identifier.of(new String[] {"default"}, tableName),
+ TableChange.setProperty("blob-field", "picture"),
+ TableChange.addColumn(
+ new String[] {"picture"},
+ org.apache.spark.sql.types.DataTypes.BinaryType));
+
+ assertThat(getTable(tableName).rowType().getField("picture").type().is(DataTypeRoot.BLOB))
+ .isTrue();
+ }
+
@Test
public void testAddNotNullColumn() {
createTable("testAddNotNullColumn");