Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.data.RowData;

import javax.annotation.Nullable;

import java.util.Optional;
import java.util.function.Function;

/**
Expand All @@ -41,16 +44,27 @@ public class PaimonDataStreamScanProvider implements DataStreamScanProvider, Lin
private final Function<StreamExecutionEnvironment, DataStream<RowData>> producer;
private final String name;
private final Table table;
@Nullable private final Integer parallelism;

public PaimonDataStreamScanProvider(
boolean isBounded,
Function<StreamExecutionEnvironment, DataStream<RowData>> producer,
String name,
Table table) {
this(isBounded, producer, name, table, null);
}

public PaimonDataStreamScanProvider(
boolean isBounded,
Function<StreamExecutionEnvironment, DataStream<RowData>> producer,
String name,
Table table,
@Nullable Integer parallelism) {
this.isBounded = isBounded;
this.producer = producer;
this.name = name;
this.table = table;
this.parallelism = parallelism;
}

@Override
Expand All @@ -64,6 +78,11 @@ public boolean isBounded() {
return isBounded;
}

@Override
public Optional<Integer> getParallelism() {
return Optional.ofNullable(parallelism);
}

@Override
public LineageVertex getLineageVertex() {
return LineageUtils.sourceLineageVertex(name, isBounded, table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.flink.source;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkConnectorOptions.WatermarkEmitStrategy;
import org.apache.paimon.flink.PaimonDataStreamScanProvider;
import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
Expand Down Expand Up @@ -206,7 +207,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.env(env)
.build(),
tableIdentifier.asSummaryString(),
table);
table,
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
}

private ScanRuntimeProvider createPushedAggregateScan() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
return dataStreamSource;
},
tableIdentifier.asSummaryString(),
table);
table,
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,24 @@ void testScanProviderImplementsLineageVertexProvider() throws Exception {
assertThat(vertex.datasets().get(0).name()).isEqualTo("paimon.db.src");
}

@Test
void testScanProviderGetParallelism() throws Exception {
FileStoreTable table =
createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0"));

PaimonDataStreamScanProvider noOverride =
new PaimonDataStreamScanProvider(true, env -> null, "paimon.db.src", table);
assertThat(noOverride.getParallelism()).isEmpty();

PaimonDataStreamScanProvider explicit =
new PaimonDataStreamScanProvider(true, env -> null, "paimon.db.src", table, 16);
assertThat(explicit.getParallelism()).contains(16);

PaimonDataStreamScanProvider nullValue =
new PaimonDataStreamScanProvider(true, env -> null, "paimon.db.src", table, null);
assertThat(nullValue.getParallelism()).isEmpty();
}

@Test
void testSinkProviderImplementsLineageVertexProvider() throws Exception {
FileStoreTable table =
Expand Down
Loading