diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java index 0bd67da17d..45a1e54b7e 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java @@ -110,6 +110,7 @@ import java.util.stream.Collectors; import static org.apache.fluss.cluster.rebalance.RebalanceStatus.FINAL_STATUSES; +import static org.apache.fluss.config.FlussConfigUtils.BUCKET_NUM; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toResolvedPartitionSpec; import static org.apache.fluss.utils.Preconditions.checkArgument; import static org.apache.fluss.utils.Preconditions.checkState; @@ -407,7 +408,8 @@ public static AlterTableRequest makeAlterTableRequest( } else if (tableChange instanceof TableChange.ModifyColumn) { modifyColumns.add(toPbModifyColumn((TableChange.ModifyColumn) tableChange)); } else if (tableChange instanceof TableChange.SetOption - || tableChange instanceof TableChange.ResetOption) { + || tableChange instanceof TableChange.ResetOption + || tableChange instanceof TableChange.BucketNumOption) { alterConfigs.add(toPbAlterConfigs(tableChange)); } else { throw new IllegalArgumentException( @@ -624,7 +626,12 @@ public static PbPartitionSpec makePbPartitionSpec(PartitionSpec partitionSpec) { public static PbAlterConfig toPbAlterConfigs(TableChange tableChange) { PbAlterConfig info = new PbAlterConfig(); - if (tableChange instanceof TableChange.SetOption) { + if (tableChange instanceof TableChange.BucketNumOption) { + TableChange.BucketNumOption bucketNumOption = (TableChange.BucketNumOption) tableChange; + info.setConfigKey(BUCKET_NUM); + info.setConfigValue(String.valueOf(bucketNumOption.getBucketNum())); + info.setOpType(AlterConfigOpType.SET.value()); + } else if (tableChange instanceof TableChange.SetOption) { 
TableChange.SetOption setOption = (TableChange.SetOption) tableChange; info.setConfigKey(setOption.getKey()); info.setConfigValue(setOption.getValue()); diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java index 5714ddf674..0f10ed27ec 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java @@ -246,6 +246,13 @@ public static void waitAllReplicasReady(long tableId, int expectBucketCount) { } } + public static void waitAllReplicasReady(long tableId, long partitionId, int expectBucketCount) { + for (int i = 0; i < expectBucketCount; i++) { + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady( + new TableBucket(tableId, partitionId, i)); + } + } + protected static void verifyRows( RowType rowType, Map> actualRows, diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java index 1d3865a08f..5f88fcb2a4 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java @@ -34,6 +34,8 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.config.MemorySize; +import org.apache.fluss.exception.InvalidAlterTableException; +import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.fs.FsPath; import org.apache.fluss.fs.TestFileSystem; import org.apache.fluss.metadata.DataLakeFormat; @@ -70,9 +72,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; 
import java.util.concurrent.CompletableFuture; import static org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectRows; @@ -1164,6 +1168,128 @@ void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvForm } } + @Test + void testAppendWithAlterTableBucket() throws Exception { + TableDescriptor data1TableDescriptor = + TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build(); + createTable(DATA1_TABLE_PATH, data1TableDescriptor, false); + TableInfo tableInfo = admin.getTableInfo(DATA1_TABLE_PATH).get(); + + int lastCount = verifyAppendForAlterTableBucket(1, 0); + + // alter table bucket from 1 to 2 + List tableChanges = Collections.singletonList(TableChange.bucketNum(2)); + admin.alterTable(DATA1_TABLE_PATH, tableChanges, false); + + // wait until new bucket replicas are ready + waitAllReplicasReady(tableInfo.getTableId(), 2); + + verifyAppendForAlterTableBucket(2, lastCount); + } + + int verifyAppendForAlterTableBucket(int bucketNum, int lastCount) throws Exception { + Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig(); + // use round-robin bucket assigner, so that we can append data to all buckets + clientConf.set( + ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER, + ConfigOptions.NoKeyAssigner.ROUND_ROBIN); + Connection conn = ConnectionFactory.createConnection(clientConf); + int rowCount = 10; + int expectedRowCount = lastCount + rowCount; + try (Table table = conn.getTable(DATA1_TABLE_PATH)) { + AppendWriter appendWriter = table.newAppend().createWriter(); + + for (int i = 0; i < rowCount; i++) { + GenericRow row = row(i, "a"); + appendWriter.append(row).get(); + } + appendWriter.flush(); + + try (LogScanner logScanner = createLogScanner(table)) { + subscribeFromBeginning(logScanner, table); + + int count = 0; + Set allBuckets = new HashSet<>(); + while (count < expectedRowCount) { + ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); + allBuckets.addAll(scanRecords.buckets()); + 
count += scanRecords.count(); + } + assertThat(allBuckets.size()).isEqualTo(bucketNum); + assertThat(count).isEqualTo(expectedRowCount); + } + } + conn.close(); + return expectedRowCount; + } + + @Test + void testInvalidAlterTableBucket() throws Exception { + Schema logTableSchema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .build(); + TableDescriptor td1 = + TableDescriptor.builder().schema(logTableSchema).distributedBy(1, "a").build(); + TablePath t1 = TablePath.of("test_db_1", "test_invalid_alter_table_bucket_1"); + createTable(t1, td1, false); + + // alter table bucket from 1 to 2 + List tableChanges = Collections.singletonList(TableChange.bucketNum(2)); + // alter table bucket is not supported for log table with bucket keys now + assertThatThrownBy(() -> admin.alterTable(t1, tableChanges, false).get()) + .cause() + .isInstanceOf(InvalidTableException.class) + .hasMessage( + "Alter table bucket is not supported for Log table with bucket keys or PrimaryKey Table now."); + + Schema pkTableSchema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .primaryKey("a") + .build(); + TableDescriptor td2 = + TableDescriptor.builder().schema(pkTableSchema).distributedBy(1).build(); + TablePath t2 = TablePath.of("test_db_2", "test_invalid_alter_table_bucket_2"); + createTable(t2, td2, false); + + // alter table bucket from 1 to 2 + List tableChanges2 = Collections.singletonList(TableChange.bucketNum(2)); + // alter table bucket is not supported for pk table now + assertThatThrownBy(() -> admin.alterTable(t2, tableChanges2, false).get()) + .cause() + .isInstanceOf(InvalidTableException.class) + .hasMessage( + "Alter table bucket is not supported for Log table with bucket keys or PrimaryKey Table now."); + + // test bucket number cannot be decreased + Schema logTableNoBucketKey = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .build(); + 
TableDescriptor td3 = + TableDescriptor.builder().schema(logTableNoBucketKey).distributedBy(3).build(); + TablePath t3 = TablePath.of("test_db_3", "test_invalid_alter_table_bucket_3"); + createTable(t3, td3, false); + + // alter table bucket from 3 to 2 (shrink) + List tableChanges3 = Collections.singletonList(TableChange.bucketNum(2)); + assertThatThrownBy(() -> admin.alterTable(t3, tableChanges3, false).get()) + .cause() + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining("Bucket number can only be increased"); + + // alter table bucket from 3 to 3 (same) + List tableChanges4 = Collections.singletonList(TableChange.bucketNum(3)); + assertThatThrownBy(() -> admin.alterTable(t3, tableChanges4, false).get()) + .cause() + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining("Bucket number can only be increased"); + } + @ParameterizedTest @ValueSource(strings = {"INDEXED", "ARROW"}) void testAppendAndProject(String format) throws Exception { diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/PartitionedTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/PartitionedTableITCase.java index d204d087a4..f2b28da8a3 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/PartitionedTableITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/PartitionedTableITCase.java @@ -17,17 +17,25 @@ package org.apache.fluss.client.table; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; import org.apache.fluss.client.admin.ClientToServerITCaseBase; import org.apache.fluss.client.lookup.Lookuper; +import org.apache.fluss.client.table.scanner.log.LogScanner; +import org.apache.fluss.client.table.scanner.log.ScanRecords; import org.apache.fluss.client.table.writer.AppendWriter; import org.apache.fluss.client.table.writer.UpsertWriter; import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; 
import org.apache.fluss.exception.PartitionNotExistException; import org.apache.fluss.exception.TooManyPartitionsException; import org.apache.fluss.metadata.PartitionInfo; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; @@ -37,10 +45,13 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK; import static org.apache.fluss.testutils.DataTestUtils.row; @@ -146,6 +157,95 @@ void testPartitionedLogTable() throws Exception { verifyPartitionLogs(table, schema.getRowType(), expectPartitionAppendRows); } + @Test + void testAppendForAlterPartitionTableBucket() throws Exception { + TablePath tablePath = TablePath.of("test_db_1", "test_alter_partition_table_bucket"); + createPartitionedTable(tablePath, false); + TableInfo tableInfo = admin.getTableInfo(tablePath).get(); + + List partitionInfos = admin.listPartitionInfos(tablePath).get(); + assertThat(partitionInfos.isEmpty()).isTrue(); + + // add 1 partition + admin.createPartition(tablePath, newPartitionSpec("c", "c0"), false).get(); + partitionInfos = admin.listPartitionInfos(tablePath).get(); + assertThat(partitionInfos.size()).isEqualTo(1); + + // verify append for 1 partition with 1 bucket + int lastCount = verifyAppendForAlterPartitionTableBucket(tablePath, partitionInfos, 1, 0); + + // alter table bucket from 1 to 2 + List tableChanges = Collections.singletonList(TableChange.bucketNum(2)); + admin.alterTable(tablePath, 
tableChanges, false); + + // wait until new bucket replicas are ready + for (PartitionInfo partitionInfo : partitionInfos) { + waitAllReplicasReady(tableInfo.getTableId(), partitionInfo.getPartitionId(), 2); + } + + // verify append for 1 partition with 2 bucket + lastCount = + verifyAppendForAlterPartitionTableBucket(tablePath, partitionInfos, 2, lastCount); + + // add another partition, which may have 2 buckets + admin.createPartition(tablePath, newPartitionSpec("c", "c1"), false).get(); + partitionInfos = admin.listPartitionInfos(tablePath).get(); + assertThat(partitionInfos.size()).isEqualTo(2); + + // wait until new bucket replicas are ready + for (PartitionInfo partitionInfo : partitionInfos) { + waitAllReplicasReady(tableInfo.getTableId(), partitionInfo.getPartitionId(), 2); + } + + // newly created partition should also have 2 buckets + verifyAppendForAlterPartitionTableBucket(tablePath, partitionInfos, 2, lastCount); + } + + private int verifyAppendForAlterPartitionTableBucket( + TablePath tablePath, List partitionInfos, int bucketNum, int lastCount) + throws Exception { + Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig(); + // use round-robin bucket assigner, so that we can append data to all buckets + clientConf.set( + ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER, + ConfigOptions.NoKeyAssigner.ROUND_ROBIN); + Connection conn = ConnectionFactory.createConnection(clientConf); + Table table = conn.getTable(tablePath); + AppendWriter appendWriter = table.newAppend().createWriter(); + + int recordsPerPartition = 5; + for (PartitionInfo partitionInfo : partitionInfos) { + String partitionName = partitionInfo.getPartitionName(); + for (int j = 0; j < recordsPerPartition; j++) { + InternalRow row = row(j, "a" + j, partitionName); + appendWriter.append(row); + } + } + appendWriter.flush(); + + int expectedCount = partitionInfos.size() * recordsPerPartition + lastCount; + try (LogScanner logScanner = table.newScan().createLogScanner()) { + 
for (PartitionInfo partitionInfo : partitionInfos) { + for (int i = 0; i < bucketNum; i++) { + logScanner.subscribeFromBeginning(partitionInfo.getPartitionId(), i); + } + } + + int count = 0; + Set allBuckets = new HashSet<>(); + while (count < expectedCount) { + ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); + allBuckets.addAll(scanRecords.buckets()); + + count += scanRecords.count(); + } + assertThat(allBuckets.size()).isEqualTo(partitionInfos.size() * bucketNum); + assertThat(count).isEqualTo(expectedCount); + } + conn.close(); + return expectedCount; + } + @Test void testWriteToNonExistsPartitionWhenDisabledDynamicPartition() throws Exception { clientConf.set(ConfigOptions.CLIENT_WRITER_DYNAMIC_CREATE_PARTITION_ENABLED, false); diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java index adbbce0af4..c9d88ca12b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java @@ -33,6 +33,8 @@ @Internal public class FlussConfigUtils { + public static final String BUCKET_NUM = "bucket.num"; + public static final Map> TABLE_OPTIONS; public static final Map> CLIENT_OPTIONS; public static final String TABLE_PREFIX = "table."; diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java index bb027d6905..f217b0b10b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java @@ -26,6 +26,10 @@ /** {@link TableChange} represents the modification of the Fluss Table. */ public interface TableChange { + static BucketNumOption bucketNum(int bucketNum) { + return new BucketNumOption(bucketNum); + } + /** * A table change to add the column with specified position. 
* @@ -63,8 +67,9 @@ static AddColumn addColumn( * ALTER TABLE <table_name> MODIFY <column_definition> COMMENT '<column_comment>' <column_position> * * - * @param oldColumn the definition of the old column. - * @param newColumn the definition of the new column. + * @param columnName the name of the column. + * @param dataType the data type of the column. + * @param comment the comment of the column. * @param columnPosition the new position of the column. * @return a TableChange represents the modification. */ @@ -85,8 +90,8 @@ static ModifyColumn modifyColumn( * ALTER TABLE <table_name> RENAME <old_column_name> TO <new_column_name> * * - * @param oldColumn the definition of the old column. - * @param newName the name of the new column. + * @param oldColumnName the name of the old column. + * @param newColumnName the name of the new column. * @return a TableChange represents the modification. */ static RenameColumn renameColumn(String oldColumnName, String newColumnName) { @@ -117,6 +122,46 @@ static ResetOption reset(String key) { return new ResetOption(key); } + /** + * A table change to change the bucket num. + * + *

It is equal to the following statement: + * + *

+     *    ALTER TABLE <table_name> SET 'bucket.num' = '<bucketNum>';
+     * 
+ */ + class BucketNumOption implements TableChange { + private final int bucketNum; + + public BucketNumOption(int bucketNum) { + this.bucketNum = bucketNum; + } + + public int getBucketNum() { + return bucketNum; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + BucketNumOption that = (BucketNumOption) o; + return bucketNum == that.bucketNum; + } + + @Override + public int hashCode() { + return Objects.hashCode(bucketNum); + } + + @Override + public String toString() { + return "BucketNumOption{" + "bucketNum=" + bucketNum + '}'; + } + } + /** * A table change to set the table option. * diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java index 9f18dd401f..6ecf48dff3 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableDescriptor.java @@ -159,6 +159,11 @@ public List getBucketKeys() { .orElse(Collections.emptyList()); } + public Optional getBucketCount() { + return this.getTableDistribution() + .flatMap(TableDescriptor.TableDistribution::getBucketCount); + } + /** * Check if the table is using a default bucket key. A default bucket key is: * @@ -299,6 +304,29 @@ public TableDescriptor withBucketCount(int newBucketCount) { customProperties); } + /** + * Returns a new TableDescriptor instance that is a copy of this TableDescriptor with a new + * bucket count, new properties and new customProperties. + */ + public TableDescriptor withBucketCountAndProperties( + @Nullable Integer newBucketCount, + Map newProperties, + Map newCustomProperties) { + return new TableDescriptor( + schema, + comment, + partitionKeys, + newBucketCount != null + ? 
new TableDistribution( + newBucketCount, + Optional.ofNullable(tableDistribution) + .map(TableDistribution::getBucketKeys) + .orElse(Collections.emptyList())) + : tableDistribution, + newProperties, + newCustomProperties); + } + public Optional getComment() { return Optional.ofNullable(comment); } diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java index 00f58b81f0..dbdcd05009 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java @@ -411,6 +411,23 @@ public TableDescriptor toTableDescriptor() { .build(); } + public TableInfo toNewTableInfo(int numBuckets) { + return new TableInfo( + tablePath, + tableId, + schemaId, + schema, + bucketKeys, + partitionKeys, + numBuckets, + properties, + customProperties, + remoteDataDir, + comment, + createdTime, + modifiedTime); + } + /** Utility to create a {@link TableInfo} from a {@link TableDescriptor} and other metadata. 
*/ public static TableInfo of( TablePath tablePath, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java index 23d8f0b2e9..f27c374142 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java @@ -231,11 +231,7 @@ public class FlinkConnectorOptions { // -------------------------------------------------------------------------------------------- public static final List ALTER_DISALLOW_OPTIONS = - Arrays.asList( - AUTO_INCREMENT_FIELDS.key(), - BUCKET_NUMBER.key(), - BUCKET_KEY.key(), - BOOTSTRAP_SERVERS.key()); + Arrays.asList(AUTO_INCREMENT_FIELDS.key(), BUCKET_KEY.key(), BOOTSTRAP_SERVERS.key()); // ------------------------------------------------------------------------------------------- // Only used internally to support materialized table diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java index 01f9a9817e..39fb179be4 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java @@ -416,9 +416,14 @@ private static void convertFlussTablePropertiesToFlinkOptions( public static List toFlussTableChanges( org.apache.flink.table.catalog.TableChange tableChange) { if (tableChange instanceof org.apache.flink.table.catalog.TableChange.SetOption) { - return Collections.singletonList( - convertSetOption( - (org.apache.flink.table.catalog.TableChange.SetOption) tableChange)); + org.apache.flink.table.catalog.TableChange.SetOption setOption = + 
(org.apache.flink.table.catalog.TableChange.SetOption) tableChange; + // convert to bucket num if the key is bucket.num + if (BUCKET_NUMBER.key().equals(setOption.getKey())) { + return Collections.singletonList(convertBucketNumOption(setOption)); + } + + return Collections.singletonList(convertSetOption(setOption)); } else if (tableChange instanceof org.apache.flink.table.catalog.TableChange.AddColumn) { org.apache.flink.table.catalog.TableChange.AddColumn addColumn = (org.apache.flink.table.catalog.TableChange.AddColumn) tableChange; @@ -471,6 +476,16 @@ public static List toFlussTableChanges( } } + private static TableChange.BucketNumOption convertBucketNumOption( + org.apache.flink.table.catalog.TableChange.SetOption flinkSetOption) { + try { + return TableChange.bucketNum(Integer.parseInt(flinkSetOption.getValue())); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + String.format("Invalid bucket num: %s.", flinkSetOption.getValue()), e); + } + } + private static TableChange.ColumnPosition toFlussColumnPosition( org.apache.flink.table.catalog.TableChange.ColumnPosition columnPosition) { if (columnPosition == null) { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java index eebe586112..02fbebbd9c 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java @@ -261,29 +261,22 @@ void testAlterTableConfig() throws Exception { .hasMessageContaining("table.auto-partition.enabled"); String unSupportedDml2 = - "alter table test_alter_table_append_only set ('bucket.num' = '1000')"; - assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml2)) - .rootCause() - .isInstanceOf(CatalogException.class) - .hasMessage("The option 
'bucket.num' is not supported to alter yet."); - - String unSupportedDml3 = "alter table test_alter_table_append_only set ('bucket.key' = 'a')"; - assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml3)) + assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml2)) .rootCause() .isInstanceOf(CatalogException.class) .hasMessage("The option 'bucket.key' is not supported to alter yet."); - String unSupportedDml4 = + String unSupportedDml3 = "alter table test_alter_table_append_only reset ('bootstrap.servers')"; - assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml4)) + assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml3)) .rootCause() .isInstanceOf(CatalogException.class) .hasMessage("The option 'bootstrap.servers' is not supported to alter yet."); - String unSupportedDml5 = + String unSupportedDml4 = "alter table test_alter_table_append_only set ('auto-increment.fields' = 'b')"; - assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml5)) + assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml4)) .rootCause() .isInstanceOf(CatalogException.class) .hasMessage("The option 'auto-increment.fields' is not supported to alter yet."); @@ -308,6 +301,49 @@ void testAlterTableSchema() throws Exception { assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema); } + @Test + void testAlterTableBucket() throws Exception { + String ddl = + "create table test_alter_table_bucket (" + + "a string, " + + "b int) " + + "with ('bucket.num' = '5')"; + tEnv.executeSql(ddl); + + CatalogTable table = + (CatalogTable) + catalog.getTable(new ObjectPath(DEFAULT_DB, "test_alter_table_bucket")); + Map expectedOptions = new HashMap<>(); + expectedOptions.put("bucket.num", "5"); + expectedOptions.put("table.datalake.format", "paimon"); + assertOptionsEqual(table.getOptions(), expectedOptions); + + // test alter add bucket number + String alter = "alter table test_alter_table_bucket set ('bucket.num' = '10')"; + tEnv.executeSql(alter); + table = + (CatalogTable) + 
catalog.getTable(new ObjectPath(DEFAULT_DB, "test_alter_table_bucket")); + expectedOptions.put("bucket.num", "10"); + assertOptionsEqual(table.getOptions(), expectedOptions); + + String alterSub = "alter table test_alter_table_bucket set ('bucket.num' = '5')"; + assertThatThrownBy(() -> tEnv.executeSql(alterSub)) + .cause() + .cause() + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining( + "Bucket number can only be increased, current bucket number is 10, new bucket number is 5"); + + // test alter invalid bucket number + alter = "alter table test_alter_table_bucket set ('bucket.num' = 'x')"; + String finalAlter = alter; + assertThatThrownBy(() -> tEnv.executeSql(finalAlter)) + .cause() + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid bucket num: x."); + } + @Test void testCreateUnSupportedTable() { // test invalid property diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java index 94166e2d05..7d9a91ed43 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java @@ -57,6 +57,9 @@ public class PaimonConversions { // again private static final String PARTITION_GENERATE_LEGACY_NAME_OPTION_KEY = "partition.legacy-name"; + // property key for bucket number + private static final String BUCKET_NUM = "bucket.num"; + // for fluss config public static final String FLUSS_CONF_PREFIX = "fluss."; public static final String TABLE_DATALAKE_PAIMON_PREFIX = "table.datalake.paimon."; @@ -120,7 +123,14 @@ public static List toPaimonSchemaChanges(List tableCh List schemaChanges = new ArrayList<>(tableChanges.size()); for (TableChange tableChange : tableChanges) { - if (tableChange instanceof TableChange.SetOption) { + if 
(tableChange instanceof TableChange.BucketNumOption) { + TableChange.BucketNumOption bucketNumOption = + (TableChange.BucketNumOption) tableChange; + schemaChanges.add( + SchemaChange.setOption( + convertFlussPropertyKeyToPaimon(BUCKET_NUM), + String.valueOf(bucketNumOption.getBucketNum()))); + } else if (tableChange instanceof TableChange.SetOption) { TableChange.SetOption setOption = (TableChange.SetOption) tableChange; String key = convertFlussPropertyKeyToPaimon(setOption.getKey()); validateAlterPaimonOptions(key); diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java index 0efa057b89..edb186c837 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java @@ -1056,6 +1056,79 @@ void testCreatePaimonDvTableWithNonStringPartitionColumn() throws Exception { "Only support String type as partitioned key when 'deletion-vectors.enabled' is set to true for paimon, found 'c3' is not String type."); } + @Test + void testAlterLakeEnableTableBucket() throws Exception { + Map customProperties = new HashMap<>(); + customProperties.put("bucket.num", "3"); + + // create table + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .build()) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .customProperties(customProperties) + .distributedBy(BUCKET_NUM) + .build(); + TablePath tablePath = TablePath.of(DATABASE, "alter_table_bucket"); + admin.createTable(tablePath, tableDescriptor, false).get(); + Table paimonTable = + paimonCatalog.getTable(Identifier.create(DATABASE, tablePath.getTableName())); + verifyPaimonTable( + 
paimonTable, + tableDescriptor, + RowType.of( + new DataType[] { + org.apache.paimon.types.DataTypes.INT(), + org.apache.paimon.types.DataTypes.STRING(), + // for __bucket, __offset, __timestamp + org.apache.paimon.types.DataTypes.INT(), + org.apache.paimon.types.DataTypes.BIGINT(), + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS() + }, + new String[] { + "c1", + "c2", + BUCKET_COLUMN_NAME, + OFFSET_COLUMN_NAME, + TIMESTAMP_COLUMN_NAME + }), + null, + BUCKET_NUM); + + // test alter table bucket + List tableChanges = Collections.singletonList(TableChange.bucketNum(10)); + admin.alterTable(tablePath, tableChanges, false).get(); + paimonTable = paimonCatalog.getTable(Identifier.create(DATABASE, tablePath.getTableName())); + customProperties.put("bucket.num", "10"); + tableDescriptor = + tableDescriptor.withProperties(tableDescriptor.getProperties(), customProperties); + verifyPaimonTable( + paimonTable, + tableDescriptor, + RowType.of( + new DataType[] { + org.apache.paimon.types.DataTypes.INT(), + org.apache.paimon.types.DataTypes.STRING(), + // for __bucket, __offset, __timestamp + org.apache.paimon.types.DataTypes.INT(), + org.apache.paimon.types.DataTypes.BIGINT(), + org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS() + }, + new String[] { + "c1", + "c2", + BUCKET_COLUMN_NAME, + OFFSET_COLUMN_NAME, + TIMESTAMP_COLUMN_NAME + }), + null, + BUCKET_NUM); + } + private void verifyPaimonTable( Table paimonTable, TableDescriptor flussTable, diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java index b56ba17cb7..c61ca1cbf2 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java @@ -86,7 +86,7 @@ void testAlterTableProperties() throws Exception 
{ Table table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier); // value should be null for key - assertThat(table.options().get("key")).isEqualTo(null); + assertThat(table.options().get("fluss.key")).isEqualTo(null); // set the value for key flussPaimonCatalog.alterTable( @@ -107,6 +107,15 @@ void testAlterTableProperties() throws Exception { table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier); // we have reset the value for key assertThat(table.options().get("fluss.key")).isEqualTo(null); + + // test for bucket.num + assertThat(table.options().get("fluss.bucket.num")).isEqualTo(null); + flussPaimonCatalog.alterTable( + tablePath, + Collections.singletonList(TableChange.bucketNum(3)), + LAKE_CATALOG_CONTEXT); + table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier); + assertThat(table.options().get("fluss.bucket.num")).isEqualTo("3"); } @Test diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java index 90ab136fde..479c7966b4 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java @@ -111,6 +111,9 @@ protected static Configuration initConfig() { public static void beforeAll(Configuration conf) { clientConf = conf; + clientConf.set( + ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER, + ConfigOptions.NoKeyAssigner.ROUND_ROBIN); conn = ConnectionFactory.createConnection(clientConf); admin = conn.getAdmin(); paimonCatalog = getPaimonCatalog(); @@ -123,6 +126,14 @@ public void beforeEach() { execEnv.setParallelism(2); } + protected static void reconnect() throws Exception { + admin.close(); + conn.close(); + + conn = 
ConnectionFactory.createConnection(clientConf); + admin = conn.getAdmin(); + } + protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception { Configuration flussConfig = new Configuration(clientConf); flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L)); @@ -263,7 +274,7 @@ protected long createLogTable(TablePath tablePath) throws Exception { protected long createLogTable(TablePath tablePath, int bucketNum) throws Exception { return createLogTable( - tablePath, bucketNum, false, Collections.emptyMap(), Collections.emptyMap()); + tablePath, bucketNum, true, false, Collections.emptyMap(), Collections.emptyMap()); } protected long createLogTable( @@ -273,15 +284,30 @@ protected long createLogTable( Map properties, Map customProperties) throws Exception { + return createLogTable( + tablePath, bucketNum, true, isPartitioned, properties, customProperties); + } + + protected long createLogTable( + TablePath tablePath, + int bucketNum, + boolean withBucketKey, + boolean isPartitioned, + Map properties, + Map customProperties) + throws Exception { Schema.Builder schemaBuilder = Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING()); TableDescriptor.Builder tableBuilder = TableDescriptor.builder() - .distributedBy(bucketNum, "a") .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)); + if (withBucketKey) { + tableBuilder.distributedBy(bucketNum, "a"); + } + if (isPartitioned) { schemaBuilder.column("c", DataTypes.STRING()); tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true); @@ -412,6 +438,23 @@ protected void assertReplicaStatus(TableBucket tb, long expectedLogEndOffset) { }); } + protected void assertReplicaStatus(Set tbSet, long expectedSumLogEndOffset) { + retry( + Duration.ofMinutes(1), + () -> { + long sumLogEndOffset = 0; + for (TableBucket tb : tbSet) { + Replica replica = getLeaderReplica(tb); 
+ // datalake snapshot id should be updated + assertThat(replica.getLogTablet().getLakeTableSnapshotId()) + .isGreaterThanOrEqualTo(0); + assertThat(replica.getLakeLogEndOffset()).isGreaterThan(0); + sumLogEndOffset += replica.getLakeLogEndOffset(); + } + assertThat(sumLogEndOffset).isEqualTo(expectedSumLogEndOffset); + }); + } + protected void waitUntilBucketSynced( TablePath tablePath, long tableId, int bucketCount, boolean isPartition) { Set tableBuckets = new HashSet<>(); diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java index bf92cd5f57..f44e22d4d1 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java @@ -34,6 +34,7 @@ import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; import org.apache.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.shaded.guava32.com.google.common.collect.ImmutableSet; import org.apache.fluss.types.DataTypes; import org.apache.fluss.utils.types.Tuple2; @@ -434,6 +435,180 @@ void testTieringForAlterTable() throws Exception { } } + @Test + void testTieringNonPartitionedTableForAlterTableBucket() throws Exception { + // start tiering job + JobClient jobClient = buildTieringJob(execEnv); + + try { + // create a log table + TablePath t1 = TablePath.of(DEFAULT_DB, "logTableAlterTableBucket"); + long t1Id = + createLogTable( + t1, 1, false, false, Collections.emptyMap(), Collections.emptyMap()); + TableBucket t1Bucket0 = new TableBucket(t1Id, 0); + List flussRows = new ArrayList<>(); + // write records + int seq = 0; + for (int i = 0; i < 30; i++) { + List rows = Collections.singletonList(row(seq, "v" + seq)); + flussRows.addAll(rows); + 
writeRows(t1, rows, true); + seq++; + } + + // check the status of replica after synced; + assertReplicaStatus(t1Bucket0, 30); + // check data in paimon + checkDataInPaimonAppendOnlyTableForMultipleBuckets(t1, flussRows); + // check snapshot offsets + Map expectedOffsets = new HashMap<>(); + expectedOffsets.put(t1Bucket0, 30L); + checkFlussOffsetsInSnapshot(t1, expectedOffsets); + + // alter table bucket num from 1 to 2 + TableChange.BucketNumOption bucketNumOption = TableChange.bucketNum(2); + List changes = Collections.singletonList(bucketNumOption); + admin.alterTable(t1, changes, false).get(); + + TableBucket t1Bucket1 = new TableBucket(t1Id, 1); + // write records again to all buckets + for (int i = 0; i < 30; i++) { + List rows = Collections.singletonList(row(seq, "v" + seq)); + flussRows.addAll(rows); + writeRows(t1, rows, true); + seq++; + } + + // check the status of replica after synced; + assertReplicaStatus(ImmutableSet.of(t1Bucket0, t1Bucket1), 60); + // check data in paimon of all buckets + checkDataInPaimonAppendOnlyTableForMultipleBuckets(t1, flussRows); + // check snapshot offsets + assertThat( + getLeaderReplica(t1Bucket0).getLakeLogEndOffset() + + getLeaderReplica(t1Bucket1).getLakeLogEndOffset()) + .isEqualTo(60); + expectedOffsets.put(t1Bucket0, getLeaderReplica(t1Bucket0).getLakeLogEndOffset()); + expectedOffsets.put(t1Bucket1, getLeaderReplica(t1Bucket1).getLakeLogEndOffset()); + checkFlussOffsetsInSnapshot(t1, expectedOffsets); + } finally { + jobClient.cancel().get(); + } + } + + @Test + void testTieringPartitionedTableForAlterTableBucket() throws Exception { + // start tiering job + JobClient jobClient = buildTieringJob(execEnv); + + try { + // create partitioned table and wait partitions are ready + TablePath partitionedTablePath = + TablePath.of(DEFAULT_DB, "partitionedTableAlterTableBucket"); + Tuple2 tableIdAndDescriptor = + createPartitionedTable(partitionedTablePath); + Map partitionNameByIds = 
waitUntilPartitions(partitionedTablePath); + + List partitionFlussRows = new ArrayList<>(); + // now, write rows into partitioned table + TableDescriptor partitionedTableDescriptor = tableIdAndDescriptor.f1; + Map> writtenRowsByPartition = new HashMap<>(); + int partitionSeq = 0; + for (String partitionName : partitionNameByIds.values()) { + List partitionRows = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + partitionRows.add(row(partitionSeq, "v" + partitionSeq, partitionName)); + partitionSeq++; + } + partitionFlussRows.addAll(partitionRows); + writtenRowsByPartition.put(partitionName, partitionRows); + + writeRows(partitionedTablePath, partitionRows, true); + } + long tableId = tableIdAndDescriptor.f0; + + // wait until synced to paimon + Map expectedOffset = new HashMap<>(); + for (Long partitionId : partitionNameByIds.keySet()) { + TableBucket tableBucket = new TableBucket(tableId, partitionId, 0); + assertReplicaStatus(tableBucket, 3); + expectedOffset.put(tableBucket, 3L); + } + + // now, let's check data in paimon per partition + // check data in paimon + String partitionCol = partitionedTableDescriptor.getPartitionKeys().get(0); + for (String partitionName : partitionNameByIds.values()) { + checkDataInPaimonAppendOnlyPartitionedTable( + partitionedTablePath, + Collections.singletonMap(partitionCol, partitionName), + writtenRowsByPartition.get(partitionName), + 0); + } + + checkFlussOffsetsInSnapshot(partitionedTablePath, expectedOffset); + + // alter partitioned table bucket from 1 to 2 + TableChange.BucketNumOption bucketNumOption = TableChange.bucketNum(2); + List changes = Collections.singletonList(bucketNumOption); + admin.alterTable(partitionedTablePath, changes, false).get(); + + List partitionInfos = + admin.listPartitionInfos(partitionedTablePath).get(); + // wait until new bucket replicas are ready + for (PartitionInfo partitionInfo : partitionInfos) { + for (int i = 0; i < 2; i++) { + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady( + new 
TableBucket(tableId, partitionInfo.getPartitionId(), i)); + } + } + + // reconnect to force update partition metadata + reconnect(); + + // write rows into partitioned table again + for (String partitionName : partitionNameByIds.values()) { + List partitionRows = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + partitionRows.add(row(partitionSeq, "v" + partitionSeq, partitionName)); + partitionSeq++; + } + partitionFlussRows.addAll(partitionRows); + writtenRowsByPartition.get(partitionName).addAll(partitionRows); + + writeRows(partitionedTablePath, partitionRows, true); + } + + // wait until synced to paimon + for (Long partitionId : partitionNameByIds.keySet()) { + TableBucket tableBucket0 = new TableBucket(tableId, partitionId, 0); + TableBucket tableBucket1 = new TableBucket(tableId, partitionId, 1); + assertReplicaStatus(ImmutableSet.of(tableBucket0, tableBucket1), 6); + assertThat(getLeaderReplica(tableBucket0).getLakeLogEndOffset()).isGreaterThan(3); + assertThat(getLeaderReplica(tableBucket1).getLakeLogEndOffset()).isGreaterThan(0); + expectedOffset.put( + tableBucket0, getLeaderReplica(tableBucket0).getLakeLogEndOffset()); + expectedOffset.put( + tableBucket1, getLeaderReplica(tableBucket1).getLakeLogEndOffset()); + } + + // now, let's check data in paimon per partition + // check data in paimon + for (String partitionName : partitionNameByIds.values()) { + checkDataInPaimonAppendOnlyPartitionedTableForMultipleBuckets( + partitionedTablePath, + Collections.singletonMap(partitionCol, partitionName), + partitionFlussRows, + writtenRowsByPartition.get(partitionName).size()); + } + + checkFlussOffsetsInSnapshot(partitionedTablePath, expectedOffset); + } finally { + jobClient.cancel().get(); + } + } + @Test void testTieringToDvEnabledTable() throws Exception { TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableWithDv"); @@ -511,6 +686,32 @@ private void checkDataInPaimonAppendOnlyTable( assertThat(flussRowIterator.hasNext()).isFalse(); } + /** Check for 
multiple buckets. */ + private void checkDataInPaimonAppendOnlyTableForMultipleBuckets( + TablePath tablePath, List expectedRows) throws Exception { + Iterator paimonRowIterator = + getPaimonRowCloseableIterator(tablePath); + int paimonCount = 0; + Map startingOffsets = new HashMap<>(); + while (paimonRowIterator.hasNext()) { + org.apache.paimon.data.InternalRow row = paimonRowIterator.next(); + int offset = row.getInt(0); + InternalRow flussRow = expectedRows.get(offset); + + assertThat(row.getInt(0)).isEqualTo(flussRow.getInt(0)); + assertThat(row.getString(1).toString()).isEqualTo(flussRow.getString(1).toString()); + + int bucket = row.getInt(2); + startingOffsets.putIfAbsent(bucket, 0L); + // the idx 2 is __bucket, so use 3 + assertThat(row.getLong(3)).isEqualTo(startingOffsets.get(bucket)); + startingOffsets.put(bucket, startingOffsets.get(bucket) + 1); + + paimonCount++; + } + assertThat(paimonCount).isEqualTo(expectedRows.size()); + } + private void checkDataInPaimonAppendOnlyPartitionedTable( TablePath tablePath, Map partitionSpec, @@ -532,6 +733,40 @@ private void checkDataInPaimonAppendOnlyPartitionedTable( assertThat(flussRowIterator.hasNext()).isFalse(); } + /** Check for multiple buckets. 
*/ + private void checkDataInPaimonAppendOnlyPartitionedTableForMultipleBuckets( + TablePath tablePath, + Map partitionSpec, + List expectedRows, + int expectedCount) + throws Exception { + Iterator paimonRowIterator = + getPaimonRowCloseableIterator(tablePath, partitionSpec); + int paimonCount = 0; + Map, Long> startingOffsets = new HashMap<>(); + while (paimonRowIterator.hasNext()) { + org.apache.paimon.data.InternalRow row = paimonRowIterator.next(); + int offset = row.getInt(0); + InternalRow flussRow = expectedRows.get(offset); + + assertThat(row.getInt(0)).isEqualTo(flussRow.getInt(0)); + assertThat(row.getString(1).toString()).isEqualTo(flussRow.getString(1).toString()); + + String partitionName = row.getString(2).toString(); + int bucket = row.getInt(3); + startingOffsets.putIfAbsent(Tuple2.of(partitionName, bucket), 0L); + // the idx 3 is __bucket, so use 4 + assertThat(row.getLong(4)) + .isEqualTo(startingOffsets.get(Tuple2.of(partitionName, bucket))); + startingOffsets.put( + Tuple2.of(partitionName, bucket), + startingOffsets.get(Tuple2.of(partitionName, bucket)) + 1); + + paimonCount++; + } + assertThat(paimonCount).isEqualTo(expectedCount); + } + private CloseableIterator getPaimonRowCloseableIterator( TablePath tablePath, Map partitionSpec) throws Exception { Identifier tableIdentifier = diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index 96f39ef08a..943b0daf66 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -62,6 +62,7 @@ import org.apache.fluss.server.coordinator.event.AccessContextEvent; import org.apache.fluss.server.coordinator.event.AddServerTagEvent; import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent; +import 
org.apache.fluss.server.coordinator.event.AlterTableOrPartitionBucketEvent; import org.apache.fluss.server.coordinator.event.CancelRebalanceEvent; import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent; import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent; @@ -89,6 +90,8 @@ import org.apache.fluss.server.coordinator.event.SchemaChangeEvent; import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent; import org.apache.fluss.server.coordinator.event.watcher.CoordinatorChangeWatcher; +import org.apache.fluss.server.coordinator.event.watcher.PartitionAssignmentChangeWatcher; +import org.apache.fluss.server.coordinator.event.watcher.TableAssignmentChangeWatcher; import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher; import org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher; import org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager; @@ -178,6 +181,8 @@ public class CoordinatorEventProcessor implements EventProcessor { private final CoordinatorChannelManager coordinatorChannelManager; private final CoordinatorChangeWatcher coordinatorChangeWatcher; private final TabletServerChangeWatcher tabletServerChangeWatcher; + private final TableAssignmentChangeWatcher tableAssignmentChangeWatcher; + private final PartitionAssignmentChangeWatcher partitionAssignmentChangeWatcher; private final CoordinatorMetadataCache serverMetadataCache; private final CoordinatorRequestBatch coordinatorRequestBatch; private final String internalListenerName; @@ -234,6 +239,10 @@ public CoordinatorEventProcessor( this.tableChangeWatcher = new TableChangeWatcher(zooKeeperClient, coordinatorEventManager); this.tabletServerChangeWatcher = new TabletServerChangeWatcher(zooKeeperClient, coordinatorEventManager); + this.tableAssignmentChangeWatcher = + new TableAssignmentChangeWatcher(zooKeeperClient, coordinatorEventManager); + this.partitionAssignmentChangeWatcher = + new 
PartitionAssignmentChangeWatcher(zooKeeperClient, coordinatorEventManager); this.coordinatorRequestBatch = new CoordinatorRequestBatch( coordinatorChannelManager, coordinatorEventManager, coordinatorContext); @@ -273,6 +282,8 @@ public void startup() { coordinatorChangeWatcher.start(); tabletServerChangeWatcher.start(); tableChangeWatcher.start(); + tableAssignmentChangeWatcher.start(); + partitionAssignmentChangeWatcher.start(); LOG.info("Initializing coordinator context."); try { initCoordinatorContext(); @@ -338,6 +349,10 @@ public int getCoordinatorEpoch() { return coordinatorContext.getCoordinatorEpoch(); } + public int getCoordinatorZkVersion() { + return coordinatorContext.getCoordinatorZkVersion(); + } + private void initCoordinatorContext() throws Exception { long start = System.currentTimeMillis(); // get all coordinator servers @@ -562,6 +577,8 @@ private void onShutdown() { coordinatorChangeWatcher.stop(); tableChangeWatcher.stop(); tabletServerChangeWatcher.stop(); + tableAssignmentChangeWatcher.stop(); + partitionAssignmentChangeWatcher.stop(); } @Override @@ -574,6 +591,8 @@ public void process(CoordinatorEvent event) { processDropTable((DropTableEvent) event); } else if (event instanceof DropPartitionEvent) { processDropPartition((DropPartitionEvent) event); + } else if (event instanceof AlterTableOrPartitionBucketEvent) { + processAlterTableOrPartitionBucket((AlterTableOrPartitionBucketEvent) event); } else if (event instanceof SchemaChangeEvent) { SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event; processSchemaChange(schemaChangeEvent); @@ -919,6 +938,141 @@ private void processDropPartition(DropPartitionEvent dropPartitionEvent) { dropTableInfo.getTablePath(), tableId, tablePartition.getPartitionId()); } + private void processAlterTableOrPartitionBucket( + AlterTableOrPartitionBucketEvent alterTableOrPartitionBucketEvent) { + long tableId = alterTableOrPartitionBucketEvent.getTableId(); + Long partitionId = 
alterTableOrPartitionBucketEvent.getPartitionId(); + boolean isPartitionedTable = partitionId != null; + + Map bucketsToBeAdded; + if (!isPartitionedTable) { + bucketsToBeAdded = + alterTableOrPartitionBucketEvent + .getTableOrPartitionAssignment() + .getBucketAssignments() + .entrySet() + .stream() + .filter( + e -> + !coordinatorContext + .getTableAssignment(tableId) + .containsKey(e.getKey())) + .collect( + Collectors.toMap( + e -> new TableBucket(tableId, e.getKey()), + Map.Entry::getValue)); + } else { + bucketsToBeAdded = + alterTableOrPartitionBucketEvent + .getTableOrPartitionAssignment() + .getBucketAssignments() + .entrySet() + .stream() + .filter( + e -> + !coordinatorContext + .getPartitionAssignment( + new TablePartition( + tableId, partitionId)) + .containsKey(e.getKey())) + .collect( + Collectors.toMap( + e -> new TableBucket(tableId, partitionId, e.getKey()), + Map.Entry::getValue)); + } + + if (coordinatorContext.isTableQueuedForDeletion(tableId)) { + if (!bucketsToBeAdded.isEmpty()) { + LOG.warn( + "Skipping adding buckets {} for table {} since it is currently being deleted.", + bucketsToBeAdded, + tableId); + restoreBucketReplicaAssignment(tableId, partitionId); + } else { + LOG.info( + "Ignoring bucket change during table deletion as no new buckets are added"); + } + } else if (!bucketsToBeAdded.isEmpty()) { + LOG.info("New buckets to be added {}", bucketsToBeAdded); + bucketsToBeAdded.forEach( + (tableBucket, bucketAssignment) -> + coordinatorContext.updateBucketReplicaAssignment( + tableBucket, bucketAssignment.getReplicas())); + tableManager.onCreateNewTableBucket(tableId, bucketsToBeAdded.keySet()); + + Set tableBuckets = new HashSet<>(); + alterTableOrPartitionBucketEvent + .getTableOrPartitionAssignment() + .getBucketAssignments() + .keySet() + .forEach( + bucketId -> + tableBuckets.add( + new TableBucket(tableId, partitionId, bucketId))); + + // Update table info with new bucket number + TableInfo tableInfo = 
coordinatorContext.getTableInfoById(tableId); + coordinatorContext.putTableInfo( + tableInfo.toNewTableInfo( + alterTableOrPartitionBucketEvent + .getTableOrPartitionAssignment() + .getBucketAssignments() + .size())); + + updateTabletServerMetadataCache( + new HashSet<>(coordinatorContext.getLiveTabletServers().values()), + null, + null, + tableBuckets); + } + } + + private void restoreBucketReplicaAssignment(long tableId, @Nullable Long partitionId) { + LOG.info( + "Restoring the bucket replica assignment for table {}, partition {}", + tableId, + partitionId); + + if (partitionId == null) { + Map oldTableAssignment = + coordinatorContext.getTableAssignment(tableId).entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> new BucketAssignment(entry.getValue()))); + try { + zooKeeperClient.updateTableAssignment( + tableId, + new TableAssignment(oldTableAssignment), + coordinatorContext.getCoordinatorZkVersion()); + } catch (Exception e) { + LOG.error("Failed to restore table assignment for table {}", tableId, e); + } + } else { + Map oldPartitionAssignment = + coordinatorContext + .getPartitionAssignment(new TablePartition(tableId, partitionId)) + .entrySet() + .stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> new BucketAssignment(entry.getValue()))); + try { + zooKeeperClient.updatePartitionAssignment( + tableId, + new PartitionAssignment(tableId, oldPartitionAssignment), + coordinatorContext.getCoordinatorZkVersion()); + } catch (Exception e) { + LOG.error( + "Failed to restore partition assignment for table {} partition {}", + tableId, + partitionId, + e); + } + } + } + private void processDeleteReplicaResponseReceived( DeleteReplicaResponseReceivedEvent deleteReplicaResponseReceivedEvent) { List deleteReplicaResultForBuckets = diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java index 3441053671..4d331834f1 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java @@ -288,7 +288,9 @@ public void addStopReplicaRequestForTabletServers( *
  • case8: One newly tabletServer added into cluster *
  • case9: One tabletServer is removed from cluster *
  • case10: schemaId is changed after table is created. - *
  • case 11: TableRegistration changed after table is created. + *
  • case11: TableRegistration changed after table is created. + *
  • case12: Alter table bucket and new bucketAssignment of this table or table's partition + * generated. * */ // todo: improve this with different phase enum. @@ -313,7 +315,7 @@ public void addUpdateMetadataRequestForTabletServers( updateMetadataRequestBucketMap.put(tableId, Collections.emptyList()); } } else { - // case1, case2, case5, case7, case8 + // case1, case2, case5, case7, case8, case12 for (TableBucket tableBucket : tableBuckets) { long currentTableId = tableBucket.getTableId(); Long currentPartitionId = tableBucket.getPartitionId(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index bdc97434ec..3bf2d9e5b8 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -232,6 +232,7 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina private final boolean kvTableAllowCreation; private final Supplier eventManagerSupplier; private final Supplier coordinatorEpochSupplier; + private final Supplier coordinatorZkVersionSupplier; private final CoordinatorMetadataCache metadataCache; private final LakeTableTieringManager lakeTableTieringManager; @@ -272,6 +273,8 @@ public CoordinatorService( () -> coordinatorEventProcessorSupplier.get().getCoordinatorEventManager(); this.coordinatorEpochSupplier = () -> coordinatorEventProcessorSupplier.get().getCoordinatorEpoch(); + this.coordinatorZkVersionSupplier = + () -> coordinatorEventProcessorSupplier.get().getCoordinatorZkVersion(); this.lakeTableTieringManager = lakeTableTieringManager; this.metadataCache = metadataCache; this.lakeCatalogDynamicLoader = lakeCatalogDynamicLoader; @@ -532,7 +535,9 @@ public CompletableFuture alterTable(AlterTableRequest reques alterTableConfigChanges, tablePropertyChanges, 
request.isIgnoreIfNotExists(), - currentSession().getPrincipal()); + metadataCache, + currentSession().getPrincipal(), + coordinatorZkVersionSupplier.get()); } return CompletableFuture.completedFuture(new AlterTableResponse()); @@ -545,7 +550,11 @@ public static TablePropertyChanges toTablePropertyChanges(List tabl } for (TableChange tableChange : tableChanges) { - if (tableChange instanceof TableChange.SetOption) { + if (tableChange instanceof TableChange.BucketNumOption) { + TableChange.BucketNumOption bucketNumOption = + (TableChange.BucketNumOption) tableChange; + builder.setBucketNum(bucketNumOption.getBucketNum()); + } else if (tableChange instanceof TableChange.SetOption) { TableChange.SetOption setOption = (TableChange.SetOption) tableChange; String optionKey = setOption.getKey(); if (isTableStorageConfig(optionKey)) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 65ba95d2ab..d496ab7f47 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -17,6 +17,7 @@ package org.apache.fluss.server.coordinator; +import org.apache.fluss.cluster.TabletServerInfo; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.DatabaseAlreadyExistException; @@ -25,6 +26,7 @@ import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.exception.InvalidAlterTableException; import org.apache.fluss.exception.InvalidPartitionException; +import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.exception.LakeTableAlreadyExistException; import org.apache.fluss.exception.PartitionAlreadyExistsException; import org.apache.fluss.exception.PartitionNotExistException; @@ -49,8 +51,10 @@ import 
org.apache.fluss.security.acl.FlussPrincipal; import org.apache.fluss.server.entity.DatabasePropertyChanges; import org.apache.fluss.server.entity.TablePropertyChanges; +import org.apache.fluss.server.metadata.CoordinatorMetadataCache; import org.apache.fluss.server.utils.TableDescriptorValidation; import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.BucketAssignment; import org.apache.fluss.server.zk.data.DatabaseRegistration; import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.PartitionRegistration; @@ -74,7 +78,9 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; +import java.util.stream.Collectors; +import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignmentOfNewlyAddedBuckets; import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableProperties; /** A manager for metadata. */ @@ -502,7 +508,9 @@ public void alterTableProperties( List tableChanges, TablePropertyChanges tablePropertyChanges, boolean ignoreIfNotExists, - FlussPrincipal flussPrincipal) { + CoordinatorMetadataCache metadataCache, + FlussPrincipal flussPrincipal, + int expectedZkVersion) { try { // it throws TableNotExistException if the table or database not exists TableRegistration tableReg = getTableRegistration(tablePath); @@ -512,7 +520,7 @@ public void alterTableProperties( TableInfo tableInfo = tableReg.toTableInfo(tablePath, schemaInfo); // validate the changes - validateAlterTableProperties(tableInfo, tablePropertyChanges.tableKeysToChange()); + validateAlterTableProperties(tableInfo, tablePropertyChanges); TableDescriptor tableDescriptor = tableInfo.toTableDescriptor(); TableDescriptor newDescriptor = @@ -544,11 +552,24 @@ public void alterTableProperties( // pre alter table properties, e.g. 
create lake table in lake storage if it's to // enable datalake for the table preAlterTableProperties( - tablePath, tableDescriptor, newDescriptor, tableChanges, flussPrincipal); + tablePath, + tableInfo, + tableDescriptor, + newDescriptor, + tableChanges, + metadataCache, + flussPrincipal, + expectedZkVersion); + // update the table to zk TableRegistration updatedTableRegistration = - tableReg.newProperties( - newDescriptor.getProperties(), newDescriptor.getCustomProperties()); + tableReg.newBucketCountAndProperties( + newDescriptor + .getTableDistribution() + .flatMap(TableDescriptor.TableDistribution::getBucketCount) + .orElse(tableInfo.getNumBuckets()), + newDescriptor.getProperties(), + newDescriptor.getCustomProperties()); zookeeperClient.updateTable(tablePath, updatedTableRegistration); } else { LOG.info( @@ -571,10 +592,16 @@ public void alterTableProperties( private void preAlterTableProperties( TablePath tablePath, + TableInfo tableInfo, TableDescriptor tableDescriptor, TableDescriptor newDescriptor, List tableChanges, - FlussPrincipal flussPrincipal) { + CoordinatorMetadataCache metadataCache, + FlussPrincipal flussPrincipal, + int expectedZkVersion) { + // Alter table bucket + alterTableBucket(tableInfo, newDescriptor, metadataCache, expectedZkVersion); + LakeCatalog.Context lakeCatalogContext = new CoordinatorService.DefaultLakeCatalogContext( false, flussPrincipal, tableDescriptor, newDescriptor); @@ -590,7 +617,7 @@ private void preAlterTableProperties( } // to enable lake table - if (!isDataLakeEnabled(tableDescriptor)) { + if (!tableInfo.getTableConfig().isDataLakeEnabled()) { // before create table in fluss, we may create in lake try { lakeCatalog.createTable(tablePath, newDescriptor, lakeCatalogContext); @@ -624,6 +651,69 @@ private void preAlterTableProperties( } } + private void alterTableBucket( + TableInfo table, + TableDescriptor newDescriptor, + CoordinatorMetadataCache metadataCache, + int expectedZkVersion) { + Optional newNumBucketsOp = 
+ newDescriptor + .getTableDistribution() + .flatMap(TableDescriptor.TableDistribution::getBucketCount); + if (!newNumBucketsOp.isPresent()) { + return; + } + + // no need to alter bucket number + if (newNumBucketsOp.get().equals(table.getNumBuckets())) { + return; + } + + if (!table.getBucketKeys().isEmpty() || !table.getPrimaryKeys().isEmpty()) { + throw new InvalidTableException( + "Alter table bucket is not supported for Log table with bucket keys or PrimaryKey Table now."); + } + + int newNumBuckets = newNumBucketsOp.get(); + int oldNumBuckets = table.getNumBuckets(); + int addedBuckets = newNumBuckets - oldNumBuckets; + int replicationFactor = table.getTableConfig().getReplicationFactor(); + TabletServerInfo[] servers = metadataCache.getLiveServers(); + if (table.isPartitioned()) { + // Currently, for partitioned table we only support alter bucket number for all + // partition together. + // We may support alter bucket number for each partition separately in the future. + + List partitionIds = + listPartitions(table.getTablePath()).values().stream() + .map(PartitionRegistration::getPartitionId) + .collect(Collectors.toList()); + Map combinedAssignments = new HashMap<>(); + for (Long partitionId : partitionIds) { + PartitionAssignment existingAssignment = getPartitionAssignment(partitionId); + TableAssignment newlyAddedBucketsAssignment = + generateAssignmentOfNewlyAddedBuckets( + addedBuckets, replicationFactor, servers, existingAssignment); + Map combined = new HashMap<>(); + combined.putAll(existingAssignment.getBucketAssignments()); + combined.putAll(newlyAddedBucketsAssignment.getBucketAssignments()); + combinedAssignments.put( + partitionId, new PartitionAssignment(table.getTableId(), combined)); + } + alterPartitionAssignments(combinedAssignments, expectedZkVersion); + } else { + TableAssignment existingAssignment = getTableAssignment(table.getTableId()); + TableAssignment addedBucketsAssignment = + generateAssignmentOfNewlyAddedBuckets( + addedBuckets, 
replicationFactor, servers, existingAssignment); + alterTableAssignment( + table.getTableId(), + existingAssignment, + addedBucketsAssignment, + expectedZkVersion); + } + } + /** * Get a new TableDescriptor with updated properties. * @@ -633,6 +723,9 @@ private void preAlterTableProperties( */ private @Nullable TableDescriptor getUpdatedTableDescriptor( TableDescriptor tableDescriptor, TablePropertyChanges tablePropertyChanges) { + // new bucket num + Integer bucketNum = tablePropertyChanges.getBucketNum(); + Map newProperties = new HashMap<>(tableDescriptor.getProperties()); Map newCustomProperties = new HashMap<>(tableDescriptor.getCustomProperties()); @@ -651,11 +744,42 @@ private void preAlterTableProperties( } // no properties change happen - if (newProperties.equals(tableDescriptor.getProperties()) + if (bucketNum == null + && newProperties.equals(tableDescriptor.getProperties()) && newCustomProperties.equals(tableDescriptor.getCustomProperties())) { return null; } else { - return tableDescriptor.withProperties(newProperties, newCustomProperties); + return tableDescriptor.withBucketCountAndProperties( + bucketNum, newProperties, newCustomProperties); + } + } + + public void alterTableAssignment( + long tableId, + TableAssignment existingAssignment, + TableAssignment addedBucketsAssignment, + int expectedZkVersion) { + try { + Map combinedAssignment = new HashMap<>(); + combinedAssignment.putAll(existingAssignment.getBucketAssignments()); + combinedAssignment.putAll(addedBucketsAssignment.getBucketAssignments()); + zookeeperClient.updateTableAssignment( + tableId, new TableAssignment(combinedAssignment), expectedZkVersion); + } catch (Exception e) { + throw new FlussRuntimeException( + "Failed to update table assignment for table id " + tableId, e); + } + } + + public void alterPartitionAssignments( + Map partitionAssignments, int expectedZkVersion) { + try { + zookeeperClient.updatePartitionAssignments(partitionAssignments, expectedZkVersion); + } catch 
(Exception e) { + throw new FlussRuntimeException( + "Failed to atomically update partition assignments for partitions " + + partitionAssignments.keySet(), + e); } } @@ -751,6 +875,32 @@ public TableRegistration getTableRegistration(TablePath tablePath) { return optionalTable.get(); } + public TableAssignment getTableAssignment(long tableId) { + Optional optionalTableAssignment; + try { + optionalTableAssignment = zookeeperClient.getTableAssignment(tableId); + } catch (Exception e) { + throw new RuntimeException(e); + } + if (!optionalTableAssignment.isPresent()) { + throw new TableNotExistException("Table '" + tableId + "' does not exist."); + } + return optionalTableAssignment.get(); + } + + public PartitionAssignment getPartitionAssignment(long partitionId) { + Optional optionalPartitionAssignment; + try { + optionalPartitionAssignment = zookeeperClient.getPartitionAssignment(partitionId); + } catch (Exception e) { + throw new RuntimeException(e); + } + if (!optionalPartitionAssignment.isPresent()) { + throw new PartitionNotExistException("Partition '" + partitionId + "' does not exist."); + } + return optionalPartitionAssignment.get(); + } + public SchemaInfo getLatestSchema(TablePath tablePath) throws SchemaNotExistException { final int currentSchemaId; try { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/TableManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/TableManager.java index b92a2dad6e..0b799918aa 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/TableManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/TableManager.java @@ -146,7 +146,7 @@ public void onCreateNewPartition( onCreateNewTableBucket(tableId, newTableBuckets); } - private void onCreateNewTableBucket(long tableId, Set tableBuckets) { + public void onCreateNewTableBucket(long tableId, Set tableBuckets) { LOG.info( "New table buckets: {} for table {}.", tableBuckets, diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/AlterTableOrPartitionBucketEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/AlterTableOrPartitionBucketEvent.java new file mode 100644 index 0000000000..03d81ff43a --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/AlterTableOrPartitionBucketEvent.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.event; + +import org.apache.fluss.server.zk.data.TableAssignment; + +import javax.annotation.Nullable; + +import java.util.Objects; + +/** An event for alter table or partition bucket number. 
*/ +public class AlterTableOrPartitionBucketEvent implements CoordinatorEvent { + + private final long tableId; + private final @Nullable Long partitionId; + private final TableAssignment tableAssignment; + + public AlterTableOrPartitionBucketEvent(long tableId, TableAssignment tableAssignment) { + this(tableId, null, tableAssignment); + } + + public AlterTableOrPartitionBucketEvent( + long tableId, @Nullable Long partitionId, TableAssignment tableAssignment) { + this.tableId = tableId; + this.partitionId = partitionId; + this.tableAssignment = tableAssignment; + } + + public long getTableId() { + return tableId; + } + + @Nullable + public Long getPartitionId() { + return partitionId; + } + + public TableAssignment getTableOrPartitionAssignment() { + return tableAssignment; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + AlterTableOrPartitionBucketEvent that = (AlterTableOrPartitionBucketEvent) o; + return tableId == that.tableId + && Objects.equals(partitionId, that.partitionId) + && Objects.equals(tableAssignment, that.tableAssignment); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, partitionId, tableAssignment); + } + + @Override + public String toString() { + return "AlterTableOrPartitionBucketEvent{" + + "tableId=" + + tableId + + ", partitionId=" + + partitionId + + ", tableAssignment=" + + tableAssignment + + '}'; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/PartitionAssignmentChangeWatcher.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/PartitionAssignmentChangeWatcher.java new file mode 100644 index 0000000000..d792722129 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/PartitionAssignmentChangeWatcher.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.event.watcher; + +import org.apache.fluss.server.coordinator.event.AlterTableOrPartitionBucketEvent; +import org.apache.fluss.server.coordinator.event.EventManager; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.PartitionAssignment; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.regex.Pattern; + +import static org.apache.fluss.server.coordinator.event.watcher.TableAssignmentChangeWatcher.validBucketChange; +import static org.apache.fluss.server.zk.data.ZkData.PartitionIdZNode; +import static org.apache.fluss.server.zk.data.ZkData.PartitionIdsZNode; + +/** A watcher to watch the partition assignment change(bucket expansion) in zookeeper. 
*/ +public class PartitionAssignmentChangeWatcher { + + private static final Logger LOG = + LoggerFactory.getLogger(PartitionAssignmentChangeWatcher.class); + + private static final Pattern PARTITION_ASSIGNMENT_PATH_PATTERN = + Pattern.compile("^/tabletservers/partitions/\\d+$"); + + private final CuratorCache curatorCache; + + private volatile boolean running; + + private final EventManager eventManager; + + public PartitionAssignmentChangeWatcher( + ZooKeeperClient zooKeeperClient, EventManager eventManager) { + this.curatorCache = + CuratorCache.build(zooKeeperClient.getCuratorClient(), PartitionIdsZNode.path()); + this.eventManager = eventManager; + this.curatorCache.listenable().addListener(new PartitionBucketChangeListener()); + } + + public void start() { + running = true; + curatorCache.start(); + } + + public void stop() { + if (!running) { + return; + } + running = false; + LOG.info("Stopping PartitionAssignmentChangeWatcher"); + curatorCache.close(); + } + + private class PartitionBucketChangeListener implements CuratorCacheListener { + + @Override + public void event(Type type, ChildData oldData, ChildData newData) { + if (newData != null) { + LOG.debug("Received {} event (path: {})", type, newData.getPath()); + } else { + LOG.debug("Received {} event", type); + } + + switch (type) { + case NODE_CHANGED: + { + // only NODE_CHANGE on /tabletservers/partitions/[partitionId] is valid + // partition assignment change + if (newData != null + && PARTITION_ASSIGNMENT_PATH_PATTERN + .matcher(newData.getPath()) + .matches()) { + Long partitionId = PartitionIdZNode.parsePath(newData.getPath()); + if (partitionId == null) { + break; + } + + PartitionAssignment oldPartitionAssignment = + PartitionIdZNode.decode(oldData.getData()); + PartitionAssignment newPartitionAssignment = + PartitionIdZNode.decode(newData.getData()); + if (validBucketChange(oldPartitionAssignment, newPartitionAssignment)) { + eventManager.put( + new AlterTableOrPartitionBucketEvent( + 
newPartitionAssignment.getTableId(), + partitionId, + newPartitionAssignment)); + } + } + break; + } + default: + break; + } + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableAssignmentChangeWatcher.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableAssignmentChangeWatcher.java new file mode 100644 index 0000000000..c7896b45ad --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableAssignmentChangeWatcher.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.event.watcher; + +import org.apache.fluss.server.coordinator.event.AlterTableOrPartitionBucketEvent; +import org.apache.fluss.server.coordinator.event.EventManager; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.BucketAssignment; +import org.apache.fluss.server.zk.data.TableAssignment; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.regex.Pattern; + +import static org.apache.fluss.server.zk.data.ZkData.TableIdZNode; +import static org.apache.fluss.server.zk.data.ZkData.TableIdsZNode; + +/** A watcher to watch the table assignment change(bucket expansion) in zookeeper. 
*/ +public class TableAssignmentChangeWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(TableAssignmentChangeWatcher.class); + + private static final Pattern TABLE_ASSIGNMENT_PATH_PATTERN = + Pattern.compile("^/tabletservers/tables/\\d+$"); + + private final CuratorCache curatorCache; + + private volatile boolean running; + + private final EventManager eventManager; + + public TableAssignmentChangeWatcher( + ZooKeeperClient zooKeeperClient, EventManager eventManager) { + this.curatorCache = + CuratorCache.build(zooKeeperClient.getCuratorClient(), TableIdsZNode.path()); + this.eventManager = eventManager; + this.curatorCache.listenable().addListener(new TableBucketChangeListener()); + } + + public void start() { + running = true; + curatorCache.start(); + } + + public void stop() { + if (!running) { + return; + } + running = false; + LOG.info("Stopping TableAssignmentChangeWatcher"); + curatorCache.close(); + } + + /** + * Currently, we only support bucket expansion, which means that the {@code oldTableAssignment} + * must be a proper subset of the {@code newTableAssignment}. 
+ */ + public static boolean validBucketChange( + TableAssignment oldTableAssignment, TableAssignment newTableAssignment) { + Map oldBucketAssignments = + oldTableAssignment.getBucketAssignments(); + Map newBucketAssignments = + newTableAssignment.getBucketAssignments(); + + if (!newBucketAssignments.keySet().containsAll(oldBucketAssignments.keySet()) + || newBucketAssignments.size() <= oldBucketAssignments.size()) { + return false; + } + + // Verify that existing bucket assignments have not been modified + for (Map.Entry entry : oldBucketAssignments.entrySet()) { + if (!entry.getValue().equals(newBucketAssignments.get(entry.getKey()))) { + return false; + } + } + return true; + } + + protected class TableBucketChangeListener implements CuratorCacheListener { + + @Override + public void event(Type type, ChildData oldData, ChildData newData) { + if (newData != null) { + LOG.debug("Received {} event (path: {})", type, newData.getPath()); + } else { + LOG.debug("Received {} event", type); + } + + switch (type) { + case NODE_CHANGED: + { + // only NODE_CHANGE on /tabletservers/tables/[tableId] is valid + // table assignment change + if (newData != null + && TABLE_ASSIGNMENT_PATH_PATTERN + .matcher(newData.getPath()) + .matches()) { + Long tableId = TableIdZNode.parsePath(newData.getPath()); + if (tableId == null) { + break; + } + + TableAssignment oldTableAssignment = + TableIdZNode.decode(oldData.getData()); + TableAssignment newTableAssignment = + TableIdZNode.decode(newData.getData()); + if (validBucketChange(oldTableAssignment, newTableAssignment)) { + eventManager.put( + new AlterTableOrPartitionBucketEvent( + tableId, newTableAssignment)); + } + } + break; + } + default: + break; + } + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/entity/TablePropertyChanges.java b/fluss-server/src/main/java/org/apache/fluss/server/entity/TablePropertyChanges.java index 00085feb6e..291cc84a0e 100644 --- 
a/fluss-server/src/main/java/org/apache/fluss/server/entity/TablePropertyChanges.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/entity/TablePropertyChanges.java @@ -17,6 +17,8 @@ package org.apache.fluss.server.entity; +import javax.annotation.Nullable; + import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -25,6 +27,8 @@ /** To describe the changes of the properties of a table. */ public class TablePropertyChanges { + private final @Nullable Integer bucketNum; + public final Map tablePropertiesToSet; public final Set tablePropertiesToReset; @@ -32,16 +36,22 @@ public class TablePropertyChanges { public final Set customPropertiesToReset; protected TablePropertyChanges( + @Nullable Integer bucketNum, Map tablePropertiesToSet, Set tablePropertiesToReset, Map customPropertiesToSet, Set customPropertiesToReset) { + this.bucketNum = bucketNum; this.tablePropertiesToSet = tablePropertiesToSet; this.tablePropertiesToReset = tablePropertiesToReset; this.customPropertiesToSet = customPropertiesToSet; this.customPropertiesToReset = customPropertiesToReset; } + public @Nullable Integer getBucketNum() { + return bucketNum; + } + public Set tableKeysToChange() { Set keys = new HashSet<>(tablePropertiesToSet.keySet()); keys.addAll(tablePropertiesToReset); @@ -60,12 +70,19 @@ public static TablePropertyChanges.Builder builder() { /** The builder for {@link TablePropertyChanges}. 
*/ public static class Builder { + + private Integer bucketNum; + private final Map tablePropertiesToSet = new HashMap<>(); private final Set tablePropertiesToReset = new HashSet<>(); private final Map customPropertiesToSet = new HashMap<>(); private final Set customPropertiesToReset = new HashSet<>(); + public void setBucketNum(int bucketNum) { + this.bucketNum = bucketNum; + } + public void setTableProperty(String key, String value) { tablePropertiesToSet.put(key, value); } @@ -84,6 +101,7 @@ public void resetCustomProperty(String key) { public TablePropertyChanges build() { return new TablePropertyChanges( + bucketNum, tablePropertiesToSet, tablePropertiesToReset, customPropertiesToSet, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index f7a89828ab..af417c05c4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -222,6 +222,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.fluss.config.FlussConfigUtils.BUCKET_NUM; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toByteBuffer; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toPbAclInfo; import static org.apache.fluss.utils.Preconditions.checkNotNull; @@ -293,6 +294,9 @@ public static TableChange toTableChange(PbAlterConfig pbAlterConfig) { AlterConfigOpType opType = AlterConfigOpType.from(pbAlterConfig.getOpType()); switch (opType) { case SET: // SET_OPTION + if (BUCKET_NUM.equals(pbAlterConfig.getConfigKey())) { + return TableChange.bucketNum(Integer.parseInt(pbAlterConfig.getConfigValue())); + } return TableChange.set( pbAlterConfig.getConfigKey(), pbAlterConfig.getConfigValue()); case DELETE: // RESET_OPTION diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableAssignmentUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableAssignmentUtils.java index e69b295f3f..5e8af52575 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableAssignmentUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableAssignmentUtils.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.stream.IntStream; /** Utils for the assignment of tables. */ public class TableAssignmentUtils { @@ -46,7 +47,8 @@ protected static TableAssignment generateAssignment( int replicationFactor, TabletServerInfo[] servers, int startIndex, - int nextReplicaShift) { + int nextReplicaShift, + int startBucketId) { if (nBuckets <= 0) { throw new InvalidBucketsException("Number of buckets must be larger than 0."); } @@ -68,14 +70,20 @@ protected static TableAssignment generateAssignment( replicationFactor, Arrays.stream(servers).mapToInt(TabletServerInfo::getId).toArray(), startIndex, - nextReplicaShift); + nextReplicaShift, + startBucketId); } else { if (Arrays.stream(servers).anyMatch(tsInfo -> tsInfo.getRack() == null)) { throw new InvalidServerRackInfoException( "Not all tabletServers have rack information for replica rack aware assignment."); } else { return generateRackAwareAssignment( - nBuckets, replicationFactor, servers, startIndex, nextReplicaShift); + nBuckets, + replicationFactor, + servers, + startIndex, + nextReplicaShift, + startBucketId); } } } @@ -162,14 +170,77 @@ protected static TableAssignment generateAssignment( * replica distribution is even across tabletServers and racks. 
*/ public static TableAssignment generateAssignment( - int nBuckets, int replicationFactor, TabletServerInfo[] servers) + int nBuckets, + int replicationFactor, + TabletServerInfo[] servers, + int fixStartIndex, + int startBucketId) throws InvalidReplicationFactorException { + return generateAssignment( + nBuckets, + replicationFactor, + servers, + fixStartIndex, + fixStartIndex >= 0 ? fixStartIndex : randomInt(servers.length), + startBucketId); + } + + public static TableAssignment generateAssignment( + int nBuckets, int replicationFactor, TabletServerInfo[] servers) + throws InvalidBucketsException, InvalidReplicationFactorException { return generateAssignment( nBuckets, replicationFactor, servers, randomInt(servers.length), - randomInt(servers.length)); + randomInt(servers.length), + 0); + } + + /** + * Generate the assignment of new buckets for a table or partition when expanding the number of + * buckets. + * + *

    This method is used when a table or partition needs to be scaled out by adding new + * buckets. It ensures that the assignment of new buckets follows the same distribution strategy + * as the existing buckets, maintaining consistency in replica placement across the cluster. + * + * @param newlyAddedBuckets the number of new buckets to assign (must be greater than 0) + * @param replicationFactor the replication factor for each bucket (must be greater than 0 and + * not exceed the number of available servers) + * @param servers the array of available tablet servers for assignment + * @param existingAssignment the current table assignment, which must contain at least bucket 0 + * @return a new {@link TableAssignment} containing only the assignments for the newly added + * buckets, with bucket IDs starting from the current number of buckets + * @throws InvalidBucketsException if newlyAddedBuckets is less than or equal to 0, or if the + * existing assignment is invalid (missing bucket 0) + * @throws InvalidReplicationFactorException if replicationFactor is invalid or exceeds the + * number of available servers + */ + public static TableAssignment generateAssignmentOfNewlyAddedBuckets( + int newlyAddedBuckets, + int replicationFactor, + TabletServerInfo[] servers, + TableAssignment existingAssignment) + throws InvalidBucketsException, InvalidReplicationFactorException { + if (newlyAddedBuckets <= 0) { + throw new InvalidBucketsException( + "The newly added number of buckets must be larger than 0."); + } + + BucketAssignment existingAssignmentBucket0 = existingAssignment.getBucketAssignment(0); + if (existingAssignmentBucket0 == null) { + // Fluss ensures that every created table or partition contains at least one bucket, + // so this situation should not occur under normal circumstances. + throw new InvalidBucketsException( + "Unexpected existing bucket assignment, bucket id 0 is missing. 
Assignment: " + + existingAssignment); + } + + int startIndex = getStartIndex(servers, existingAssignmentBucket0); + int oldNumBuckets = existingAssignment.getBuckets().size(); + return generateAssignment( + newlyAddedBuckets, replicationFactor, servers, startIndex, oldNumBuckets); } private static TableAssignment generateRackUnawareAssignment( @@ -177,9 +248,10 @@ private static TableAssignment generateRackUnawareAssignment( int replicationFactor, int[] serverIds, int startIndex, - int nextReplicaShift) { + int nextReplicaShift, + int startBucketId) { Map assignments = new HashMap<>(); - int currentBucketId = 0; + int currentBucketId = Math.max(0, startBucketId); for (int i = 0; i < nBuckets; i++) { if (currentBucketId > 0 && (currentBucketId % serverIds.length == 0)) { nextReplicaShift += 1; @@ -203,7 +275,8 @@ private static TableAssignment generateRackAwareAssignment( int replicationFactor, TabletServerInfo[] servers, int startIndex, - int nextReplicaShift) { + int nextReplicaShift, + int startBucketId) { Map serverRackMap = new HashMap<>(); for (TabletServerInfo server : servers) { serverRackMap.put(server.getId(), server.getRack()); @@ -212,7 +285,7 @@ private static TableAssignment generateRackAwareAssignment( List arrangedServerList = getRackAlternatedTabletServerList(serverRackMap); int numServers = arrangedServerList.size(); Map assignments = new HashMap<>(); - int currentBucketId = 0; + int currentBucketId = Math.max(0, startBucketId); for (int i = 0; i < nBuckets; i++) { if (currentBucketId > 0 && (currentBucketId % arrangedServerList.size() == 0)) { nextReplicaShift += 1; @@ -313,4 +386,30 @@ private static Map> getInverseMap(Map ser results.forEach((rack, rackAndIdList) -> rackAndIdList.sort(Integer::compareTo)); return results; } + + /** + * Calculate the start index for assigning new buckets based on the existing assignment. + * + *

    This ensures that when new buckets are assigned, they follow the same distribution pattern + * as the existing buckets, maintaining consistency in the assignment strategy. For example, if + * bucket 0's first replica is on server with ID 5, and the server array is [3, 4, 5, 6, 7], + * this method will return index 2 (the position of server 5), so new buckets will start their + * round-robin assignment from that position. + * + * @param servers the array of available tablet servers, sorted by ID + * @param existingAssignmentBucket0 the bucket assignment for bucket 0, which must contain at + * least one replica + * @return the index in the server array where the round-robin assignment should start for new + * buckets (always >= 0) + */ + private static int getStartIndex( + TabletServerInfo[] servers, BucketAssignment existingAssignmentBucket0) { + int headId = existingAssignmentBucket0.getReplicas().get(0); + int index = + IntStream.range(0, servers.length) + .filter(i -> servers[i].getId() >= headId) + .findFirst() + .orElse(-1); + return Math.max(0, index); + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index fcd5f1688a..e2e9973c58 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -36,6 +36,7 @@ import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.server.entity.TablePropertyChanges; import org.apache.fluss.types.DataType; import org.apache.fluss.types.DataTypeRoot; import org.apache.fluss.types.RowType; @@ -154,9 +155,27 @@ private static void checkTableLakeFormatMatchesCluster( } public static void validateAlterTableProperties( - TableInfo currentTable, Set 
tableKeysToChange) { - TableConfig currentConfig = currentTable.getTableConfig(); + TableInfo currentTable, TablePropertyChanges tablePropertyChanges) { + // valid table bucket + if (tablePropertyChanges.getBucketNum() != null) { + if (currentTable.getNumBuckets() >= tablePropertyChanges.getBucketNum()) { + throw new InvalidAlterTableException( + "Bucket number can only be increased, current bucket number is " + + currentTable.getNumBuckets() + + ", new bucket number is " + + tablePropertyChanges.getBucketNum()); + } + + // only support one of alter table bucket or alter table properties for atomic change. + if (!tablePropertyChanges.tableKeysToChange().isEmpty() + || !tablePropertyChanges.customKeysToChange().isEmpty()) { + throw new InvalidAlterTableException( + "Cannot alter table properties and bucket number at the same time."); + } + } + TableConfig currentConfig = currentTable.getTableConfig(); + Set tableKeysToChange = tablePropertyChanges.tableKeysToChange(); List unsupportedKeys = tableKeysToChange.stream() .filter(k -> isTableStorageConfig(k) && !isAlterableTableOption(k)) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index cb34a2f2ed..99db48b351 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -486,6 +486,27 @@ public void updatePartitionAssignment( partitionId); } + /** + * Atomically update multiple partition assignments in a single ZK transaction. This ensures + * that either all partition assignments are updated or none of them are. 
+ */ + public void updatePartitionAssignments( + Map partitionAssignments, int expectedZkVersion) + throws Exception { + List updateOps = new ArrayList<>(); + for (Map.Entry entry : partitionAssignments.entrySet()) { + String path = PartitionIdZNode.path(entry.getKey()); + byte[] data = PartitionIdZNode.encode(entry.getValue()); + updateOps.add(zkOp.updateOp(path, data)); + } + List ops = wrapRequestsWithEpochCheck(updateOps, expectedZkVersion); + + zkClient.transaction().forOperations(ops); + LOG.debug( + "Atomically updated partition assignments for partition ids {}.", + partitionAssignments.keySet()); + } + public void deleteTableAssignment(long tableId) throws Exception { String path = TableIdZNode.path(tableId); zkClient.delete().deletingChildrenIfNeeded().forPath(path); @@ -526,10 +547,14 @@ public void batchRegisterLeaderAndIsrForTablePartition( // So we have to create parent dictionary in advance. RegisterTableBucketLeadAndIsrInfo firstInfo = registerList.get(0); String bucketsParentPath = BucketIdsZNode.path(firstInfo.getTableBucket()); - zkClient.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(bucketsParentPath); + // We need to check if the bucketsParentPath exists in advance, because when adding new + // buckets for an existing table, the bucketsParentPath already exists. 
+ if (zkClient.checkExists().forPath(bucketsParentPath) == null) { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(bucketsParentPath); + } for (RegisterTableBucketLeadAndIsrInfo info : registerList) { LOG.info( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/TableRegistration.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/TableRegistration.java index 89e7716d01..e828598192 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/TableRegistration.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/TableRegistration.java @@ -148,8 +148,10 @@ public static TableRegistration newTable( currentMillis); } - public TableRegistration newProperties( - Map newProperties, Map newCustomProperties) { + public TableRegistration newBucketCountAndProperties( + Integer bucketCount, + Map newProperties, + Map newCustomProperties) { final long currentMillis = System.currentTimeMillis(); return new TableRegistration( tableId, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java index 941512e0f2..be90bcbbb7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java @@ -464,6 +464,24 @@ public static String path(long tableId) { return TableIdsZNode.path() + "/" + tableId; } + /** + * Extracts the tableId from the given zookeeper path. If the given path is not a valid + * {@link TableIdZNode} path, returns null. 
+ */ + @Nullable + public static Long parsePath(String zkPath) { + String[] split = zkPath.split("/tables/"); + if (split.length != 2) { + return null; + } + + try { + return Long.parseLong(split[1]); + } catch (NumberFormatException e) { + return null; + } + } + public static byte[] encode(TableAssignment tableAssignment) { return JsonSerdeUtils.writeValueAsBytes( tableAssignment, TableAssignmentJsonSerde.INSTANCE); @@ -486,6 +504,24 @@ public static String path(long partitionId) { return PartitionIdsZNode.path() + "/" + partitionId; } + /** + * Extracts the partitionId from the given zookeeper path. If the given path is not a valid + * {@link PartitionIdZNode} path, returns null. + */ + @Nullable + public static Long parsePath(String zkPath) { + String[] split = zkPath.split("/partitions/"); + if (split.length != 2) { + return null; + } + + try { + return Long.parseLong(split[1]); + } catch (NumberFormatException e) { + return null; + } + } + public static byte[] encode(PartitionAssignment tableAssignment) { return JsonSerdeUtils.writeValueAsBytes( tableAssignment, PartitionAssignmentJsonSerde.INSTANCE); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java index 0694e9787a..7930f840bb 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -48,6 +48,7 @@ import org.apache.fluss.rpc.protocol.ApiKeys; import org.apache.fluss.server.coordinator.event.AccessContextEvent; import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent; +import org.apache.fluss.server.coordinator.event.AlterTableOrPartitionBucketEvent; import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent; import 
org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent; import org.apache.fluss.server.coordinator.event.CoordinatorEventManager; @@ -80,6 +81,7 @@ import org.apache.fluss.server.zk.data.ZkData; import org.apache.fluss.server.zk.data.ZkData.PartitionIdsZNode; import org.apache.fluss.server.zk.data.ZkData.TableIdsZNode; +import org.apache.fluss.server.zk.data.ZkVersion; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.apache.fluss.types.DataTypes; import org.apache.fluss.utils.ExceptionUtils; @@ -969,7 +971,13 @@ void testTableRegistrationChange() throws Exception { builder.setCustomProperty("custom.key", "custom.value"); TablePropertyChanges tablePropertyChanges = builder.build(); metadataManager.alterTableProperties( - t1, Collections.emptyList(), tablePropertyChanges, false, null); + t1, + Collections.emptyList(), + tablePropertyChanges, + false, + null, + null, + ZkVersion.MATCH_ANY_VERSION.getVersion()); // get updated table info and verify metadata update request is sent TableInfo updatedTableInfo = metadataManager.getTable(t1); @@ -1171,6 +1179,101 @@ private void verifyIsr(TableBucket tb, int expectedLeader, List expecte .hasSameElementsAs(expectedIsr); } + @Test + void testAlterTableOrPartitionBucket() throws Exception { + // make sure all request to gateway should be successful + initCoordinatorChannel(); + + int nBuckets = 3; + int replicationFactor = 3; + TabletServerInfo[] tabletServerInfos = + new TabletServerInfo[] { + new TabletServerInfo(0, "rack0"), + new TabletServerInfo(1, "rack1"), + new TabletServerInfo(2, "rack2") + }; + + // 1. 
test for non-partitioned table + // create a table + TablePath t1 = TablePath.of(defaultDatabase, "test_alter_bucket_t1"); + TableDescriptor t1Descriptor = TEST_TABLE; + TableAssignment t1Assignment = + generateAssignment(nBuckets, replicationFactor, tabletServerInfos); + long t1Id = metadataManager.createTable(t1, t1Descriptor, t1Assignment, false); + verifyTableCreated(t1Id, t1Assignment, nBuckets, replicationFactor); + + // generate new buckets assignments + TableAssignment incrementTableAssignment = + generateAssignment(nBuckets, replicationFactor, tabletServerInfos, 0, nBuckets); + Map mergeAssignments = new HashMap<>(); + mergeAssignments.putAll(t1Assignment.getBucketAssignments()); + mergeAssignments.putAll(incrementTableAssignment.getBucketAssignments()); + TableAssignment mergeTableAssignment = new TableAssignment(mergeAssignments); + // alter bucket + eventProcessor + .getCoordinatorEventManager() + .put(new AlterTableOrPartitionBucketEvent(t1Id, mergeTableAssignment)); + verifyTableCreated(t1Id, mergeTableAssignment, nBuckets * 2, replicationFactor); + + // 2. 
test for partitioned table + TablePath t2 = TablePath.of(defaultDatabase, "test_alter_bucket_t2"); + // create a partitioned table + TableDescriptor t2TableDescriptor = getPartitionedTable(); + long t2Id = metadataManager.createTable(t2, t2TableDescriptor, null, false); + Map assignments = + generateAssignment(nBuckets, replicationFactor, tabletServerInfos) + .getBucketAssignments(); + PartitionAssignment partitionAssignment = new PartitionAssignment(t2Id, assignments); + Tuple2 partitionIdAndNameTuple2 = + preparePartitionAssignment(t2, t2Id, partitionAssignment); + + long partition1Id = partitionIdAndNameTuple2.f0.partitionId; + long partition2Id = partitionIdAndNameTuple2.f1.partitionId; + + verifyPartitionCreated( + new TablePartition(t2Id, partition1Id), + partitionAssignment, + nBuckets, + replicationFactor); + verifyPartitionCreated( + new TablePartition(t2Id, partition2Id), + partitionAssignment, + nBuckets, + replicationFactor); + + // generate new partition assignments + Map incrementAssignments = + generateAssignment(nBuckets, replicationFactor, tabletServerInfos, 0, nBuckets) + .getBucketAssignments(); + mergeAssignments = new HashMap<>(); + mergeAssignments.putAll(assignments); + mergeAssignments.putAll(incrementAssignments); + PartitionAssignment mergePartitionAssignment = + new PartitionAssignment(t2Id, mergeAssignments); + + // alter partitions bucket + eventProcessor + .getCoordinatorEventManager() + .put( + new AlterTableOrPartitionBucketEvent( + t2Id, partition1Id, mergePartitionAssignment)); + eventProcessor + .getCoordinatorEventManager() + .put( + new AlterTableOrPartitionBucketEvent( + t2Id, partition2Id, mergePartitionAssignment)); + verifyPartitionCreated( + new TablePartition(t2Id, partition1Id), + mergePartitionAssignment, + nBuckets * 2, + replicationFactor); + verifyPartitionCreated( + new TablePartition(t2Id, partition2Id), + mergePartitionAssignment, + nBuckets * 2, + replicationFactor); + } + private CoordinatorEventProcessor 
buildCoordinatorEventProcessor() { return new CoordinatorEventProcessor( zookeeperClient, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java index 72697b7a91..c59009bb85 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java @@ -44,6 +44,7 @@ import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.server.zk.data.TableRegistration; +import org.apache.fluss.server.zk.data.ZkVersion; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.apache.fluss.types.DataTypes; @@ -379,7 +380,13 @@ void testTableRegistrationChange() { builder.setCustomProperty("custom.key", "custom.value"); TablePropertyChanges tablePropertyChanges = builder.build(); metadataManager.alterTableProperties( - tablePath, Collections.emptyList(), tablePropertyChanges, false, null); + tablePath, + Collections.emptyList(), + tablePropertyChanges, + false, + null, + null, + ZkVersion.MATCH_ANY_VERSION.getVersion()); // get the updated table registration TableRegistration updatedTableRegistration = diff --git a/fluss-server/src/test/java/org/apache/fluss/server/utils/TableAssignmentUtilsTest.java b/fluss-server/src/test/java/org/apache/fluss/server/utils/TableAssignmentUtilsTest.java index 6777f12fa3..4639403bed 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/utils/TableAssignmentUtilsTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/utils/TableAssignmentUtilsTest.java @@ -38,6 +38,7 @@ import java.util.stream.Collectors; import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment; +import static 
org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignmentOfNewlyAddedBuckets; import static org.apache.fluss.server.utils.TableAssignmentUtils.getRackAlternatedTabletServerList; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -94,6 +95,7 @@ void testTableAssignmentWithRackUnAware() { 1, toTabletServerInfo(Collections.emptyMap(), Arrays.asList(0, 1, 2, 3)), 0, + 0, 0); TableAssignment expectedAssignment = TableAssignment.builder() @@ -110,6 +112,7 @@ void testTableAssignmentWithRackUnAware() { 3, toTabletServerInfo(Collections.emptyMap(), Arrays.asList(0, 1, 2, 3)), 1, + 0, 0); expectedAssignment = TableAssignment.builder() @@ -126,6 +129,7 @@ void testTableAssignmentWithRackUnAware() { 3, toTabletServerInfo(Collections.emptyMap(), Arrays.asList(0, 1, 2, 3, 4)), 0, + 0, 0); expectedAssignment = TableAssignment.builder() @@ -485,6 +489,113 @@ void testPartialTabletServersHaveRackInfo() { "Not all tabletServers have rack information for replica rack aware assignment."); } + @Test + void testAssignmentOfNewlyAddedBucketsWithFixedStartAndShift() { + // Test servers without rack + List servers = Arrays.asList(0, 1, 2, 3, 4, 5); + TableAssignment fullAssignmentWithoutRack = + generateAssignment( + 12, 3, toTabletServerInfo(Collections.emptyMap(), servers), 0, 1, 0); + TableAssignment newlyAddedBucketsAssignmentWithoutRack = + generateAssignment( + 6, 3, toTabletServerInfo(Collections.emptyMap(), servers), 0, 1, 6); + // we use a fixed startIndex and nextReplicaShift, so the newly added buckets' assignment + // should be a proper subset of the full assignment + assertThat(fullAssignmentWithoutRack.getBucketAssignments()) + .containsAllEntriesOf( + newlyAddedBucketsAssignmentWithoutRack.getBucketAssignments()); + + // Test servers with rack + Map rackMap = new HashMap<>(); + rackMap.put(0, "rack1"); + rackMap.put(1, "rack2"); + rackMap.put(2, "rack2"); + rackMap.put(3, "rack3"); +
rackMap.put(4, "rack3"); + rackMap.put(5, "rack1"); + + TableAssignment fullAssignment = + generateAssignment( + 12, 3, toTabletServerInfo(rackMap, Collections.emptyList()), 0, 1, 0); + TableAssignment incrementAssignment = + generateAssignment( + 6, 3, toTabletServerInfo(rackMap, Collections.emptyList()), 0, 1, 6); + // we use a fixed startIndex and nextReplicaShift, so the newly added buckets' assignment + // should be a proper subset of the full assignment + assertThat(fullAssignment.getBucketAssignments()) + .containsAllEntriesOf(incrementAssignment.getBucketAssignments()); + } + + @Test + void testAssignmentOfNewlyAddedBuckets() { + // Test servers without rack + List servers = Arrays.asList(0, 1, 2, 3, 4, 5); + TableAssignment fullAssignmentWithoutRack = + generateAssignment( + 12, 3, toTabletServerInfo(Collections.emptyMap(), servers), 0, 1, 0); + TableAssignment firstAssignmentWithoutRack = + generateAssignment( + 6, 3, toTabletServerInfo(Collections.emptyMap(), servers), 0, 1, 0); + TableAssignment newlyAddedBucketsAssignmentWithoutRack = + generateAssignmentOfNewlyAddedBuckets( + 6, + 3, + toTabletServerInfo(Collections.emptyMap(), servers), + firstAssignmentWithoutRack); + + // generateAssignmentOfNewlyAddedBuckets() reuses the startIndex of the first assignment, + // but the nextReplicaShift of the first assignment is not accessible, + // so only the leaders of the newly added buckets can be ensured to be identical to the + // leaders in the full assignment. +
+ assertThat( + fullAssignmentWithoutRack.getBucketAssignments().entrySet().stream() + .filter(entry -> entry.getKey() >= 6) + .map(entry -> entry.getValue().getReplicas().get(0)) + .collect(Collectors.toSet())) + .containsExactlyInAnyOrderElementsOf( + newlyAddedBucketsAssignmentWithoutRack + .getBucketAssignments() + .values() + .stream() + .map(entry -> entry.getReplicas().get(0)) + .collect(Collectors.toSet())); + + // Test servers with rack + Map rackMap = new HashMap<>(); + rackMap.put(0, "rack1"); + rackMap.put(1, "rack2"); + rackMap.put(2, "rack2"); + rackMap.put(3, "rack3"); + rackMap.put(4, "rack3"); + rackMap.put(5, "rack1"); + + TableAssignment fullAssignment = + generateAssignment( + 12, 3, toTabletServerInfo(rackMap, Collections.emptyList()), 0, 1, 0); + TableAssignment firstAssignment = + generateAssignment( + 6, 3, toTabletServerInfo(rackMap, Collections.emptyList()), 0, 1, 0); + TableAssignment newlyAddedBucketsAssignment = + generateAssignmentOfNewlyAddedBuckets( + 6, + 3, + toTabletServerInfo(rackMap, Collections.emptyList()), + firstAssignment); + // generateAssignmentOfNewlyAddedBuckets() reuses the startIndex of the first assignment, + // but the nextReplicaShift of the first assignment is not accessible, + // so only the leaders of the newly added buckets can be ensured to be identical to the + // leaders in the full assignment. +
+ assertThat( + fullAssignment.getBucketAssignments().entrySet().stream() + .filter(entry -> entry.getKey() >= 6) + .map(entry -> entry.getValue().getReplicas().get(0)) + .collect(Collectors.toSet())) + .containsExactlyInAnyOrderElementsOf( + newlyAddedBucketsAssignment.getBucketAssignments().values().stream() + .map(entry -> entry.getReplicas().get(0)) + .collect(Collectors.toSet())); + } + private static void checkTableAssignment( TableAssignment assignment, Map serverRackMapping, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/utils/TableDescriptorValidationTest.java b/fluss-server/src/test/java/org/apache/fluss/server/utils/TableDescriptorValidationTest.java new file mode 100644 index 0000000000..eb3093d3ad --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/utils/TableDescriptorValidationTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.utils; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.exception.InvalidAlterTableException; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.entity.TablePropertyChanges; + +import org.junit.jupiter.api.Test; + +import static org.apache.fluss.record.TestData.DATA1_SCHEMA; +import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR; +import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableProperties; +import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link TableDescriptorValidation}. */ +class TableDescriptorValidationTest { + + @Test + void testValidateAlterBucketNumber() { + TablePath tablePath = TablePath.of("test_db", "test_table"); + TableInfo tableInfo = createLogTableInfo(tablePath, 10); + + // 1. reduce bucket number should throw exception + TablePropertyChanges.Builder builder = TablePropertyChanges.builder(); + builder.setBucketNum(5); + TablePropertyChanges reduceChanges = builder.build(); + assertThatThrownBy(() -> validateAlterTableProperties(tableInfo, reduceChanges)) + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining("Bucket number can only be increased") + .hasMessageContaining("current bucket number is 10") + .hasMessageContaining("new bucket number is 5"); + + // 2. increase bucket number should not throw exception + builder = TablePropertyChanges.builder(); + builder.setBucketNum(20); + TablePropertyChanges increaseChanges = builder.build(); + assertThatNoException() + .isThrownBy(() -> validateAlterTableProperties(tableInfo, increaseChanges)); + + // 3. 
alter bucket number and properties at the same time should throw exception + builder = TablePropertyChanges.builder(); + builder.setBucketNum(20); + builder.setTableProperty(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(), "3"); + TablePropertyChanges mixedChanges = builder.build(); + assertThatThrownBy(() -> validateAlterTableProperties(tableInfo, mixedChanges)) + .isInstanceOf(InvalidAlterTableException.class) + .hasMessage("Cannot alter table properties and bucket number at the same time."); + } + + private TableInfo createLogTableInfo(TablePath tablePath, int numBuckets) { + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA1_SCHEMA) + .distributedBy(numBuckets) + .property(ConfigOptions.TABLE_REPLICATION_FACTOR, 1) + .build(); + return TableInfo.of( + tablePath, + 1L, + 1, + tableDescriptor, + DEFAULT_REMOTE_DATA_DIR, + System.currentTimeMillis(), + System.currentTimeMillis()); + } +}