From a01bd57c85e7c7ef3d1609f977eb9790579ce39c Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Wed, 20 May 2026 11:29:20 +0800 Subject: [PATCH 1/2] [flink] Preserve log-only source restore mode --- .../enumerator/FlinkSourceEnumerator.java | 20 +++- .../FlussSourceEnumeratorStateSerializer.java | 5 +- .../enumerator/FlinkSourceEnumeratorTest.java | 92 +++++++++++++++++++ .../SourceEnumeratorStateSerializerTest.java | 22 +++++ 4 files changed, 137 insertions(+), 2 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index e04989cbb2..c8dda7637b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -134,6 +134,19 @@ public class FlinkSourceEnumerator /** Buckets that have been assigned to readers. */ private final Set assignedTableBuckets; + /** + * Remaining lake snapshot and hybrid lake/Fluss splits to assign. + * + *

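<p>The field has three states: + * + * <ul> + * <li>{@code null}: lake split initialization has not run yet. + * <li>empty list: lake split initialization has run, or this enumerator was started in + * Fluss-only (non-lake) mode and must not initialize lake splits after restore. + * <li>non-empty list: lake split initialization has run and these splits still need to be + * assigned. + * </ul>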
+ */ @Nullable private List pendingHybridLakeFlussSplits; private final long scanPartitionDiscoveryIntervalMs; @@ -1207,11 +1220,16 @@ public void addReader(int subtaskId) { @Override public SourceEnumeratorState snapshotState(long checkpointId) { + List remainingHybridLakeFlussSplits = + // Preserve Fluss-only (non-lake) startup across restore. Otherwise a restored + // enumerator with a non-null lakeSource would treat null as "not initialized yet" + // and generate lake snapshot splits. + lakeSource == null ? Collections.emptyList() : pendingHybridLakeFlussSplits; final SourceEnumeratorState enumeratorState = new SourceEnumeratorState( assignedTableBuckets, assignedPartitions, - pendingHybridLakeFlussSplits, + remainingHybridLakeFlussSplits, leaseContext.getKvSnapshotLeaseId()); LOG.debug("Source Checkpoint is {}", enumeratorState); return enumeratorState; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java index 121041df13..996313e607 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java @@ -270,12 +270,15 @@ private List deserializeRemainingHybridLakeFlussSplits( if (in.readBoolean()) { int numSplits = in.readInt(); List splits = new ArrayList<>(numSplits); + int version = in.readInt(); + if (numSplits == 0) { + return splits; + } SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer( checkNotNull( lakeSource, "lake source must not be null when there are hybrid lake splits.")); - int version = in.readInt(); for (int i = 0; i < numSplits; i++) { int splitSizeInBytes = in.readInt(); byte[] splitBytes = new byte[splitSizeInBytes]; 
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index 6d239f965b..7ccf4ac887 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -32,6 +32,7 @@ import org.apache.fluss.flink.source.split.LogSplit; import org.apache.fluss.flink.source.split.SnapshotSplit; import org.apache.fluss.flink.source.split.SourceSplitBase; +import org.apache.fluss.flink.source.state.SourceEnumeratorState; import org.apache.fluss.flink.utils.FlinkTestBase; import org.apache.fluss.lake.source.LakeSource; import org.apache.fluss.lake.source.LakeSplit; @@ -210,6 +211,97 @@ void testInvalidSplitAssignmentBatchSize() throws Exception { } } + @Test + void testRestoreFlussOnlySourceWithLakeSourceDoesNotGenerateLakeSplits(@TempDir Path tempDir) + throws Throwable { + long tableId = + createTable(DEFAULT_TABLE_PATH, DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR); + ZooKeeperClient zooKeeperClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); + Map partitionNameByIds = + waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH); + Long partitionId = partitionNameByIds.keySet().stream().sorted().findFirst().get(); + String partitionName = partitionNameByIds.get(partitionId); + + LakeTableSnapshot lakeTableSnapshot = + new LakeTableSnapshot( + 0, + ImmutableMap.of( + new TableBucket(tableId, partitionId, 0), 50L, + new TableBucket(tableId, partitionId, 1), 50L, + new TableBucket(tableId, partitionId, 2), 50L)); + LakeTableHelper lakeTableHelper = new LakeTableHelper(zooKeeperClient, tempDir.toString()); + lakeTableHelper.registerLakeTableSnapshotV1(tableId, lakeTableSnapshot); + + ResolvedPartitionSpec 
partitionSpec = + ResolvedPartitionSpec.fromPartitionName( + Collections.singletonList("name"), partitionName); + LakeSource lakeSource = + new TestingLakeSource( + DEFAULT_BUCKET_NUM, + Collections.singletonList( + new PartitionInfo( + partitionId, partitionSpec, DEFAULT_REMOTE_DATA_DIR))); + + SourceEnumeratorState checkpointState; + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(1)) { + FlinkSourceEnumerator enumerator = + new FlinkSourceEnumerator( + DEFAULT_TABLE_PATH, + flussConf, + true, + false, + context, + OffsetsInitializer.timestamp(1000L), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + streaming, + null, + null, + LeaseContext.DEFAULT, + false); + + checkpointState = enumerator.snapshotState(1L); + assertThat(checkpointState.getRemainingHybridLakeFlussSplits()).isNotNull().isEmpty(); + } + + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(DEFAULT_BUCKET_NUM); + MockWorkExecutor workExecutor = new MockWorkExecutor(context); + FlinkSourceEnumerator restoredEnumerator = + new FlinkSourceEnumerator( + DEFAULT_TABLE_PATH, + flussConf, + false, + true, + context, + checkpointState.getAssignedBuckets(), + checkpointState.getAssignedPartitions(), + checkpointState.getRemainingHybridLakeFlussSplits(), + OffsetsInitializer.full(), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + streaming, + null, + lakeSource, + workExecutor, + LeaseContext.DEFAULT, + true)) { + restoredEnumerator.start(); + runPeriodicPartitionDiscovery(workExecutor); + + for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) { + registerReader(context, restoredEnumerator, i); + } + + List assignedSplits = + getReadersAssignments(context).values().stream() + .flatMap(List::stream) + .collect(Collectors.toList()); + assertThat(assignedSplits).isNotEmpty(); + assertThat(assignedSplits).allMatch(split -> split instanceof LogSplit); + assertThat(assignedSplits).noneMatch(split -> split instanceof LakeSnapshotSplit); + } + } + @Test void 
testPkTableWithSnapshotSplits() throws Throwable { long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java index 5273d12d16..fa90eda96f 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java @@ -169,4 +169,26 @@ void testInconsistentLakeSourceSerde() throws Exception { serializer.deserialize(serializer.getVersion(), serialized); assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState); } + + @Test + void testEmptyPendingSplitsCheckpointSerdeWithoutLakeSource() throws Exception { + FlussSourceEnumeratorStateSerializer serializer = + new FlussSourceEnumeratorStateSerializer(null); + + SourceEnumeratorState sourceEnumeratorState = + new SourceEnumeratorState( + Collections.emptySet(), + Collections.emptyMap(), + Collections.emptyList(), + LeaseContext.DEFAULT.getKvSnapshotLeaseId()); + + byte[] serialized = serializer.serialize(sourceEnumeratorState); + SourceEnumeratorState deserializedSourceEnumeratorState = + serializer.deserialize(serializer.getVersion(), serialized); + + assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState); + assertThat(deserializedSourceEnumeratorState.getRemainingHybridLakeFlussSplits()) + .isNotNull() + .isEmpty(); + } } From e58b0cbf8870fa0ab86f417c3f0751c044428b0b Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Wed, 20 May 2026 17:38:14 +0800 Subject: [PATCH 2/2] address comment Co-Authored-By: Claude Opus 4.6 --- .../enumerator/FlinkSourceEnumerator.java | 44 +++++++++---------- 
.../enumerator/FlinkSourceEnumeratorTest.java | 1 - 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index c8dda7637b..ab7b975ffb 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -140,7 +140,8 @@ public class FlinkSourceEnumerator *

<p>The field has three states: * * <ul> - * <li>{@code null}: lake split initialization has not run yet. + * <li>{@code null}: lake split initialization has not run yet, or the source has no lake + * source (non-lake table), so initialization will never run. * <li>empty list: lake split initialization has run, or this enumerator was started in * Fluss-only (non-lake) mode and must not initialize lake splits after restore. * <li>non-empty list: lake split initialization has run and these splits still need to be @@ -524,18 +525,16 @@ private void startInBatchMode() { private void startInStreamModeForNonPartitionedTable() { if (lakeSource != null) { - context.callAsync( - () -> { - // firstly, try to generate hybrid lake splits, - List splits = generateHybridLakeFlussSplits(); - // splits is null, - // we'll fall back to normal fluss splits generation logic - if (splits == null) { - splits = this.initNonPartitionedSplits(); - } - return splits; - }, - this::handleSplitsAdd); + // Generate lake splits synchronously so that they are available before the + // first checkpoint. This is consistent with the partitioned-table path in + // start() and ensures generateHybridLakeFlussSplits() can safely use + // checkpointTriggeredBefore to distinguish fresh starts from restores. + List splits = generateHybridLakeFlussSplits(); + if (splits == null) { + // no lake snapshot, fall back to normal Fluss splits + splits = this.initNonPartitionedSplits(); + } + handleSplitsAdd(splits, null); } else { // init bucket splits and assign context.callAsync(this::initNonPartitionedSplits, this::handleSplitsAdd); @@ -882,13 +881,19 @@ private List getLogSplit( /** Return the hybrid lake and fluss splits. Return null if no lake snapshot. */ @Nullable private List generateHybridLakeFlussSplits() { - // still have pending lake fluss splits, - // should be restored from checkpoint, shouldn't - // list splits again + // Restored from checkpoint with pending lake splits — return them directly + // without re-generating. if (pendingHybridLakeFlussSplits != null) { LOG.info("Still have pending lake fluss splits, shouldn't list splits again."); return pendingHybridLakeFlussSplits; } + // Restored from checkpoint but the pending lake splits are null (e.g. the source was + // originally started in Fluss-only mode without a lake source). Do not generate lake + // splits for this restore; mark as initialized and return an empty list.
+ if (checkpointTriggeredBefore) { + pendingHybridLakeFlussSplits = Collections.emptyList(); + return pendingHybridLakeFlussSplits; + } try { LakeSplitGenerator lakeSplitGenerator = new LakeSplitGenerator( @@ -1220,16 +1225,11 @@ public void addReader(int subtaskId) { @Override public SourceEnumeratorState snapshotState(long checkpointId) { - List remainingHybridLakeFlussSplits = - // Preserve Fluss-only (non-lake) startup across restore. Otherwise a restored - // enumerator with a non-null lakeSource would treat null as "not initialized yet" - // and generate lake snapshot splits. - lakeSource == null ? Collections.emptyList() : pendingHybridLakeFlussSplits; final SourceEnumeratorState enumeratorState = new SourceEnumeratorState( assignedTableBuckets, assignedPartitions, - remainingHybridLakeFlussSplits, + pendingHybridLakeFlussSplits, leaseContext.getKvSnapshotLeaseId()); LOG.debug("Source Checkpoint is {}", enumeratorState); return enumeratorState; diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index 7ccf4ac887..5a82e8d497 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -261,7 +261,6 @@ void testRestoreFlussOnlySourceWithLakeSourceDoesNotGenerateLakeSplits(@TempDir false); checkpointState = enumerator.snapshotState(1L); - assertThat(checkpointState.getRemainingHybridLakeFlussSplits()).isNotNull().isEmpty(); } try (MockSplitEnumeratorContext context =