Skip to content

Commit 355a872

Browse files
authored
Pipe: Fixed the bug that separated historical pipe may not include mod on deletion & The pipe without data.insert may be wrongly separated by pipe and transfer data (apache#17346)
* fix * fix * f
1 parent e48272a commit 355a872

3 files changed

Lines changed: 80 additions & 33 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
2424
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
2525
import org.apache.iotdb.consensus.ConsensusFactory;
26+
import org.apache.iotdb.db.it.utils.TestUtils;
2627
import org.apache.iotdb.isession.SessionConfig;
2728
import org.apache.iotdb.it.env.MultiEnvFactory;
2829
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
@@ -39,6 +40,8 @@
3940
import java.sql.Connection;
4041
import java.sql.SQLException;
4142
import java.sql.Statement;
43+
import java.util.Arrays;
44+
import java.util.Collections;
4245
import java.util.List;
4346
import java.util.Objects;
4447

@@ -81,13 +84,12 @@ public void setUp() {
8184
public void testSingleEnv() throws Exception {
8285
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
8386

84-
final String sql =
85-
String.format(
86-
"create pipe a2b with source ('source'='iotdb-source') with processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
87-
receiverDataNode.getIpAndPortString());
8887
try (final Connection connection = senderEnv.getConnection();
8988
final Statement statement = connection.createStatement()) {
90-
statement.execute(sql);
89+
statement.execute(
90+
String.format(
91+
"create pipe a2b with sink ('node-urls'='%s')",
92+
receiverDataNode.getIpAndPortString()));
9193
} catch (final SQLException e) {
9294
fail(e.getMessage());
9395
}
@@ -104,5 +106,47 @@ public void testSingleEnv() throws Exception {
104106
|| (Objects.equals(showPipeResult.get(1).id, "a2b_history")
105107
&& Objects.equals(showPipeResult.get(0).id, "a2b_realtime")));
106108
}
109+
110+
// Do not split for pipes without insertion or non-full
111+
TestUtils.executeNonQueries(
112+
senderEnv,
113+
Arrays.asList(
114+
"drop pipe a2b_history",
115+
"drop pipe a2b_realtime",
116+
String.format(
117+
"create pipe a2b1 with source ('inclusion'='schema') with sink ('node-urls'='%s')",
118+
receiverDataNode.getIpAndPortString()),
119+
String.format(
120+
"create pipe a2b2 with source ('realtime.enable'='false') with sink ('node-urls'='%s')",
121+
receiverDataNode.getIpAndPortString()),
122+
String.format(
123+
"create pipe a2b3 with source ('history.enable'='false') with sink ('node-urls'='%s')",
124+
receiverDataNode.getIpAndPortString())));
125+
126+
try (final SyncConfigNodeIServiceClient client =
127+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
128+
final List<TShowPipeInfo> showPipeResult =
129+
client.showPipe(new TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
130+
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
131+
Assert.assertEquals(3, showPipeResult.size());
132+
}
133+
134+
TestUtils.executeNonQueries(
135+
senderEnv,
136+
Arrays.asList(
137+
"drop pipe a2b1",
138+
"drop pipe a2b2",
139+
"drop pipe a2b3",
140+
"insert into root.test.device(time, field) values(0,1),(1,2)",
141+
"delete from root.test.device.* where time == 0",
142+
String.format(
143+
"create pipe a2b with source ('inclusion'='all') with sink ('node-urls'='%s')",
144+
receiverDataNode.getIpAndPortString())));
145+
146+
TestUtils.assertDataEventuallyOnEnv(
147+
receiverEnv,
148+
"select * from root.test.device",
149+
"Time,root.test.device.field,",
150+
Collections.singleton("1,2.0,"));
107151
}
108152
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ public boolean hasPipeReleaseRegionRelatedResource(final int consensusGroupId) {
584584
}
585585
}
586586

587-
public boolean isFullSync(final PipeParameters parameters) {
587+
public boolean isFullSync(final PipeParameters parameters) throws IllegalPathException {
588588
if (isSnapshotMode(parameters)) {
589589
return false;
590590
}
@@ -598,7 +598,10 @@ public boolean isFullSync(final PipeParameters parameters) {
598598
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY),
599599
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
600600

601-
return isHistoryEnable && isRealtimeEnable;
601+
return isHistoryEnable
602+
&& isRealtimeEnable
603+
&& DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters)
604+
.getLeft();
602605
}
603606

604607
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@
181181
import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
182182
import org.apache.iotdb.db.exception.sql.SemanticException;
183183
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
184+
import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
184185
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
185186
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
186187
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -2205,10 +2206,10 @@ public SettableFuture<ConfigTaskResult> createPipe(
22052206
new PipeParameters(createPipeStatement.getSourceAttributes());
22062207
final PipeParameters sinkPipeParameters =
22072208
new PipeParameters(createPipeStatement.getSinkAttributes());
2208-
if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
2209-
&& PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
2210-
try (final ConfigNodeClient configNodeClient =
2211-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
2209+
try (final ConfigNodeClient configNodeClient =
2210+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
2211+
if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
2212+
&& PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
22122213
// 1. Send request to create the real-time data synchronization pipeline
22132214
final TCreatePipeReq realtimeReq =
22142215
new TCreatePipeReq()
@@ -2253,11 +2254,17 @@ public SettableFuture<ConfigTaskResult> createPipe(
22532254
Boolean.toString(false),
22542255
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
22552256
Boolean.toString(true),
2256-
// We force the historical pipe to transfer data only
2257+
// We force the historical pipe to transfer data (and maybe
2258+
// deletion) only
22572259
// Thus we can transfer schema only once
22582260
// And may drop the historical pipe on successfully transferred
22592261
PipeSourceConstant.SOURCE_INCLUSION_KEY,
2260-
PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
2262+
DataRegionListeningFilter
2263+
.parseInsertionDeletionListeningOptionPair(
2264+
sourcePipeParameters)
2265+
.getRight()
2266+
? "data"
2267+
: PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
22612268
PipeSourceConstant.SOURCE_EXCLUSION_KEY,
22622269
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
22632270
.getAttribute())
@@ -2280,27 +2287,20 @@ public SettableFuture<ConfigTaskResult> createPipe(
22802287

22812288
// 3. Set success status only if both pipelines are created successfully
22822289
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
2283-
} catch (final Exception e) {
2284-
// Catch any other exceptions (e.g., network issues)
2285-
future.setException(e);
2286-
}
2287-
return future;
2288-
}
2289-
2290-
try (final ConfigNodeClient configNodeClient =
2291-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
2292-
final TCreatePipeReq req =
2293-
new TCreatePipeReq()
2294-
.setPipeName(pipeName)
2295-
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
2296-
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
2297-
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
2298-
.setConnectorAttributes(createPipeStatement.getSinkAttributes());
2299-
final TSStatus tsStatus = configNodeClient.createPipe(req);
2300-
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
2301-
future.setException(new IoTDBException(tsStatus));
23022290
} else {
2303-
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
2291+
final TCreatePipeReq req =
2292+
new TCreatePipeReq()
2293+
.setPipeName(pipeName)
2294+
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
2295+
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
2296+
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
2297+
.setConnectorAttributes(createPipeStatement.getSinkAttributes());
2298+
final TSStatus tsStatus = configNodeClient.createPipe(req);
2299+
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
2300+
future.setException(new IoTDBException(tsStatus));
2301+
} else {
2302+
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
2303+
}
23042304
}
23052305
} catch (final Exception e) {
23062306
future.setException(e);

0 commit comments

Comments
 (0)