Skip to content

Commit d430e29

Browse files
JNSimba authored and morningman committed
[Feature](streamjob) Streaming job support cdc_stream TVF (#61826)
### What problem does this PR solve? Previously, streaming jobs only supported the non-TVF path (FROM Source TO Database) for CDC ingestion. This PR adds support for streaming jobs to drive CDC ingestion via the cdc_stream Table-Valued Function (TVF), enabling a more flexible and SQL-native approach. ```sql CREATE JOB test_streaming_job_cdc_stream_postgres_name ON STREAMING DO INSERT INTO db.table SELECT name, age FROM cdc_stream( "type" = "postgres", "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres", "driver_url" = "postgresql-42.5.0.jar", "driver_class" = "org.postgresql.Driver", "user" = "postgres", "password" = "123456", "database" = "postgres", "schema" = "cdc_test", "table" = "test_streaming_job_cdc_stream_postgres_src", "offset" = "initial" ) ``` Core flow: 1. Snapshot phase: FE fetches split chunks from BE, persists them to the meta table, then creates one StreamingInsertTask per split (or batch). The TVF parameters are rewritten per-task with meta (split boundary info), job.id, and task.id. 2. Binlog phase: After all snapshot splits are completed, transitions to continuous binlog ingestion via the same cdc_stream TVF with binlog offset as meta. 3. Offset commit (2PC safe): After fetchRecordStream completes on BE, the actual end offset is stored in PipelineCoordinator.taskOffsetCache. FE pulls this via brpc in beforeCommitted and stores it in the txn attachment — ensuring the offset is committed atomically with the data. 4. FE restart recovery: State is recovered via txn replay (replayOnCommitted) rather than EditLog, rebuilding chunkHighWatermarkMap and remainingSplits from the meta table.
1 parent 7a0ec6d commit d430e29

27 files changed

Lines changed: 1497 additions & 72 deletions

fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class DataSourceConfigKeys {
3535
public static final String OFFSET_LATEST = "latest";
3636
public static final String OFFSET_SNAPSHOT = "snapshot";
3737
public static final String SNAPSHOT_SPLIT_SIZE = "snapshot_split_size";
38+
public static final String SNAPSHOT_SPLIT_KEY = "snapshot_split_key";
3839
public static final String SNAPSHOT_PARALLELISM = "snapshot_parallelism";
3940
public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
4041
public static final String SSL_MODE = "ssl_mode";

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import lombok.extern.log4j.Log4j2;
3434
import org.apache.commons.lang3.StringUtils;
3535

36+
import java.util.Collections;
37+
import java.util.List;
3638
import java.util.concurrent.atomic.AtomicBoolean;
3739

3840
@Log4j2
@@ -68,6 +70,15 @@ public AbstractStreamingTask(long jobId, long taskId, UserIdentity userIdentity)
6870

6971
public abstract void run() throws JobException;
7072

73+
/**
74+
* Returns the IDs of backends that ran the scan node for this task.
75+
* Subclasses backed by a TVF query (e.g. StreamingInsertTask) override this
76+
* to return the actual scan backend IDs from the coordinator.
77+
*/
78+
public List<Long> getScanBackendIds() {
79+
return Collections.emptyList();
80+
}
81+
7182
public abstract boolean onSuccess() throws JobException;
7283

7384
public abstract void closeOrReleaseResources();

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.doris.job.offset.SourceOffsetProvider;
5252
import org.apache.doris.job.offset.SourceOffsetProviderFactory;
5353
import org.apache.doris.job.offset.jdbc.JdbcSourceOffsetProvider;
54+
import org.apache.doris.job.offset.jdbc.JdbcTvfSourceOffsetProvider;
5455
import org.apache.doris.job.util.StreamingJobUtils;
5556
import org.apache.doris.load.loadv2.LoadJob;
5657
import org.apache.doris.load.loadv2.LoadStatistic;
@@ -72,6 +73,7 @@
7273
import org.apache.doris.qe.ShowResultSetMetaData;
7374
import org.apache.doris.rpc.RpcException;
7475
import org.apache.doris.service.FrontendOptions;
76+
import org.apache.doris.tablefunction.S3TableValuedFunction;
7577
import org.apache.doris.thrift.TCell;
7678
import org.apache.doris.thrift.TRow;
7779
import org.apache.doris.transaction.TransactionException;
@@ -300,8 +302,11 @@ private void initInsertJob() {
300302
this.tvfType = currentTvf.getFunctionName();
301303
this.originTvfProps = currentTvf.getProperties().getMap();
302304
this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
303-
// validate offset props
304-
if (jobProperties.getOffsetProperty() != null) {
305+
this.offsetProvider.ensureInitialized(getJobId(), originTvfProps);
306+
this.offsetProvider.initOnCreate();
307+
// validate offset props, only for s3 cause s3 tvf no offset prop
308+
if (jobProperties.getOffsetProperty() != null
309+
&& S3TableValuedFunction.NAME.equalsIgnoreCase(tvfType)) {
305310
Offset offset = validateOffset(jobProperties.getOffsetProperty());
306311
this.offsetProvider.updateOffset(offset);
307312
}
@@ -536,6 +541,8 @@ protected void fetchMeta() throws JobException {
536541
if (originTvfProps == null) {
537542
this.originTvfProps = getCurrentTvf().getProperties().getMap();
538543
}
544+
// when fe restart, offsetProvider.jobId may be null
545+
offsetProvider.ensureInitialized(getJobId(), originTvfProps);
539546
offsetProvider.fetchRemoteMeta(originTvfProps);
540547
} else {
541548
offsetProvider.fetchRemoteMeta(new HashMap<>());
@@ -653,7 +660,14 @@ private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attach
653660
this.jobStatistic.setLoadBytes(this.jobStatistic.getLoadBytes() + attachment.getLoadBytes());
654661
this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() + attachment.getNumFiles());
655662
this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() + attachment.getFileBytes());
656-
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
663+
Offset endOffset = offsetProvider.deserializeOffset(attachment.getOffset());
664+
offsetProvider.updateOffset(endOffset);
665+
if (!isReplay) {
666+
offsetProvider.onTaskCommitted(attachment.getScannedRows(), attachment.getLoadBytes());
667+
if (runningStreamTask != null) {
668+
offsetProvider.applyEndOffsetToTask(runningStreamTask.getRunningOffset(), endOffset);
669+
}
670+
}
657671

658672
//update metric
659673
if (MetricRepo.isInit && !isReplay) {
@@ -757,7 +771,8 @@ public void replayOnUpdated(StreamingInsertJob replayJob) {
757771
*/
758772
private void modifyPropertiesInternal(Map<String, String> inputProperties) throws AnalysisException, JobException {
759773
StreamingJobProperties inputStreamProps = new StreamingJobProperties(inputProperties);
760-
if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty()) && this.tvfType != null) {
774+
if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())
775+
&& S3TableValuedFunction.NAME.equalsIgnoreCase(this.tvfType)) {
761776
Offset offset = validateOffset(inputStreamProps.getOffsetProperty());
762777
this.offsetProvider.updateOffset(offset);
763778
if (Config.isCloudMode()) {
@@ -1016,14 +1031,25 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
10161031
}
10171032
LoadJob loadJob = loadJobs.get(0);
10181033
LoadStatistic loadStatistic = loadJob.getLoadStatistic();
1034+
1035+
String offsetJson = offsetProvider.getCommitOffsetJson(
1036+
runningStreamTask.getRunningOffset(),
1037+
runningStreamTask.getTaskId(),
1038+
runningStreamTask.getScanBackendIds());
1039+
1040+
1041+
if (StringUtils.isBlank(offsetJson)) {
1042+
throw new TransactionException("Cannot find offset for attachment, load job id is "
1043+
+ runningStreamTask.getTaskId());
1044+
}
10191045
txnState.setTxnCommitAttachment(new StreamingTaskTxnCommitAttachment(
10201046
getJobId(),
10211047
runningStreamTask.getTaskId(),
10221048
loadStatistic.getScannedRows(),
10231049
loadStatistic.getLoadBytes(),
10241050
loadStatistic.getFileNumber(),
10251051
loadStatistic.getTotalFileSizeB(),
1026-
runningStreamTask.getRunningOffset().toSerializedJson()));
1052+
offsetJson));
10271053
} finally {
10281054
if (shouldReleaseLock) {
10291055
writeUnlock();
@@ -1186,10 +1212,7 @@ public void commitOffset(CommitOffsetRequest offsetRequest) throws JobException
11861212
}
11871213
checkDataQuality(offsetRequest);
11881214
updateNoTxnJobStatisticAndOffset(offsetRequest);
1189-
if (offsetRequest.getScannedRows() == 0 && offsetRequest.getLoadBytes() == 0) {
1190-
JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider) offsetProvider;
1191-
op.setHasMoreData(false);
1192-
}
1215+
offsetProvider.onTaskCommitted(offsetRequest.getScannedRows(), offsetRequest.getLoadBytes());
11931216
if (offsetRequest.getTableSchemas() != null) {
11941217
JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider) offsetProvider;
11951218
op.setTableSchemas(offsetRequest.getTableSchemas());
@@ -1278,6 +1301,7 @@ public boolean hasReachedEnd() {
12781301
*/
12791302
public void cleanup() throws JobException {
12801303
log.info("cleanup streaming job {}", getJobId());
1304+
12811305
// s3 tvf clean offset
12821306
if (tvfType != null && Config.isCloudMode()) {
12831307
Cloud.DeleteStreamingJobResponse resp = null;
@@ -1298,6 +1322,14 @@ public void cleanup() throws JobException {
12981322
}
12991323
}
13001324

1325+
// For TVF path, provider fields may be null after FE restart
1326+
if (this.offsetProvider instanceof JdbcTvfSourceOffsetProvider) {
1327+
if (originTvfProps == null) {
1328+
this.originTvfProps = getCurrentTvf().getProperties().getMap();
1329+
}
1330+
offsetProvider.ensureInitialized(getJobId(), originTvfProps);
1331+
}
1332+
13011333
if (this.offsetProvider instanceof JdbcSourceOffsetProvider) {
13021334
// jdbc clean chunk meta table
13031335
((JdbcSourceOffsetProvider) this.offsetProvider).cleanMeta(getJobId());

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import lombok.extern.log4j.Log4j2;
4444

4545
import java.util.Arrays;
46+
import java.util.Collections;
4647
import java.util.List;
4748
import java.util.Map;
4849
import java.util.Optional;
@@ -98,7 +99,7 @@ public void before() throws Exception {
9899
if (!baseCommand.getParsedPlan().isPresent()) {
99100
throw new JobException("Can not get Parsed plan");
100101
}
101-
this.taskCommand = offsetProvider.rewriteTvfParams(baseCommand, runningOffset);
102+
this.taskCommand = offsetProvider.rewriteTvfParams(baseCommand, runningOffset, getTaskId());
102103
this.taskCommand.setLabelName(Optional.of(labelName));
103104
this.stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(taskCommand, ctx.getStatementContext()));
104105
}
@@ -126,6 +127,14 @@ public void run() throws JobException {
126127
}
127128
}
128129

130+
@Override
131+
public List<Long> getScanBackendIds() {
132+
if (stmtExecutor != null && stmtExecutor.getCoord() != null) {
133+
return stmtExecutor.getCoord().getScanBackendIds();
134+
}
135+
return Collections.emptyList();
136+
}
137+
129138
@Override
130139
public boolean onSuccess() throws JobException {
131140
if (getIsCanceled().get()) {

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,8 @@ private WriteRecordRequest buildRequestParams() throws JobException {
192192
Map<String, String> props = generateStreamLoadProps();
193193
request.setStreamLoadProps(props);
194194

195+
//`meta` refers to the data synchronized by the job in this instance,
196+
// while `sourceProperties.offset` is the data entered by the user.
195197
Map<String, Object> splitMeta = offset.generateMeta();
196198
Preconditions.checkArgument(!splitMeta.isEmpty(), "split meta is empty");
197199
request.setMeta(splitMeta);

fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
2323
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
2424

25+
import java.util.List;
2526
import java.util.Map;
2627

2728
/**
@@ -36,6 +37,20 @@ public interface SourceOffsetProvider {
3637
*/
3738
String getSourceType();
3839

40+
/**
41+
* Initialize the offset provider with job ID and original TVF properties.
42+
* Only sets in-memory fields; safe to call on both fresh start and FE restart.
43+
* May perform remote calls (e.g. fetching snapshot splits), so throws JobException.
44+
*/
45+
default void ensureInitialized(Long jobId, Map<String, String> originTvfProps) throws JobException {}
46+
47+
/**
48+
* Performs one-time initialization that must run only on fresh job creation, not on FE restart.
49+
* For example, fetching and persisting snapshot splits to the meta table.
50+
* Default: no-op (most providers need no extra setup).
51+
*/
52+
default void initOnCreate() throws JobException {}
53+
3954
/**
4055
* Get next offset to consume
4156
*
@@ -59,11 +74,12 @@ public interface SourceOffsetProvider {
5974

6075
/**
6176
* Rewrite the TVF parameters in the SQL based on the current offset.
77+
* Only implemented by TVF-based providers (e.g. S3, cdc_stream).
6278
*
6379
* @param nextOffset
6480
* @return rewritten InsertIntoTableCommand
6581
*/
66-
InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originCommand, Offset nextOffset);
82+
InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originCommand, Offset nextOffset, long taskId);
6783

6884
/**
6985
* Update the offset of the source.
@@ -110,6 +126,30 @@ default String getPersistInfo() {
110126
return null;
111127
}
112128

129+
/**
130+
* Returns the serialized JSON offset to store in txn commit attachment.
131+
* Default: serialize running offset directly (e.g. S3 path).
132+
* CDC stream TVF overrides to pull actual end offset from BE after fetchRecordStream completes.
133+
* scanBackendIds: IDs of the BEs that ran the TVF scan node, used to locate taskOffsetCache.
134+
*/
135+
default String getCommitOffsetJson(Offset runningOffset, long taskId, List<Long> scanBackendIds) {
136+
return runningOffset.toSerializedJson();
137+
}
138+
139+
/**
140+
* Called after each task is committed. Providers that track data availability
141+
* (e.g. JDBC binlog) can use this to update internal state such as hasMoreData.
142+
* Default: no-op.
143+
*/
144+
default void onTaskCommitted(long scannedRows, long loadBytes) {}
145+
146+
/**
147+
* Applies the end offset from a committed task back onto the running offset object
148+
* in-place, so that showRange() can display the full [start, end] interval.
149+
* Default: no-op (only meaningful for JDBC providers).
150+
*/
151+
default void applyEndOffsetToTask(Offset runningOffset, Offset endOffset) {}
152+
113153
/**
114154
* Returns true if the provider has reached a natural completion point
115155
* and the job should be marked as FINISHED.

fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.doris.job.offset;
1919

2020
import org.apache.doris.job.exception.JobException;
21+
import org.apache.doris.job.offset.jdbc.JdbcTvfSourceOffsetProvider;
2122
import org.apache.doris.job.offset.s3.S3SourceOffsetProvider;
2223

2324
import lombok.extern.log4j.Log4j2;
@@ -31,6 +32,7 @@ public class SourceOffsetProviderFactory {
3132

3233
static {
3334
map.put("s3", S3SourceOffsetProvider.class);
35+
map.put("cdc_stream", JdbcTvfSourceOffsetProvider.class);
3436
}
3537

3638
public static SourceOffsetProvider createSourceOffsetProvider(String sourceType) {

fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,10 @@
7070
public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
7171
public static final String SPLIT_ID = "splitId";
7272
private static final ObjectMapper objectMapper = new ObjectMapper();
73-
private final int snapshotParallelism;
74-
private Long jobId;
75-
private DataSourceType sourceType;
76-
private Map<String, String> sourceProperties = new HashMap<>();
73+
protected int snapshotParallelism = 1;
74+
protected Long jobId;
75+
protected DataSourceType sourceType;
76+
protected Map<String, String> sourceProperties = new HashMap<>();
7777

7878
List<SnapshotSplit> remainingSplits = new ArrayList<>();
7979
List<SnapshotSplit> finishedSplits = new ArrayList<>();
@@ -92,6 +92,16 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
9292

9393
volatile boolean hasMoreData = true;
9494

95+
/**
96+
* No-arg constructor for subclass use.
97+
*/
98+
public JdbcSourceOffsetProvider() {
99+
this.chunkHighWatermarkMap = new HashMap<>();
100+
}
101+
102+
/**
103+
* Constructor for FROM Source TO Database.
104+
*/
95105
public JdbcSourceOffsetProvider(Long jobId, DataSourceType sourceType, Map<String, String> sourceProperties) {
96106
this.jobId = jobId;
97107
this.sourceType = sourceType;
@@ -155,10 +165,13 @@ public String getShowMaxOffset() {
155165
return null;
156166
}
157167

168+
/**
169+
* Should never call this.
170+
*/
158171
@Override
159-
public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originCommand, Offset nextOffset) {
160-
// todo: only for cdc tvf
161-
return null;
172+
public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originCommand, Offset nextOffset,
173+
long taskId) {
174+
throw new UnsupportedOperationException("rewriteTvfParams not supported for " + getClass().getSimpleName());
162175
}
163176

164177
@Override
@@ -347,7 +360,7 @@ public Offset deserializeOffset(String offset) {
347360

348361
@Override
349362
public Offset deserializeOffsetProperty(String offset) {
350-
// no need
363+
// no need cause cdc_stream has offset property
351364
return null;
352365
}
353366

@@ -415,7 +428,7 @@ public void replayIfNeed(StreamingInsertJob job) throws JobException {
415428
* Assign the HW value to the synchronized Split,
416429
* and remove the Split from remainSplit and place it in finishedSplit.
417430
*/
418-
private List<SnapshotSplit> recalculateRemainingSplits(
431+
protected List<SnapshotSplit> recalculateRemainingSplits(
419432
Map<String, Map<String, Map<String, String>>> chunkHighWatermarkMap,
420433
Map<String, List<SnapshotSplit>> snapshotSplits) {
421434
if (this.finishedSplits == null) {
@@ -541,7 +554,7 @@ private List<SnapshotSplit> requestTableSplits(String table) throws JobException
541554
}
542555
}
543556

544-
private boolean checkNeedSplitChunks(Map<String, String> sourceProperties) {
557+
protected boolean checkNeedSplitChunks(Map<String, String> sourceProperties) {
545558
String startMode = sourceProperties.get(DataSourceConfigKeys.OFFSET);
546559
if (startMode == null) {
547560
return false;
@@ -550,11 +563,18 @@ private boolean checkNeedSplitChunks(Map<String, String> sourceProperties) {
550563
|| DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startMode);
551564
}
552565

553-
private boolean isSnapshotOnlyMode() {
566+
protected boolean isSnapshotOnlyMode() {
554567
String offset = sourceProperties.get(DataSourceConfigKeys.OFFSET);
555568
return DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(offset);
556569
}
557570

571+
@Override
572+
public void onTaskCommitted(long scannedRows, long loadBytes) {
573+
if (scannedRows == 0 && loadBytes == 0) {
574+
hasMoreData = false;
575+
}
576+
}
577+
558578
@Override
559579
public boolean hasReachedEnd() {
560580
return isSnapshotOnlyMode()

0 commit comments

Comments
 (0)