Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import java.util.stream.Collectors;

import static org.apache.fluss.cluster.rebalance.RebalanceStatus.FINAL_STATUSES;
import static org.apache.fluss.config.FlussConfigUtils.BUCKET_NUM;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toResolvedPartitionSpec;
import static org.apache.fluss.utils.Preconditions.checkArgument;
import static org.apache.fluss.utils.Preconditions.checkState;
Expand Down Expand Up @@ -407,7 +408,8 @@ public static AlterTableRequest makeAlterTableRequest(
} else if (tableChange instanceof TableChange.ModifyColumn) {
modifyColumns.add(toPbModifyColumn((TableChange.ModifyColumn) tableChange));
} else if (tableChange instanceof TableChange.SetOption
|| tableChange instanceof TableChange.ResetOption) {
|| tableChange instanceof TableChange.ResetOption
|| tableChange instanceof TableChange.BucketNumOption) {
alterConfigs.add(toPbAlterConfigs(tableChange));
} else {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -624,7 +626,12 @@ public static PbPartitionSpec makePbPartitionSpec(PartitionSpec partitionSpec) {

public static PbAlterConfig toPbAlterConfigs(TableChange tableChange) {
PbAlterConfig info = new PbAlterConfig();
if (tableChange instanceof TableChange.SetOption) {
if (tableChange instanceof TableChange.BucketNumOption) {
TableChange.BucketNumOption bucketNumOption = (TableChange.BucketNumOption) tableChange;
info.setConfigKey(BUCKET_NUM);
info.setConfigValue(String.valueOf(bucketNumOption.getBucketNum()));
info.setOpType(AlterConfigOpType.SET.value());
} else if (tableChange instanceof TableChange.SetOption) {
TableChange.SetOption setOption = (TableChange.SetOption) tableChange;
info.setConfigKey(setOption.getKey());
info.setConfigValue(setOption.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,13 @@ public static void waitAllReplicasReady(long tableId, int expectBucketCount) {
}
}

public static void waitAllReplicasReady(long tableId, long partitionId, int expectBucketCount) {
for (int i = 0; i < expectBucketCount; i++) {
FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(
new TableBucket(tableId, partitionId, i));
}
}

protected static void verifyRows(
RowType rowType,
Map<Long, List<InternalRow>> actualRows,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.MemorySize;
import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.fs.TestFileSystem;
import org.apache.fluss.metadata.DataLakeFormat;
Expand Down Expand Up @@ -70,9 +72,11 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectRows;
Expand Down Expand Up @@ -1164,6 +1168,128 @@ void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvForm
}
}

@Test
void testAppendWithAlterTableBucket() throws Exception {
TableDescriptor data1TableDescriptor =
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
createTable(DATA1_TABLE_PATH, data1TableDescriptor, false);
TableInfo tableInfo = admin.getTableInfo(DATA1_TABLE_PATH).get();

int lastCount = verifyAppendForAlterTableBucket(1, 0);

// alter table bucket from 1 to 2
List<TableChange> tableChanges = Collections.singletonList(TableChange.bucketNum(2));
admin.alterTable(DATA1_TABLE_PATH, tableChanges, false);

// wait until new bucket replicas are ready
waitAllReplicasReady(tableInfo.getTableId(), 2);

verifyAppendForAlterTableBucket(2, lastCount);
}

int verifyAppendForAlterTableBucket(int bucketNum, int lastCount) throws Exception {
Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
// use round-robin bucket assigner, so that we can append data to all buckets
clientConf.set(
ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER,
ConfigOptions.NoKeyAssigner.ROUND_ROBIN);
Connection conn = ConnectionFactory.createConnection(clientConf);
int rowCount = 10;
int expectedRowCount = lastCount + rowCount;
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
AppendWriter appendWriter = table.newAppend().createWriter();

for (int i = 0; i < rowCount; i++) {
GenericRow row = row(i, "a");
appendWriter.append(row).get();
}
appendWriter.flush();

try (LogScanner logScanner = createLogScanner(table)) {
subscribeFromBeginning(logScanner, table);

int count = 0;
Set<TableBucket> allBuckets = new HashSet<>();
while (count < expectedRowCount) {
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
allBuckets.addAll(scanRecords.buckets());
count += scanRecords.count();
}
assertThat(allBuckets.size()).isEqualTo(bucketNum);
assertThat(count).isEqualTo(expectedRowCount);
}
}
conn.close();
return expectedRowCount;
}

@Test
void testInvalidAlterTableBucket() throws Exception {
Schema logTableSchema =
Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.STRING())
.build();
TableDescriptor td1 =
TableDescriptor.builder().schema(logTableSchema).distributedBy(1, "a").build();
TablePath t1 = TablePath.of("test_db_1", "test_invalid_alter_table_bucket_1");
createTable(t1, td1, false);

// alter table bucket from 1 to 2
List<TableChange> tableChanges = Collections.singletonList(TableChange.bucketNum(2));
// alter table bucket is not supported for log table with bucket keys now
assertThatThrownBy(() -> admin.alterTable(t1, tableChanges, false).get())
.cause()
.isInstanceOf(InvalidTableException.class)
.hasMessage(
"Alter table bucket is not supported for Log table with bucket keys or PrimaryKey Table now.");

Schema pkTableSchema =
Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.STRING())
.primaryKey("a")
.build();
TableDescriptor td2 =
TableDescriptor.builder().schema(pkTableSchema).distributedBy(1).build();
TablePath t2 = TablePath.of("test_db_2", "test_invalid_alter_table_bucket_2");
createTable(t2, td2, false);

// alter table bucket from 1 to 2
List<TableChange> tableChanges2 = Collections.singletonList(TableChange.bucketNum(2));
// alter table bucket is not supported for pk table now
assertThatThrownBy(() -> admin.alterTable(t2, tableChanges2, false).get())
.cause()
.isInstanceOf(InvalidTableException.class)
.hasMessage(
"Alter table bucket is not supported for Log table with bucket keys or PrimaryKey Table now.");

// test bucket number cannot be decreased
Schema logTableNoBucketKey =
Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.STRING())
.build();
TableDescriptor td3 =
TableDescriptor.builder().schema(logTableNoBucketKey).distributedBy(3).build();
TablePath t3 = TablePath.of("test_db_3", "test_invalid_alter_table_bucket_3");
createTable(t3, td3, false);

// alter table bucket from 3 to 2 (shrink)
List<TableChange> tableChanges3 = Collections.singletonList(TableChange.bucketNum(2));
assertThatThrownBy(() -> admin.alterTable(t3, tableChanges3, false).get())
.cause()
.isInstanceOf(InvalidAlterTableException.class)
.hasMessageContaining("Bucket number can only be increased");

// alter table bucket from 3 to 3 (same)
List<TableChange> tableChanges4 = Collections.singletonList(TableChange.bucketNum(3));
assertThatThrownBy(() -> admin.alterTable(t3, tableChanges4, false).get())
.cause()
.isInstanceOf(InvalidAlterTableException.class)
.hasMessageContaining("Bucket number can only be increased");
}

@ParameterizedTest
@ValueSource(strings = {"INDEXED", "ARROW"})
void testAppendAndProject(String format) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,25 @@

package org.apache.fluss.client.table;

import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.ClientToServerITCaseBase;
import org.apache.fluss.client.lookup.Lookuper;
import org.apache.fluss.client.table.scanner.log.LogScanner;
import org.apache.fluss.client.table.scanner.log.ScanRecords;
import org.apache.fluss.client.table.writer.AppendWriter;
import org.apache.fluss.client.table.writer.UpsertWriter;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.TooManyPartitionsException;
import org.apache.fluss.metadata.PartitionInfo;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
Expand All @@ -37,10 +45,13 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
import static org.apache.fluss.testutils.DataTestUtils.row;
Expand Down Expand Up @@ -146,6 +157,95 @@ void testPartitionedLogTable() throws Exception {
verifyPartitionLogs(table, schema.getRowType(), expectPartitionAppendRows);
}

@Test
void testAppendForAlterPartitionTableBucket() throws Exception {
TablePath tablePath = TablePath.of("test_db_1", "test_alter_partition_table_bucket");
createPartitionedTable(tablePath, false);
TableInfo tableInfo = admin.getTableInfo(tablePath).get();

List<PartitionInfo> partitionInfos = admin.listPartitionInfos(tablePath).get();
assertThat(partitionInfos.isEmpty()).isTrue();

// add 1 partition
admin.createPartition(tablePath, newPartitionSpec("c", "c0"), false).get();
partitionInfos = admin.listPartitionInfos(tablePath).get();
assertThat(partitionInfos.size()).isEqualTo(1);

// verify append for 1 partition with 1 bucket
int lastCount = verifyAppendForAlterPartitionTableBucket(tablePath, partitionInfos, 1, 0);

// alter table bucket from 1 to 2
List<TableChange> tableChanges = Collections.singletonList(TableChange.bucketNum(2));
admin.alterTable(tablePath, tableChanges, false);

// wait until new bucket replicas are ready
for (PartitionInfo partitionInfo : partitionInfos) {
waitAllReplicasReady(tableInfo.getTableId(), partitionInfo.getPartitionId(), 2);
}

// verify append for 1 partition with 2 bucket
lastCount =
verifyAppendForAlterPartitionTableBucket(tablePath, partitionInfos, 2, lastCount);

// add another partition, which may have 2 buckets
admin.createPartition(tablePath, newPartitionSpec("c", "c1"), false).get();
partitionInfos = admin.listPartitionInfos(tablePath).get();
assertThat(partitionInfos.size()).isEqualTo(2);

// wait until new bucket replicas are ready
for (PartitionInfo partitionInfo : partitionInfos) {
waitAllReplicasReady(tableInfo.getTableId(), partitionInfo.getPartitionId(), 2);
}

// newly created partition should also have 2 buckets
verifyAppendForAlterPartitionTableBucket(tablePath, partitionInfos, 2, lastCount);
}

private int verifyAppendForAlterPartitionTableBucket(
TablePath tablePath, List<PartitionInfo> partitionInfos, int bucketNum, int lastCount)
throws Exception {
Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
// use round-robin bucket assigner, so that we can append data to all buckets
clientConf.set(
ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER,
ConfigOptions.NoKeyAssigner.ROUND_ROBIN);
Connection conn = ConnectionFactory.createConnection(clientConf);
Table table = conn.getTable(tablePath);
AppendWriter appendWriter = table.newAppend().createWriter();

int recordsPerPartition = 5;
for (PartitionInfo partitionInfo : partitionInfos) {
String partitionName = partitionInfo.getPartitionName();
for (int j = 0; j < recordsPerPartition; j++) {
InternalRow row = row(j, "a" + j, partitionName);
appendWriter.append(row);
}
}
appendWriter.flush();

int expectedCount = partitionInfos.size() * recordsPerPartition + lastCount;
try (LogScanner logScanner = table.newScan().createLogScanner()) {
for (PartitionInfo partitionInfo : partitionInfos) {
for (int i = 0; i < bucketNum; i++) {
logScanner.subscribeFromBeginning(partitionInfo.getPartitionId(), i);
}
}

int count = 0;
Set<TableBucket> allBuckets = new HashSet<>();
while (count < expectedCount) {
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
allBuckets.addAll(scanRecords.buckets());

count += scanRecords.count();
}
assertThat(allBuckets.size()).isEqualTo(partitionInfos.size() * bucketNum);
assertThat(count).isEqualTo(expectedCount);
}
conn.close();
return expectedCount;
}

@Test
void testWriteToNonExistsPartitionWhenDisabledDynamicPartition() throws Exception {
clientConf.set(ConfigOptions.CLIENT_WRITER_DYNAMIC_CREATE_PARTITION_ENABLED, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
@Internal
public class FlussConfigUtils {

public static final String BUCKET_NUM = "bucket.num";

public static final Map<String, ConfigOption<?>> TABLE_OPTIONS;
public static final Map<String, ConfigOption<?>> CLIENT_OPTIONS;
public static final String TABLE_PREFIX = "table.";
Expand Down
Loading