From 7f7363cf8fdd3386255854554c646f759293ae62 Mon Sep 17 00:00:00 2001 From: wudi Date: Fri, 29 May 2026 11:39:11 +0800 Subject: [PATCH] [improve](streaming-job) avoid potential OOM when reading large snapshot splits (#63833) ## Summary - Default-skip flink-cdc's in-snapshot backfill on the from-to path so large splits no longer accumulate the entire chunk + backfill stream in the fetcher's outputBuffer; from-to is at-least-once and tolerates the duplicates this introduces. TVF (job-driven and standalone) keeps the standard `false` default for exactly-once via per-task offset commit. - Expose `skip_snapshot_backfill` as a user-facing property with strict `true`/`false` validation on both from-to (CREATE JOB) and TVF (SELECT FROM cdc_stream(...)) entry points. - Fix snapshot completion under `pollWithoutBuffer`: a split is now marked complete only after its high-watermark event has been consumed (`splitState.getHighWatermark() != null`), not on the first non-empty fetcher batch. Without this, enabling the new default truncates any split larger than debezium's `max.batch.size` and yields an NPE on offset extraction. - Read `streaming_task_timeout_multiplier` live in `StreamingMultiTblTask.isTimeout()` so `admin set frontend config` affects already-running tasks, matching the `@ConfField(mutable=true)` contract. --- .../doris/job/cdc/DataSourceConfigKeys.java | 1 + .../streaming/DataSourceConfigValidator.java | 10 ++ .../insert/streaming/StreamingInsertJob.java | 2 + .../streaming/StreamingMultiTblTask.java | 5 +- .../CdcStreamTableValuedFunction.java | 12 ++ .../service/PipelineCoordinator.java | 14 +- .../cdcclient/sink/DorisBatchStreamLoad.java | 2 +- .../reader/JdbcIncrementalSourceReader.java | 40 ++++- .../cdcclient/source/reader/SourceReader.java | 5 + .../reader/mysql/MySqlSourceReader.java | 38 ++++- .../reader/postgres/PostgresSourceReader.java | 4 + ...eaming_mysql_job_snapshot_fat_split.groovy | 153 ++++++++++++++++++ ...ing_postgres_job_snapshot_fat_split.groovy | 141 ++++++++++++++++ 13 files changed, 418 insertions(+), 9 deletions(-) create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_fat_split.groovy create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_fat_split.groovy diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java index 3708e8dc6a3eb1..75617e4a907ed9 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java @@ -38,6 +38,7 @@ public class DataSourceConfigKeys { public static final String SNAPSHOT_SPLIT_KEY = "snapshot_split_key"; public static final String SNAPSHOT_PARALLELISM = "snapshot_parallelism"; public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1"; + public static final String SKIP_SNAPSHOT_BACKFILL = "skip_snapshot_backfill"; // MySQL CDC client identity. Single value "5400" or range "5400-5408". public static final String SERVER_ID = "server_id"; public static final String SSL_MODE = "ssl_mode"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java index 4ca1e605ef5c67..1c1dc10c0d5051 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java @@ -50,6 +50,7 @@ public class DataSourceConfigValidator { DataSourceConfigKeys.EXCLUDE_TABLES, DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE, DataSourceConfigKeys.SNAPSHOT_PARALLELISM, + DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL, DataSourceConfigKeys.SSL_MODE, DataSourceConfigKeys.SSL_ROOTCERT, DataSourceConfigKeys.SLOT_NAME, @@ -208,12 +209,21 @@ private static boolean isValidValue(String key, String value, String dataSourceT || key.equals(DataSourceConfigKeys.SNAPSHOT_PARALLELISM)) { return isPositiveInt(value); } + if (key.equals(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL)) { + return isValidBoolean(value); + } if (key.equals(DataSourceConfigKeys.SERVER_ID)) { return parseServerIdRange(value) != null; } return true; } + // Strict boolean: only "true"/"false" (case-insensitive); Boolean.parseBoolean would + // silently coerce typos like "yes" to false. + public static boolean isValidBoolean(String value) { + return "true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value); + } + public static boolean isPositiveInt(String value) { if (value == null) { return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 55c761a5d426fa..09090ba3ad184b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -276,6 +276,8 @@ private void checkRequiredSourceProperties() { if (!sourceProperties.containsKey(DataSourceConfigKeys.OFFSET)) { sourceProperties.put(DataSourceConfigKeys.OFFSET, DataSourceConfigKeys.OFFSET_LATEST); } + // from-to is at-least-once; default-skip in-snapshot backfill. + sourceProperties.putIfAbsent(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL, "true"); } private List createTableIfNotExists() throws Exception { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java index 1d3fdf66f04002..c257613749d6fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java @@ -82,7 +82,6 @@ public class StreamingMultiTblTask extends AbstractStreamingTask { private long loadBytes = 0L; private long filteredRows = 0L; private long loadedRows = 0L; - private long timeoutMs; private long runningBackendId; public StreamingMultiTblTask(Long jobId, @@ -103,7 +102,6 @@ public StreamingMultiTblTask(Long jobId, this.jobProperties = jobProperties; this.targetDb = targetDb; this.cloudCluster = cloudCluster; - this.timeoutMs = Config.streaming_task_timeout_multiplier * jobProperties.getMaxIntervalSecond() * 1000L; } @Override @@ -327,6 +325,9 @@ public boolean isTimeout() { // It's still pending, waiting for scheduling. return false; } + // Read multiplier live so config changes affect already-running tasks. + long timeoutMs = Config.streaming_task_timeout_multiplier + * jobProperties.getMaxIntervalSecond() * 1000L; long elapsed = System.currentTimeMillis() - startTimeMs; if (elapsed > timeoutMs) { log.info("Task {} timeout detected: elapsed={}ms, timeoutMs={}ms", taskId, elapsed, timeoutMs); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java index aa60924601c586..ae3c815789fd38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java @@ -156,6 +156,7 @@ private void validate(Map properties) throws AnalysisException { } validatePositiveIntIfPresent(properties, DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE); validatePositiveIntIfPresent(properties, DataSourceConfigKeys.SNAPSHOT_PARALLELISM); + validateBooleanIfPresent(properties, DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL); // TVF entrypoint shares server_id checks with the from-to path's validateSource. try { DataSourceConfigValidator.validateServerIdConfig(properties); @@ -186,6 +187,17 @@ private static void validatePgIdentifierIfPresent(Map properties } } + private static void validateBooleanIfPresent(Map properties, String key) + throws AnalysisException { + String value = properties.get(key); + if (value == null) { + return; + } + if (!DataSourceConfigValidator.isValidBoolean(value)) { + throw new AnalysisException("Invalid value for key '" + key + "': " + value); + } + } + private void generateFileStatus() { this.fileStatuses.clear(); this.fileStatuses.add(new TBrokerFileStatus(URI, false, Integer.MAX_VALUE, false)); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java index 9b2dd5d357fdb6..18bd9fcc211974 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java @@ -168,6 +168,7 @@ private void buildStreamRecords( long elapsedTime = System.currentTimeMillis() - startTime; boolean timeoutReached = elapsedTime > Constants.POLL_SPLIT_RECORDS_TIMEOUTS; if (shouldStop( + sourceReader, isSnapshotSplit, hasReceivedData, lastMessageIsHeartbeat, @@ -315,6 +316,7 @@ private RecordWithMeta buildRecordResponse( boolean timeoutReached = elapsedTime > Constants.POLL_SPLIT_RECORDS_TIMEOUTS; if (shouldStop( + sourceReader, isSnapshotSplit, hasReceivedData, lastMessageIsHeartbeat, @@ -484,6 +486,7 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception && elapsedTime >= maxIntervalMillis; if (shouldStop( + sourceReader, isSnapshotSplit, scannedRows > 0, lastMessageIsHeartbeat, @@ -615,6 +618,7 @@ public static boolean isHeartbeatEvent(SourceRecord record) { * @return true if should stop, false if should continue */ private boolean shouldStop( + SourceReader sourceReader, boolean isSnapshotSplit, boolean hasData, boolean lastMessageIsHeartbeat, @@ -622,11 +626,13 @@ private boolean shouldStop( long maxIntervalMillis, boolean timeoutReached) { - // 1. Snapshot split with data: if no more data in queue, stop immediately (no need to wait - // for timeout) - // snapshot split will be written to the debezium queue all at once. - // multiple snapshot splits are handled in the source reader. + // Snapshot split: wait until every split has received its high-watermark event; + // an empty poll alone is not a finish signal under pollWithoutBuffer where the + // fetcher returns one ChangeEventQueue batch at a time. if (isSnapshotSplit) { + if (!sourceReader.isSnapshotFinished()) { + return false; + } LOG.info( "Snapshot split finished, no more data available. Total elapsed: {} ms", elapsedTime); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java index 4583b049b813d6..d1aa7ccb38f37e 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java @@ -417,7 +417,7 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException { OBJECT_MAPPER.readValue(loadResult, RespContent.class); if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { long cacheByteBeforeFlush = - currentCacheBytes.getAndAdd(-respContent.getLoadBytes()); + currentCacheBytes.getAndAdd(-buffer.getBufferSizeBytes()); LOG.info( "load success, cacheBeforeFlushBytes: {}, currentCacheBytes : {}", cacheByteBeforeFlush, diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index ddbc71c7fd7212..f0987eb97eec05 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ -468,6 +468,9 @@ private Iterator pollRecordsFromSnapshotReaders() throws Exception return Collections.emptyIterator(); } + // A split is finished only after its high-watermark event has been consumed. + refreshCompletedSplits(); + if (completedSplitIds.size() >= snapshotReaderContexts.size()) { LOG.info("All {} snapshot splits have been completed", snapshotReaderContexts.size()); return Collections.emptyIterator(); @@ -515,6 +518,11 @@ private void startParallelPolling() { Fetcher, SnapshotSplitState> context = snapshotReaderContexts.get(index); + // Skip splits already drained to high-watermark; otherwise their poll futures spin + // returning null and starve siblings. + if (completedSplitIds.contains(context.getSplit().splitId())) { + continue; + } CompletableFuture future = CompletableFuture.supplyAsync( @@ -569,11 +577,12 @@ private PollResult waitForAnyCompletion() throws Exception { snapshot.remove(future); PollResult result = future.get(); if (result != null) { + // Split completion is determined later by splitState.getHighWatermark() + // != null, not by receiving a non-empty batch. LOG.info( "Got result from reader {}, {} futures remaining", result.context.getSplit().splitId(), snapshot.size()); - completedSplitIds.add(result.context.getSplit().splitId()); return result; } // If result is null (no data), continue checking other futures @@ -839,6 +848,35 @@ public Map extractSnapshotStateOffset(Object splitState) { return offsetRes; } + @Override + public boolean isSnapshotFinished() { + if (snapshotReaderContexts.isEmpty()) { + return true; + } + for (SnapshotReaderContext< + org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit, + Fetcher, + SnapshotSplitState> + context : snapshotReaderContexts) { + if (context.getSplitState().getHighWatermark() == null) { + return false; + } + } + return true; + } + + private void refreshCompletedSplits() { + for (SnapshotReaderContext< + org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit, + Fetcher, + SnapshotSplitState> + context : snapshotReaderContexts) { + if (context.getSplitState().getHighWatermark() != null) { + completedSplitIds.add(context.getSplit().splitId()); + } + } + } + @Override public Map extractBinlogStateOffset(Object splitState) { Preconditions.checkNotNull(splitState, "splitState is null"); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java index 95eeb0526810c3..b41f66f89fbdda 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java @@ -98,4 +98,9 @@ default String serializeTableSchemas() { * indicate how far the source TX log can be discarded. */ default void commitSourceOffset(String jobId, SourceSplit sourceSplit) {} + + /** Whether all snapshot splits have received their high-watermark event. */ + default boolean isSnapshotFinished() { + return true; + } } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index e5115d1c51a393..1bc7db23fd4322 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -489,6 +489,9 @@ private Iterator pollRecordsFromSnapshotReaders() throws Exception return Collections.emptyIterator(); } + // A split is finished only after its high-watermark event has been consumed. + refreshCompletedSplits(); + if (completedSplitIds.size() >= snapshotReaderContexts.size()) { LOG.info("All {} snapshot splits have been completed", snapshotReaderContexts.size()); return Collections.emptyIterator(); @@ -533,6 +536,11 @@ private void startParallelPolling() { final int index = i; SnapshotReaderContext context = snapshotReaderContexts.get(index); + // Skip splits already drained to high-watermark; otherwise their poll futures spin + // returning null and starve siblings. + if (completedSplitIds.contains(context.getSplit().splitId())) { + continue; + } CompletableFuture future = CompletableFuture.supplyAsync( @@ -587,11 +595,12 @@ private PollResult waitForAnyCompletion() throws Exception { snapshot.remove(future); PollResult result = future.get(); if (result != null) { + // Split completion is determined later by splitState.getHighWatermark() + // != null, not by receiving a non-empty batch. LOG.info( "Got result from reader {}, {} futures remaining", result.context.getSplit().splitId(), snapshot.size()); - completedSplitIds.add(result.context.getSplit().splitId()); return result; } // If result is null (no data), continue checking other futures @@ -992,6 +1001,10 @@ private MySqlSourceConfig generateMySqlConfig( objectPath, cdcConfig.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY)); } + // FE injects "true" on TVF path; from-to leaves it absent → default false. + configFactory.skipSnapshotBackfill( + Boolean.parseBoolean(cdcConfig.get(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL))); + return configFactory.createConfig(subtaskId); } @@ -1013,6 +1026,29 @@ public Map extractSnapshotStateOffset(Object splitState) { return new HashMap<>(highWatermark.getOffset()); } + @Override + public boolean isSnapshotFinished() { + if (snapshotReaderContexts.isEmpty()) { + return true; + } + for (SnapshotReaderContext + context : snapshotReaderContexts) { + if (context.getSplitState().getHighWatermark() == null) { + return false; + } + } + return true; + } + + private void refreshCompletedSplits() { + for (SnapshotReaderContext + context : snapshotReaderContexts) { + if (context.getSplitState().getHighWatermark() != null) { + completedSplitIds.add(context.getSplit().splitId()); + } + } + } + @Override public Map extractBinlogStateOffset(Object splitState) { Preconditions.checkNotNull(splitState, "splitState is null"); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java index 8bf53a1eb97ea1..2e09e48957a615 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java @@ -279,6 +279,10 @@ private PostgresSourceConfig generatePostgresConfig( // support scan partition table configFactory.setIncludePartitionedTables(true); + // FE injects "true" on TVF path; from-to leaves it absent → default false. + configFactory.skipSnapshotBackfill( + Boolean.parseBoolean(cdcConfig.get(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL))); + // subtaskId use pg create slot in snapshot phase, slotname is slot_name_subtaskId return configFactory.create(subtaskId); } diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_fat_split.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_fat_split.groovy new file mode 100644 index 00000000000000..c941a5a0afc6a9 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot_fat_split.groovy @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// Regression for the snapshot completion bug: a single split whose row count exceeds +// debezium's default max.batch.size=2048 must still be drained completely. The fix +// keys completion off the high-watermark event instead of the first non-empty batch. +// 2100 rows / split_size=3000 -> one snapshot split that the fetcher needs at least +// two batches to drain (2048 + 52 + hw event). +suite("test_streaming_mysql_job_snapshot_fat_split", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { + def jobName = "test_streaming_mysql_job_snapshot_fat_split_name" + def currentDb = (sql "select database()")[0][0] + def table1 = "streaming_fat_split_mysql" + def mysqlDb = "test_cdc_db" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + // ===== Prepare MySQL side: 2100 rows so a single split spans > 1 fetcher batch ===== + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" + sql """CREATE TABLE ${mysqlDb}.${table1} ( + `id` bigint NOT NULL, + `payload` varchar(32), + `version` int, + PRIMARY KEY (`id`) + ) ENGINE=InnoDB""" + + // Bulk insert in 500-row chunks to stay under MySQL's default max_allowed_packet. + int total = 2100 + int chunk = 500 + int sent = 0 + while (sent < total) { + int end = Math.min(sent + chunk, total) + StringBuilder sb = new StringBuilder() + sb.append("INSERT INTO ${mysqlDb}.${table1} (id, payload, version) VALUES ") + for (int i = sent + 1; i <= end; i++) { + if (i > sent + 1) sb.append(", ") + sb.append("(${i}, 'snap', 0)") + } + sql sb.toString() + sent = end + } + } + + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = "initial", + "snapshot_split_size" = "3000" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(2, SECONDS).until( + { + def cnt = sql """select count(1) from ${currentDb}.${table1}""" + log.info("snapshot row count: " + cnt) + cnt.get(0).get(0) == 2100 + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex + } + + def distinctCount = sql """SELECT COUNT(DISTINCT id) FROM ${currentDb}.${table1}""" + assert distinctCount.get(0).get(0) == 2100 + def boundary = sql """SELECT MIN(id), MAX(id) FROM ${currentDb}.${table1}""" + assert boundary.get(0).get(0) == 1 + assert boundary.get(0).get(1) == 2100 + // Specifically assert rows past the first batch (id > 2048) are present. + def tail = sql """SELECT COUNT(1) FROM ${currentDb}.${table1} WHERE id BETWEEN 2049 AND 2100""" + assert tail.get(0).get(0) == 52 + + // ===== Incremental phase: verify post-snapshot DML still flows ===== + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """INSERT INTO ${mysqlDb}.${table1} (id, payload, version) VALUES (3000, 'incr_ins', 1)""" + sql """UPDATE ${mysqlDb}.${table1} SET version=99 WHERE id=2100""" + sql """DELETE FROM ${mysqlDb}.${table1} WHERE id=1""" + } + + try { + Awaitility.await().atMost(180, SECONDS) + .pollInterval(2, SECONDS).until( + { + def cnt = sql """select count(1) from ${currentDb}.${table1}""" + def upd = sql """select version from ${currentDb}.${table1} where id=2100""" + def ins = sql """select count(1) from ${currentDb}.${table1} where id=3000""" + def del = sql """select count(1) from ${currentDb}.${table1} where id=1""" + def v = upd.size() == 0 ? null : upd.get(0).get(0) + log.info("incr cnt=${cnt} id2100.version=${v} id3000.exists=${ins} id1.exists=${del}") + cnt.get(0).get(0) == 2100 && + v != null && v.toString() == '99' && + ins.get(0).get(0) == 1 && + del.get(0).get(0) == 0 + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job (incr): " + showjob) + log.info("show task (incr): " + showtask) + throw ex + } + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'""" + assert jobCountRsp.get(0).get(0) == 0 + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_fat_split.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_fat_split.groovy new file mode 100644 index 00000000000000..117e4495453ac0 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_fat_split.groovy @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// Regression for the snapshot completion bug: a single split whose row count exceeds +// debezium's default max.batch.size=2048 must still be drained completely. The fix +// keys completion off the high-watermark event instead of the first non-empty batch. +// 2100 rows / split_size=3000 -> one snapshot split that the fetcher needs at least +// two batches to drain (2048 + 52 + hw event). +suite("test_streaming_postgres_job_snapshot_fat_split", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_postgres_job_snapshot_fat_split_name" + def currentDb = (sql "select database()")[0][0] + def table1 = "streaming_fat_split_pg" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // ===== Prepare PG side: 2100 rows so a single split spans > 1 fetcher batch ===== + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """create table ${pgDB}.${pgSchema}.${table1} ( + id bigint PRIMARY KEY, + payload varchar(32), + version integer + )""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, payload, version) + SELECT g, 'snap', 0 FROM generate_series(1, 2100) g""" + } + + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "offset" = "initial", + "snapshot_split_size" = "3000" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(2, SECONDS).until( + { + def cnt = sql """select count(1) from ${currentDb}.${table1}""" + log.info("snapshot row count: " + cnt) + cnt.get(0).get(0) == 2100 + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex + } + + def distinctCount = sql """SELECT COUNT(DISTINCT id) FROM ${currentDb}.${table1}""" + assert distinctCount.get(0).get(0) == 2100 + def boundary = sql """SELECT MIN(id), MAX(id) FROM ${currentDb}.${table1}""" + assert boundary.get(0).get(0) == 1 + assert boundary.get(0).get(1) == 2100 + // Specifically assert rows past the first batch (id > 2048) are present. + def tail = sql """SELECT COUNT(1) FROM ${currentDb}.${table1} WHERE id BETWEEN 2049 AND 2100""" + assert tail.get(0).get(0) == 52 + + // ===== Incremental phase: verify post-snapshot DML still flows ===== + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, payload, version) VALUES (3000, 'incr_ins', 1)""" + sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET version=99 WHERE id=2100""" + sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE id=1""" + } + + try { + Awaitility.await().atMost(180, SECONDS) + .pollInterval(2, SECONDS).until( + { + def cnt = sql """select count(1) from ${currentDb}.${table1}""" + def upd = sql """select version from ${currentDb}.${table1} where id=2100""" + def ins = sql """select count(1) from ${currentDb}.${table1} where id=3000""" + def del = sql """select count(1) from ${currentDb}.${table1} where id=1""" + def v = upd.size() == 0 ? null : upd.get(0).get(0) + log.info("incr cnt=${cnt} id2100.version=${v} id3000.exists=${ins} id1.exists=${del}") + cnt.get(0).get(0) == 2100 && + v != null && v.toString() == '99' && + ins.get(0).get(0) == 1 && + del.get(0).get(0) == 0 + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job (incr): " + showjob) + log.info("show task (incr): " + showtask) + throw ex + } + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'""" + assert jobCountRsp.get(0).get(0) == 0 + } +}