Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions docs/docs/append-table/blob.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,65 @@ CREATE TABLE image_table (

</Tabs>

### Adding a Blob Column to an Existing Table

Flink SQL and Spark SQL do not expose Paimon's `BLOB` type directly. To add a BLOB column with SQL, add a binary
column (`BYTES` in Flink SQL, `BINARY` in Spark SQL) and configure the blob field option before adding the physical
column. Currently, `ALTER TABLE` supports adding BLOB columns through `blob-field` or `blob-descriptor-field`.
`blob-view-field` and `blob-external-storage-field` are immutable and should be configured when creating the table.

<Tabs groupId="blob-add-column">

<TabItem value="flink" label="Flink SQL">

```sql
CREATE TABLE image_table (
    id INT,
    name STRING
) WITH (
    'row-tracking.enabled' = 'true',
    'data-evolution.enabled' = 'true'
);

ALTER TABLE image_table SET ('blob-field' = 'image');
ALTER TABLE image_table ADD image BYTES;
```

</TabItem>

<TabItem value="spark" label="Spark SQL">

```sql
CREATE TABLE image_table (
    id INT,
    name STRING
) TBLPROPERTIES (
    'row-tracking.enabled' = 'true',
    'data-evolution.enabled' = 'true'
);

ALTER TABLE image_table SET TBLPROPERTIES ('blob-field' = 'image');
ALTER TABLE image_table ADD COLUMN image BINARY;
```

</TabItem>

</Tabs>

Do not add the binary column first and set `blob-field` afterwards. The following order creates a normal
`BYTES` / `BINARY` column. The later option change does not convert the existing column to BLOB; it is rejected
because a column that already exists in the table cannot be newly configured as a blob field:

```sql
ALTER TABLE image_table ADD image BYTES; -- or ADD COLUMN image BINARY in Spark SQL
ALTER TABLE image_table SET ('blob-field' = 'image'); -- error, existing column cannot be configured as blob
```

When using the Paimon API, you can add a BLOB column directly because Paimon has a native `BLOB` type:

```java
catalog.alterTable(
identifier,
Collections.singletonList(SchemaChange.addColumn("image", DataTypes.BLOB())),
false);
```

### Inserting Blob Data

<Tabs groupId="blob-insert">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2299,7 +2299,6 @@ public InlineElement getDescription() {
+ "Fields listed in blob-descriptor-field or blob-view-field "
+ "are also treated as BLOB fields.");

@Immutable
public static final ConfigOption<String> BLOB_DESCRIPTOR_FIELD =
key("blob-descriptor-field")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ public static TableSchema generateTableSchema(
for (SchemaChange change : changes) {
if (change instanceof SetOption) {
SetOption setOption = (SetOption) change;
checkAlterBlobFieldOption(
oldTableSchema, oldOptions, setOption.key(), setOption.value());
if (hasSnapshots.get()) {
checkAlterTableOption(
oldOptions,
Expand All @@ -327,6 +329,10 @@ public static TableSchema generateTableSchema(
newOptions.put(setOption.key(), setOption.value());
} else if (change instanceof RemoveOption) {
RemoveOption removeOption = (RemoveOption) change;
if (isBlobFieldOption(removeOption.key())) {
throw new UnsupportedOperationException(
"Cannot remove blob field option: " + removeOption.key());
}
if (hasSnapshots.get()) {
checkResetTableOption(oldOptions, removeOption.key());
}
Expand Down Expand Up @@ -1283,6 +1289,80 @@ public static void checkResetTableOption(Map<String, String> options, String key
}
}

/**
 * Validates a change to one of the blob field options before it is applied.
 *
 * <p>Keys that are not blob field options are ignored. For a blob field option, the field names
 * configured under {@code key} before and after the change are compared against the columns of
 * {@code oldTableSchema}:
 *
 * <ul>
 *   <li>a column that already exists in the table cannot be dropped from the option;
 *   <li>a column that already exists in the table cannot be newly listed in the option.
 * </ul>
 *
 * <p>Names that match no existing column may be added or removed freely, which allows the
 * option to be declared first and the column to be added afterwards.
 *
 * @param oldTableSchema table schema before the change
 * @param oldOptions table options before the change
 * @param key option key being altered
 * @param value new option value; {@code null} is treated as removing the key
 * @throws UnsupportedOperationException if the change removes an existing column from the
 *     option, or lists an existing column under the option for the first time
 */
private static void checkAlterBlobFieldOption(
        TableSchema oldTableSchema, Map<String, String> oldOptions, String key, String value) {
    if (!isBlobFieldOption(key)) {
        return;
    }

    // Build the option map as it would look after this single change.
    Map<String, String> newOptions = new HashMap<>(oldOptions);
    if (value == null) {
        newOptions.remove(key);
    } else {
        newOptions.put(key, value);
    }

    Set<String> oldFields = getBlobFields(oldOptions, key);
    Set<String> newFields = getBlobFields(newOptions, key);

    // 1. do not allow removing existing blob fields
    // Walk the schema's field list (not a hash set) so the reported field is deterministic.
    for (String fieldName : oldTableSchema.fieldNames()) {
        if (oldFields.contains(fieldName) && !newFields.contains(fieldName)) {
            // Report the option actually being altered instead of always 'blob-field'.
            throw new UnsupportedOperationException(
                    String.format(
                            "Cannot remove an existing field '%s' from '%s'.",
                            fieldName, key));
        }
    }

    // 2. do not allow adding existing fields as BLOB fields
    for (String fieldName : oldTableSchema.fieldNames()) {
        if (newFields.contains(fieldName) && !oldFields.contains(fieldName)) {
            throw new UnsupportedOperationException(
                    String.format(
                            "Cannot configure existing field '%s' as a BLOB field. "
                                    + "BLOB fields can only be added as new columns. "
                                    + "If you are using SQL to ADD a blob column, set '%s' "
                                    + "before adding the binary column.",
                            fieldName, key));
        }
    }
}

/**
 * Tells whether {@code key} is one of the blob field option keys: {@code BLOB_FIELD}, {@code
 * BLOB_DESCRIPTOR_FIELD}, {@code BLOB_VIEW_FIELD} or {@code BLOB_EXTERNAL_STORAGE_FIELD}.
 *
 * @param key option key to test; a {@code null} key matches nothing
 * @return {@code true} if the key equals one of the four blob field option keys
 */
private static boolean isBlobFieldOption(String key) {
    if (CoreOptions.BLOB_FIELD.key().equals(key)) {
        return true;
    }
    if (CoreOptions.BLOB_DESCRIPTOR_FIELD.key().equals(key)) {
        return true;
    }
    if (CoreOptions.BLOB_VIEW_FIELD.key().equals(key)) {
        return true;
    }
    return CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key().equals(key);
}

/**
 * Reads the set of field names configured under the given blob field option.
 *
 * @param options table options to read from
 * @param key one of the blob field option keys
 * @return the field names configured under {@code key}; for {@code BLOB_FIELD} this is a fresh
 *     {@link HashSet}, for the other keys it is whatever the {@link CoreOptions} accessor
 *     returns
 * @throws IllegalArgumentException if {@code key} is not a blob field option key
 */
private static Set<String> getBlobFields(Map<String, String> options, String key) {
    // 'blob-field' is read through the static helper and copied into a new set.
    if (CoreOptions.BLOB_FIELD.key().equals(key)) {
        return new HashSet<>(CoreOptions.blobField(options));
    }

    boolean isDescriptor = CoreOptions.BLOB_DESCRIPTOR_FIELD.key().equals(key);
    boolean isView = CoreOptions.BLOB_VIEW_FIELD.key().equals(key);
    boolean isExternalStorage = CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key().equals(key);

    // Reject unknown keys before any CoreOptions instance is created.
    if (!isDescriptor && !isView && !isExternalStorage) {
        throw new IllegalArgumentException("Unknown blob field option: " + key);
    }

    CoreOptions coreOptions = new CoreOptions(options);
    if (isDescriptor) {
        return coreOptions.blobDescriptorField();
    }
    if (isView) {
        return coreOptions.blobViewField();
    }
    return coreOptions.blobExternalStorageField();
}

public static void checkAlterTablePath(String key) {
if (CoreOptions.PATH.key().equalsIgnoreCase(key)) {
throw new UnsupportedOperationException("Change path is not supported yet.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ public static void validateTableSchema(TableSchema schema) {
FileFormat fileFormat =
FileFormat.fromIdentifier(options.formatType(), new Options(schema.options()));
RowType tableRowType = new RowType(schema.fields());

// Validate blob-related fields
// Now we support configuring blob fields for future columns.
validateBlobFields(tableRowType, options);
Set<String> blobDescriptorFields = validateBlobDescriptorFields(tableRowType, options);
Set<String> blobViewFields =
Expand Down Expand Up @@ -795,33 +798,27 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options)
}

/**
 * Validates the field names listed under {@code CoreOptions.BLOB_FIELD} against the row type.
 *
 * <p>A configured name that matches no column in {@code rowType} is accepted, so the option can
 * be set before the column is added. A configured name that does match a column requires that
 * column to be of BLOB type.
 *
 * @param rowType row type built from the table schema's fields
 * @param options table options
 * @throws IllegalArgumentException presumably raised by {@code checkArgument} when a configured
 *     field exists but is not a BLOB column — confirm against the imported {@code
 *     checkArgument}
 */
private static void validateBlobFields(RowType rowType, CoreOptions options) {
    Set<String> fieldNames = fieldNames(rowType);
    Set<String> blobFieldNames = blobFieldNames(rowType);
    Set<String> configured = new HashSet<>(CoreOptions.blobField(options.toMap()));
    for (String field : configured) {
        // Only columns that already exist are checked; unknown names are left for a later
        // column addition.
        checkArgument(
                !fieldNames.contains(field) || blobFieldNames.contains(field),
                "Field '%s' in '%s' must be a BLOB field in table schema.",
                field,
                CoreOptions.BLOB_FIELD.key());
    }
}

private static Set<String> validateBlobDescriptorFields(RowType rowType, CoreOptions options) {
Set<String> blobFieldNames =
rowType.getFields().stream()
.filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB)
.map(DataField::name)
.collect(Collectors.toCollection(HashSet::new));
Set<String> fieldNames = fieldNames(rowType);
Set<String> blobFieldNames = blobFieldNames(rowType);
Set<String> configured = options.blobDescriptorField();
for (String field : configured) {
checkArgument(
blobFieldNames.contains(field),
!fieldNames.contains(field) || blobFieldNames.contains(field),
"Field '%s' in '%s' must be a BLOB field in table schema.",
field,
CoreOptions.BLOB_DESCRIPTOR_FIELD.key());
Expand All @@ -831,15 +828,12 @@ private static Set<String> validateBlobDescriptorFields(RowType rowType, CoreOpt

private static Set<String> validateBlobViewFields(
RowType rowType, CoreOptions options, Set<String> blobDescriptorFields) {
Set<String> blobFieldNames =
rowType.getFields().stream()
.filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB)
.map(DataField::name)
.collect(Collectors.toCollection(HashSet::new));
Set<String> fieldNames = fieldNames(rowType);
Set<String> blobFieldNames = blobFieldNames(rowType);
Set<String> configured = options.blobViewField();
for (String field : configured) {
checkArgument(
blobFieldNames.contains(field),
!fieldNames.contains(field) || blobFieldNames.contains(field),
"Field '%s' in '%s' must be a BLOB field in table schema.",
field,
CoreOptions.BLOB_VIEW_FIELD.key());
Expand All @@ -855,15 +849,12 @@ private static Set<String> validateBlobViewFields(

private static void validateBlobExternalStorageFields(
RowType rowType, CoreOptions options, Set<String> blobDescriptorFields) {
Set<String> blobFieldNames =
rowType.getFields().stream()
.filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB)
.map(DataField::name)
.collect(Collectors.toCollection(HashSet::new));
Set<String> fieldNames = fieldNames(rowType);
Set<String> blobFieldNames = blobFieldNames(rowType);
Set<String> configured = options.blobExternalStorageField();
for (String field : configured) {
checkArgument(
blobFieldNames.contains(field),
!fieldNames.contains(field) || blobFieldNames.contains(field),
"Field '%s' in '%s' must be a BLOB field in table schema.",
field,
CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key());
Expand All @@ -884,6 +875,19 @@ private static void validateBlobExternalStorageFields(
}
}

/**
 * Collects the names of all fields returned by {@code rowType.getFields()}.
 *
 * @param rowType row type to inspect
 * @return a new {@link HashSet} holding every field name
 */
private static Set<String> fieldNames(RowType rowType) {
    Set<String> names = new HashSet<>();
    for (DataField field : rowType.getFields()) {
        names.add(field.name());
    }
    return names;
}

/**
 * Collects the names of the fields in {@code rowType} whose type root is {@code
 * DataTypeRoot.BLOB}.
 *
 * @param rowType row type to inspect
 * @return a new {@link HashSet} holding the names of the BLOB-typed fields
 */
private static Set<String> blobFieldNames(RowType rowType) {
    Set<String> names = new HashSet<>();
    for (DataField field : rowType.getFields()) {
        if (field.type().getTypeRoot() == DataTypeRoot.BLOB) {
            names.add(field.name());
        }
    }
    return names;
}

private static void validateIncrementalClustering(TableSchema schema, CoreOptions options) {
if (options.clusteringIncrementalEnabled()) {
checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,94 @@ public void testRenameSeqGroupFields() throws Exception {
assertThat(newSchema.options()).contains(entries);
}

@Test
public void testAlterAddBlobFieldOptionBeforeAddingColumn() throws Exception {
    // The table starts with two non-BLOB columns and no blob field option.
    schemaManager.createTable(
            new Schema(
                    RowType.of(DataTypes.INT(), DataTypes.STRING()).getFields(),
                    Collections.emptyList(),
                    Collections.emptyList(),
                    optionsForBlobTable(),
                    ""));

    // Step 1: declare 'picture' as a blob field while no such column exists yet.
    TableSchema afterSetOption =
            schemaManager.commitChanges(
                    SchemaChange.setOption(CoreOptions.BLOB_FIELD.key(), "picture"));
    assertThat(afterSetOption.fieldNames()).doesNotContain("picture");

    // Step 2: add the column; it becomes the third field and carries the BLOB type.
    TableSchema afterAddColumn =
            schemaManager.commitChanges(SchemaChange.addColumn("picture", DataTypes.BLOB()));
    assertThat(afterAddColumn.fields().get(2).type()).isEqualTo(DataTypes.BLOB());
}

@Test
public void testAlterBlobFieldOptionCannotConvertExistingColumn() throws Exception {
    // The second column (f1) already exists as a plain BYTES column.
    schemaManager.createTable(
            new Schema(
                    RowType.of(DataTypes.INT(), DataTypes.BYTES()).getFields(),
                    Collections.emptyList(),
                    Collections.emptyList(),
                    optionsForBlobTable(),
                    ""));

    // Listing the existing column under 'blob-field' must be refused, and the message must
    // point the user at the correct ordering.
    assertThatThrownBy(
                    () ->
                            schemaManager.commitChanges(
                                    SchemaChange.setOption(
                                            CoreOptions.BLOB_FIELD.key(), "f1")))
            .isInstanceOf(UnsupportedOperationException.class)
            .hasMessageContaining("Cannot configure existing field 'f1' as a BLOB field.")
            .hasMessageContaining("set 'blob-field' before adding the binary column");
}

@Test
public void testAlterBlobFieldOptionCanRemoveNonExistingField() throws Exception {
    // 'picture' is configured as a blob field but is not a column of the table.
    Map<String, String> tableOptions = optionsForBlobTable();
    tableOptions.put(CoreOptions.BLOB_FIELD.key(), "picture");
    schemaManager.createTable(
            new Schema(
                    RowType.of(DataTypes.INT(), DataTypes.STRING()).getFields(),
                    Collections.emptyList(),
                    Collections.emptyList(),
                    tableOptions,
                    ""));

    // Replacing it with another non-existing name is allowed and fully overrides the option.
    TableSchema altered =
            schemaManager.commitChanges(
                    SchemaChange.setOption(CoreOptions.BLOB_FIELD.key(), "avatar"));
    assertThat(CoreOptions.blobField(altered.options())).containsExactly("avatar");
}

@Test
public void testAlterBlobFieldOptionCannotRemoveExistingField() throws Exception {
    // f1 is a real BLOB column and is listed under 'blob-field' from table creation.
    Map<String, String> tableOptions = optionsForBlobTable();
    tableOptions.put(CoreOptions.BLOB_FIELD.key(), "f1");
    schemaManager.createTable(
            new Schema(
                    RowType.of(DataTypes.INT(), DataTypes.BLOB()).getFields(),
                    Collections.emptyList(),
                    Collections.emptyList(),
                    tableOptions,
                    ""));

    // Setting the option to an empty value would drop f1 from it, which must be refused.
    assertThatThrownBy(
                    () ->
                            schemaManager.commitChanges(
                                    SchemaChange.setOption(CoreOptions.BLOB_FIELD.key(), "")))
            .isInstanceOf(UnsupportedOperationException.class)
            .hasMessageContaining("Cannot remove an existing field 'f1' from 'blob-field'.");
}

/**
 * Builds a new mutable option map with row tracking and data evolution both set to {@code
 * "true"}, used by the blob field tests as the base table options.
 */
private Map<String, String> optionsForBlobTable() {
    String enabled = "true";
    Map<String, String> blobTableOptions = new HashMap<>();
    blobTableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), enabled);
    blobTableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), enabled);
    return blobTableOptions;
}

private List<String> readRecords(FileStoreTable table, Predicate filter) throws IOException {
List<String> results = new ArrayList<>();
forEachRemaining(
Expand Down
Loading
Loading