Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class DataSourceConfigKeys {
public static final String SNAPSHOT_SPLIT_KEY = "snapshot_split_key";
public static final String SNAPSHOT_PARALLELISM = "snapshot_parallelism";
public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
public static final String SKIP_SNAPSHOT_BACKFILL = "skip_snapshot_backfill";
// MySQL CDC client identity. Single value "5400" or range "5400-5408".
public static final String SERVER_ID = "server_id";
public static final String SSL_MODE = "ssl_mode";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class DataSourceConfigValidator {
DataSourceConfigKeys.EXCLUDE_TABLES,
DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE,
DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL,
DataSourceConfigKeys.SSL_MODE,
DataSourceConfigKeys.SSL_ROOTCERT,
DataSourceConfigKeys.SLOT_NAME,
Expand Down Expand Up @@ -208,12 +209,21 @@ private static boolean isValidValue(String key, String value, String dataSourceT
|| key.equals(DataSourceConfigKeys.SNAPSHOT_PARALLELISM)) {
return isPositiveInt(value);
}
if (key.equals(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL)) {
return isValidBoolean(value);
}
if (key.equals(DataSourceConfigKeys.SERVER_ID)) {
return parseServerIdRange(value) != null;
}
return true;
}

// Strict boolean: only "true"/"false" (case-insensitive); Boolean.parseBoolean would
// silently coerce typos like "yes" to false.
public static boolean isValidBoolean(String value) {
return "true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value);
}

public static boolean isPositiveInt(String value) {
if (value == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ private void checkRequiredSourceProperties() {
if (!sourceProperties.containsKey(DataSourceConfigKeys.OFFSET)) {
sourceProperties.put(DataSourceConfigKeys.OFFSET, DataSourceConfigKeys.OFFSET_LATEST);
}
// from-to is at-least-once; default-skip in-snapshot backfill.
sourceProperties.putIfAbsent(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL, "true");
}

private List<String> createTableIfNotExists() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
private long loadBytes = 0L;
private long filteredRows = 0L;
private long loadedRows = 0L;
private long timeoutMs;
private long runningBackendId;

public StreamingMultiTblTask(Long jobId,
Expand All @@ -103,7 +102,6 @@ public StreamingMultiTblTask(Long jobId,
this.jobProperties = jobProperties;
this.targetDb = targetDb;
this.cloudCluster = cloudCluster;
this.timeoutMs = Config.streaming_task_timeout_multiplier * jobProperties.getMaxIntervalSecond() * 1000L;
}

@Override
Expand Down Expand Up @@ -327,6 +325,9 @@ public boolean isTimeout() {
// It's still pending, waiting for scheduling.
return false;
}
// Read multiplier live so config changes affect already-running tasks.
long timeoutMs = Config.streaming_task_timeout_multiplier
* jobProperties.getMaxIntervalSecond() * 1000L;
long elapsed = System.currentTimeMillis() - startTimeMs;
if (elapsed > timeoutMs) {
log.info("Task {} timeout detected: elapsed={}ms, timeoutMs={}ms", taskId, elapsed, timeoutMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ private void validate(Map<String, String> properties) throws AnalysisException {
}
validatePositiveIntIfPresent(properties, DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE);
validatePositiveIntIfPresent(properties, DataSourceConfigKeys.SNAPSHOT_PARALLELISM);
validateBooleanIfPresent(properties, DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL);
// TVF entrypoint shares server_id checks with the from-to path's validateSource.
try {
DataSourceConfigValidator.validateServerIdConfig(properties);
Expand Down Expand Up @@ -186,6 +187,17 @@ private static void validatePgIdentifierIfPresent(Map<String, String> properties
}
}

private static void validateBooleanIfPresent(Map<String, String> properties, String key)
throws AnalysisException {
String value = properties.get(key);
if (value == null) {
return;
}
if (!DataSourceConfigValidator.isValidBoolean(value)) {
throw new AnalysisException("Invalid value for key '" + key + "': " + value);
}
}

private void generateFileStatus() {
this.fileStatuses.clear();
this.fileStatuses.add(new TBrokerFileStatus(URI, false, Integer.MAX_VALUE, false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ private void buildStreamRecords(
long elapsedTime = System.currentTimeMillis() - startTime;
boolean timeoutReached = elapsedTime > Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
if (shouldStop(
sourceReader,
isSnapshotSplit,
hasReceivedData,
lastMessageIsHeartbeat,
Expand Down Expand Up @@ -315,6 +316,7 @@ private RecordWithMeta buildRecordResponse(
boolean timeoutReached = elapsedTime > Constants.POLL_SPLIT_RECORDS_TIMEOUTS;

if (shouldStop(
sourceReader,
isSnapshotSplit,
hasReceivedData,
lastMessageIsHeartbeat,
Expand Down Expand Up @@ -484,6 +486,7 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
&& elapsedTime >= maxIntervalMillis;

if (shouldStop(
sourceReader,
isSnapshotSplit,
scannedRows > 0,
lastMessageIsHeartbeat,
Expand Down Expand Up @@ -615,18 +618,21 @@ public static boolean isHeartbeatEvent(SourceRecord record) {
* @return true if should stop, false if should continue
*/
private boolean shouldStop(
SourceReader sourceReader,
boolean isSnapshotSplit,
boolean hasData,
boolean lastMessageIsHeartbeat,
long elapsedTime,
long maxIntervalMillis,
boolean timeoutReached) {

// 1. Snapshot split with data: if no more data in queue, stop immediately (no need to wait
// for timeout)
// snapshot split will be written to the debezium queue all at once.
// multiple snapshot splits are handled in the source reader.
// Snapshot split: wait until every split has received its high-watermark event;
// an empty poll alone is not a finish signal under pollWithoutBuffer where the
// fetcher returns one ChangeEventQueue batch at a time.
if (isSnapshotSplit) {
if (!sourceReader.isSnapshotFinished()) {
return false;
}
LOG.info(
"Snapshot split finished, no more data available. Total elapsed: {} ms",
elapsedTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
OBJECT_MAPPER.readValue(loadResult, RespContent.class);
if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
long cacheByteBeforeFlush =
currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
currentCacheBytes.getAndAdd(-buffer.getBufferSizeBytes());
LOG.info(
"load success, cacheBeforeFlushBytes: {}, currentCacheBytes : {}",
cacheByteBeforeFlush,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,9 @@ private Iterator<SourceRecord> pollRecordsFromSnapshotReaders() throws Exception
return Collections.emptyIterator();
}

// A split is finished only after its high-watermark event has been consumed.
refreshCompletedSplits();

if (completedSplitIds.size() >= snapshotReaderContexts.size()) {
LOG.info("All {} snapshot splits have been completed", snapshotReaderContexts.size());
return Collections.emptyIterator();
Expand Down Expand Up @@ -515,6 +518,11 @@ private void startParallelPolling() {
Fetcher<SourceRecords, SourceSplitBase>,
SnapshotSplitState>
context = snapshotReaderContexts.get(index);
// Skip splits already drained to high-watermark; otherwise their poll futures spin
// returning null and starve siblings.
if (completedSplitIds.contains(context.getSplit().splitId())) {
continue;
}

CompletableFuture<PollResult> future =
CompletableFuture.supplyAsync(
Expand Down Expand Up @@ -569,11 +577,12 @@ private PollResult waitForAnyCompletion() throws Exception {
snapshot.remove(future);
PollResult result = future.get();
if (result != null) {
// Split completion is determined later by splitState.getHighWatermark()
// != null, not by receiving a non-empty batch.
LOG.info(
"Got result from reader {}, {} futures remaining",
result.context.getSplit().splitId(),
snapshot.size());
completedSplitIds.add(result.context.getSplit().splitId());
return result;
}
// If result is null (no data), continue checking other futures
Expand Down Expand Up @@ -839,6 +848,35 @@ public Map<String, String> extractSnapshotStateOffset(Object splitState) {
return offsetRes;
}

@Override
public boolean isSnapshotFinished() {
if (snapshotReaderContexts.isEmpty()) {
return true;
}
for (SnapshotReaderContext<
org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit,
Fetcher<SourceRecords, SourceSplitBase>,
SnapshotSplitState>
context : snapshotReaderContexts) {
if (context.getSplitState().getHighWatermark() == null) {
return false;
}
}
return true;
}

private void refreshCompletedSplits() {
for (SnapshotReaderContext<
org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit,
Fetcher<SourceRecords, SourceSplitBase>,
SnapshotSplitState>
context : snapshotReaderContexts) {
if (context.getSplitState().getHighWatermark() != null) {
completedSplitIds.add(context.getSplit().splitId());
}
}
}

@Override
public Map<String, String> extractBinlogStateOffset(Object splitState) {
Preconditions.checkNotNull(splitState, "splitState is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,9 @@ default String serializeTableSchemas() {
* indicate how far the source TX log can be discarded.
*/
default void commitSourceOffset(String jobId, SourceSplit sourceSplit) {}

/** Whether all snapshot splits have received their high-watermark event. */
default boolean isSnapshotFinished() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,9 @@ private Iterator<SourceRecord> pollRecordsFromSnapshotReaders() throws Exception
return Collections.emptyIterator();
}

// A split is finished only after its high-watermark event has been consumed.
refreshCompletedSplits();

if (completedSplitIds.size() >= snapshotReaderContexts.size()) {
LOG.info("All {} snapshot splits have been completed", snapshotReaderContexts.size());
return Collections.emptyIterator();
Expand Down Expand Up @@ -533,6 +536,11 @@ private void startParallelPolling() {
final int index = i;
SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader, MySqlSnapshotSplitState>
context = snapshotReaderContexts.get(index);
// Skip splits already drained to high-watermark; otherwise their poll futures spin
// returning null and starve siblings.
if (completedSplitIds.contains(context.getSplit().splitId())) {
continue;
}

CompletableFuture<PollResult> future =
CompletableFuture.supplyAsync(
Expand Down Expand Up @@ -587,11 +595,12 @@ private PollResult waitForAnyCompletion() throws Exception {
snapshot.remove(future);
PollResult result = future.get();
if (result != null) {
// Split completion is determined later by splitState.getHighWatermark()
// != null, not by receiving a non-empty batch.
LOG.info(
"Got result from reader {}, {} futures remaining",
result.context.getSplit().splitId(),
snapshot.size());
completedSplitIds.add(result.context.getSplit().splitId());
return result;
}
// If result is null (no data), continue checking other futures
Expand Down Expand Up @@ -992,6 +1001,10 @@ private MySqlSourceConfig generateMySqlConfig(
objectPath, cdcConfig.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY));
}

// FE injects "true" on TVF path; from-to leaves it absent → default false.
configFactory.skipSnapshotBackfill(
Boolean.parseBoolean(cdcConfig.get(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL)));

return configFactory.createConfig(subtaskId);
}

Expand All @@ -1013,6 +1026,29 @@ public Map<String, String> extractSnapshotStateOffset(Object splitState) {
return new HashMap<>(highWatermark.getOffset());
}

@Override
public boolean isSnapshotFinished() {
if (snapshotReaderContexts.isEmpty()) {
return true;
}
for (SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader, MySqlSnapshotSplitState>
context : snapshotReaderContexts) {
if (context.getSplitState().getHighWatermark() == null) {
return false;
}
}
return true;
}

private void refreshCompletedSplits() {
for (SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader, MySqlSnapshotSplitState>
context : snapshotReaderContexts) {
if (context.getSplitState().getHighWatermark() != null) {
completedSplitIds.add(context.getSplit().splitId());
}
}
}

@Override
public Map<String, String> extractBinlogStateOffset(Object splitState) {
Preconditions.checkNotNull(splitState, "splitState is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ private PostgresSourceConfig generatePostgresConfig(
// support scan partition table
configFactory.setIncludePartitionedTables(true);

// FE injects "true" on TVF path; from-to leaves it absent → default false.
configFactory.skipSnapshotBackfill(
Boolean.parseBoolean(cdcConfig.get(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL)));

// subtaskId use pg create slot in snapshot phase, slotname is slot_name_subtaskId
return configFactory.create(subtaskId);
}
Expand Down
Loading
Loading