Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,20 @@ public class FlinkSourceEnumerator
/** Buckets that have been assigned to readers. */
private final Set<TableBucket> assignedTableBuckets;

/**
* Remaining lake snapshot and hybrid lake/Fluss splits to assign.
*
* <p>The field has three states:
*
* <ul>
* <li>{@code null}: lake split initialization has not run yet, or the source has no lake
* (non-lake table) so initialization will never run.
* <li>empty list: lake split initialization has run, or this enumerator was started in
* Fluss-only (non-lake) mode and must not initialize lake splits after restore.
* <li>non-empty list: lake split initialization has run and these splits still need to be
* assigned.
* </ul>
*/
@Nullable private List<SourceSplitBase> pendingHybridLakeFlussSplits;

private final long scanPartitionDiscoveryIntervalMs;
Expand Down Expand Up @@ -511,18 +525,16 @@ private void startInBatchMode() {

private void startInStreamModeForNonPartitionedTable() {
if (lakeSource != null) {
context.callAsync(
() -> {
// firstly, try to generate hybrid lake splits,
List<SourceSplitBase> splits = generateHybridLakeFlussSplits();
// splits is null,
// we'll fall back to normal fluss splits generation logic
if (splits == null) {
splits = this.initNonPartitionedSplits();
}
return splits;
},
this::handleSplitsAdd);
// Generate lake splits synchronously so that they are available before the
// first checkpoint. This is consistent with the partitioned-table path in
// start() and ensures generateHybridLakeFlussSplits() can safely use
// checkpointTriggeredBefore to distinguish fresh starts from restores.
List<SourceSplitBase> splits = generateHybridLakeFlussSplits();
if (splits == null) {
// no lake snapshot, fall back to normal Fluss splits
splits = this.initNonPartitionedSplits();
}
handleSplitsAdd(splits, null);
} else {
// init bucket splits and assign
context.callAsync(this::initNonPartitionedSplits, this::handleSplitsAdd);
Expand Down Expand Up @@ -869,13 +881,19 @@ private List<SourceSplitBase> getLogSplit(
/** Return the hybrid lake and fluss splits. Return null if no lake snapshot. */
@Nullable
private List<SourceSplitBase> generateHybridLakeFlussSplits() {
// still have pending lake fluss splits,
// should be restored from checkpoint, shouldn't
// list splits again
// Restored from checkpoint with pending lake splits — return them directly
// without re-generating.
if (pendingHybridLakeFlussSplits != null) {
LOG.info("Still have pending lake fluss splits, shouldn't list splits again.");
return pendingHybridLakeFlussSplits;
}
// Restored from checkpoint but pending lake split is null(e.g. the source was
// originally started in Fluss-only mode without lake). Do not generate lake
// splits for this restore; mark as initialized and return empty list.
if (checkpointTriggeredBefore) {
pendingHybridLakeFlussSplits = Collections.emptyList();
return pendingHybridLakeFlussSplits;
}
try {
LakeSplitGenerator lakeSplitGenerator =
new LakeSplitGenerator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,15 @@ private List<SourceSplitBase> deserializeRemainingHybridLakeFlussSplits(
if (in.readBoolean()) {
int numSplits = in.readInt();
List<SourceSplitBase> splits = new ArrayList<>(numSplits);
int version = in.readInt();
if (numSplits == 0) {
return splits;
}
SourceSplitSerializer sourceSplitSerializer =
new SourceSplitSerializer(
checkNotNull(
lakeSource,
"lake source must not be null when there are hybrid lake splits."));
int version = in.readInt();
for (int i = 0; i < numSplits; i++) {
int splitSizeInBytes = in.readInt();
byte[] splitBytes = new byte[splitSizeInBytes];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.fluss.flink.source.split.LogSplit;
import org.apache.fluss.flink.source.split.SnapshotSplit;
import org.apache.fluss.flink.source.split.SourceSplitBase;
import org.apache.fluss.flink.source.state.SourceEnumeratorState;
import org.apache.fluss.flink.utils.FlinkTestBase;
import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.source.LakeSplit;
Expand Down Expand Up @@ -210,6 +211,96 @@ void testInvalidSplitAssignmentBatchSize() throws Exception {
}
}

@Test
void testRestoreFlussOnlySourceWithLakeSourceDoesNotGenerateLakeSplits(@TempDir Path tempDir)
throws Throwable {
long tableId =
createTable(DEFAULT_TABLE_PATH, DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR);
ZooKeeperClient zooKeeperClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
Map<Long, String> partitionNameByIds =
waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH);
Long partitionId = partitionNameByIds.keySet().stream().sorted().findFirst().get();
String partitionName = partitionNameByIds.get(partitionId);

LakeTableSnapshot lakeTableSnapshot =
new LakeTableSnapshot(
0,
ImmutableMap.of(
new TableBucket(tableId, partitionId, 0), 50L,
new TableBucket(tableId, partitionId, 1), 50L,
new TableBucket(tableId, partitionId, 2), 50L));
LakeTableHelper lakeTableHelper = new LakeTableHelper(zooKeeperClient, tempDir.toString());
lakeTableHelper.registerLakeTableSnapshotV1(tableId, lakeTableSnapshot);

ResolvedPartitionSpec partitionSpec =
ResolvedPartitionSpec.fromPartitionName(
Collections.singletonList("name"), partitionName);
LakeSource<LakeSplit> lakeSource =
new TestingLakeSource(
DEFAULT_BUCKET_NUM,
Collections.singletonList(
new PartitionInfo(
partitionId, partitionSpec, DEFAULT_REMOTE_DATA_DIR)));

SourceEnumeratorState checkpointState;
try (MockSplitEnumeratorContext<SourceSplitBase> context =
new MockSplitEnumeratorContext<>(1)) {
FlinkSourceEnumerator enumerator =
new FlinkSourceEnumerator(
DEFAULT_TABLE_PATH,
flussConf,
true,
false,
context,
OffsetsInitializer.timestamp(1000L),
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
streaming,
null,
null,
LeaseContext.DEFAULT,
false);

checkpointState = enumerator.snapshotState(1L);
}

try (MockSplitEnumeratorContext<SourceSplitBase> context =
new MockSplitEnumeratorContext<>(DEFAULT_BUCKET_NUM);
MockWorkExecutor workExecutor = new MockWorkExecutor(context);
FlinkSourceEnumerator restoredEnumerator =
new FlinkSourceEnumerator(
DEFAULT_TABLE_PATH,
flussConf,
false,
true,
context,
checkpointState.getAssignedBuckets(),
checkpointState.getAssignedPartitions(),
checkpointState.getRemainingHybridLakeFlussSplits(),
OffsetsInitializer.full(),
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
streaming,
null,
lakeSource,
workExecutor,
LeaseContext.DEFAULT,
true)) {
restoredEnumerator.start();
runPeriodicPartitionDiscovery(workExecutor);

for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
registerReader(context, restoredEnumerator, i);
}

List<SourceSplitBase> assignedSplits =
getReadersAssignments(context).values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
assertThat(assignedSplits).isNotEmpty();
assertThat(assignedSplits).allMatch(split -> split instanceof LogSplit);
assertThat(assignedSplits).noneMatch(split -> split instanceof LakeSnapshotSplit);
}
}

@Test
void testPkTableWithSnapshotSplits() throws Throwable {
long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,26 @@ void testInconsistentLakeSourceSerde() throws Exception {
serializer.deserialize(serializer.getVersion(), serialized);
assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
}

@Test
void testEmptyPendingSplitsCheckpointSerdeWithoutLakeSource() throws Exception {
FlussSourceEnumeratorStateSerializer serializer =
new FlussSourceEnumeratorStateSerializer(null);

SourceEnumeratorState sourceEnumeratorState =
new SourceEnumeratorState(
Collections.emptySet(),
Collections.emptyMap(),
Collections.emptyList(),
LeaseContext.DEFAULT.getKvSnapshotLeaseId());

byte[] serialized = serializer.serialize(sourceEnumeratorState);
SourceEnumeratorState deserializedSourceEnumeratorState =
serializer.deserialize(serializer.getVersion(), serialized);

assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
assertThat(deserializedSourceEnumeratorState.getRemainingHybridLakeFlussSplits())
.isNotNull()
.isEmpty();
}
}