[flink] Preserve log-only source restore mode#3355
Conversation
cdabb47 to
71b01aa
Compare
71b01aa to
a01bd57
Compare
|
@loserwang1024 Could you please help review |
| // Preserve Fluss-only (non-lake) startup across restore. Otherwise a restored | ||
| // enumerator with a non-null lakeSource would treat null as "not initialized yet" | ||
| // and generate lake snapshot splits. | ||
| lakeSource == null ? Collections.emptyList() : pendingHybridLakeFlussSplits; |
There was a problem hiding this comment.
I understand what you mean, but the code seems hard to understand without sufficient context. I've also thought about this:
-
If the enumerator is created by FlinkSource#createEnumerator, it indicates a stateless restart. Therefore, whether to generate lake splits depends on whether it's a LakeSource.
-
If the enumerator is created by FlinkSource#restoreEnumerator, there's no need to generate lake splits again. This is because before the first checkpoint is taken, FlinkSourceEnumerator#start → FlinkSourceEnumerator#generateHybridLakeFlussSplits has already been executed. Thus, upon restoration, the lake splits do not need to be regenerated.
Therefore, even if the job was previously started from a specified timestamp, according to this logic, as long as a checkpoint has been taken, upon stateful restart it will not read the lake splits again.
There was a problem hiding this comment.
I agree the original approach in snapshotState() is hard to understand without sufficient context.
I've reworked the fix to use the checkpointTriggeredBefore flag in generateHybridLakeFlussSplits() instead. While the restore-awareness logic still requires some thought, this approach keeps all the complexity contained within a single method rather than spreading it across snapshotState().
Additionally, I changed startInStreamModeForNonPartitionedTable to call generateHybridLakeFlussSplits() synchronously, consistent with the partitioned-table path in start(). This ensures lake split initialization always completes before any checkpoint can be triggered, which is a prerequisite for the checkpointTriggeredBefore guard to work correctly.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
@loserwang1024 Comments has been addressed. |
Purpose
Linked issue: close #3354
Preserve the original Fluss-only (non-lake) startup semantics after a Flink source restore. When the source starts from a specified Fluss log position, it should not initialize lake snapshot splits after restore just because
lakeSourceis available again.Brief change log
remainingHybridLakeFlussSplitslist when the enumerator checkpoints withlakeSource == null.lakeSource, while still requiringlakeSourcefor non-empty hybrid split state.pendingHybridLakeFlussSplitssonull, empty, and non-empty states are explicit.lakeSourceand a registered lake snapshot, and verifies noLakeSnapshotSplitis generated.lakeSource == null.Tests
./mvnw -pl fluss-flink/fluss-flink-common -Dtest=SourceEnumeratorStateSerializerTest,FlinkSourceEnumeratorTest#testRestoreFlussOnlySourceWithLakeSourceDoesNotGenerateLakeSplits test./mvnw -pl fluss-flink/fluss-flink-common spotless:applygit diff --checkAPI and Format
No public API, storage format, or checkpoint serializer version change. The fix only changes the value stored in the existing enumerator state field for Fluss-only source checkpoints and permits that empty-state sentinel to deserialize without a lake source.
Documentation
No new feature or user-facing configuration change.
Generative AI disclosure