diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java index 776575dc0a72..3a63dfc15c71 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java @@ -29,6 +29,9 @@ import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.data.RowData; +import javax.annotation.Nullable; + +import java.util.Optional; import java.util.function.Function; /** @@ -41,16 +44,27 @@ public class PaimonDataStreamScanProvider implements DataStreamScanProvider, Lin private final Function> producer; private final String name; private final Table table; + @Nullable private final Integer parallelism; public PaimonDataStreamScanProvider( boolean isBounded, Function> producer, String name, Table table) { + this(isBounded, producer, name, table, null); + } + + public PaimonDataStreamScanProvider( + boolean isBounded, + Function> producer, + String name, + Table table, + @Nullable Integer parallelism) { this.isBounded = isBounded; this.producer = producer; this.name = name; this.table = table; + this.parallelism = parallelism; } @Override @@ -64,6 +78,11 @@ public boolean isBounded() { return isBounded; } + @Override + public Optional getParallelism() { + return Optional.ofNullable(parallelism); + } + @Override public LineageVertex getLineageVertex() { return LineageUtils.sourceLineageVertex(name, isBounded, table); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java index a49f87c63fe9..c66291163f30 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source; import org.apache.paimon.CoreOptions; +import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.FlinkConnectorOptions.WatermarkEmitStrategy; import org.apache.paimon.flink.PaimonDataStreamScanProvider; import org.apache.paimon.flink.lookup.FileStoreLookupFunction; @@ -206,7 +207,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .env(env) .build(), tableIdentifier.asSummaryString(), - table); + table, + options.get(FlinkConnectorOptions.SCAN_PARALLELISM)); } private ScanRuntimeProvider createPushedAggregateScan() { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java index d9ede90a144f..c428e6c656c6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java @@ -132,7 +132,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { return dataStreamSource; }, tableIdentifier.asSummaryString(), - table); + table, + options.get(FlinkConnectorOptions.SCAN_PARALLELISM)); } @Override diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java index 62d601ec1b23..991d0bbb0cc8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java @@ -192,6 +192,24 @@ void testScanProviderImplementsLineageVertexProvider() throws Exception { assertThat(vertex.datasets().get(0).name()).isEqualTo("paimon.db.src"); } + @Test + void testScanProviderGetParallelism() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + PaimonDataStreamScanProvider noOverride = + new PaimonDataStreamScanProvider(true, env -> null, "paimon.db.src", table); + assertThat(noOverride.getParallelism()).isEmpty(); + + PaimonDataStreamScanProvider explicit = + new PaimonDataStreamScanProvider(true, env -> null, "paimon.db.src", table, 16); + assertThat(explicit.getParallelism()).contains(16); + + PaimonDataStreamScanProvider nullValue = + new PaimonDataStreamScanProvider(true, env -> null, "paimon.db.src", table, null); + assertThat(nullValue.getParallelism()).isEmpty(); + } + @Test void testSinkProviderImplementsLineageVertexProvider() throws Exception { FileStoreTable table =