From 774213bddfb4b161dd11c7363dea49e389b313e7 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 27 Mar 2026 19:18:12 +0800 Subject: [PATCH 1/5] z --- .../auto/basic/IoTDBPipeDataSinkIT.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java index 36a0ea0cc92b..212eff5d879e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java @@ -576,4 +576,62 @@ public void testSpecialPartialInsert() throws Exception { "1635232151960,null,null,2.0,2.1,null,", "1635232143960,6.0,4.0,null,null,null,"))); } + + @Test + public void testTransferMods() { + try { + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "create database root.sg_nonAligned", + "create TIMESERIES root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s0 with datatype=boolean, encoding=RLE,compressor=snappy", + "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s1 with datatype=int32, encoding=PLAIN,compressor=LZ4", + "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s2 with datatype=int64,encoding=gorilla,compressor=uncompressed", + "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s3 with datatype=float,encoding=chimp,compressor=gzip", + "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s4 with datatype=double,encoding=ts_2diff,compressor=zstd", + "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s5 with datatype=text,encoding=dictionary,compressor=lzma2", + "insert into root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`(time,s1, s2,s3,s4,s0,s5) values(1,1,10,5.39,5.51234,true,''),(11,null,20,5.39,15.51234,false,'第2条 device_nonAligned'),(21,3,null,5.39,25.51234,true,'第3条device_nonAligned'),(31,4,40,null,35.51234,true,'第4条device_nonAligned'),(41,5,50,5.39,null,false,'第5条device_nonAligned'),(51,6,60,5.39,55.51234,null,'第6条device_nonAligned'),(61,7,70,5.39,65.51234,false,null),(71,8,80,5.39,75.51234,false,'第8条device_nonAligned'),(81,9,90,5.39,85.51234,false,'第9条device_nonAligned'),(91,10,100,5.39,95.51234,false,'第10条device_nonAligned'),(101,11,110,5.39,105.51234,false,'第11条device_nonAligned'),(111,12,120,5.39,115.51234,false,'第12条device_nonAligned'),(121,13,130,5.39,125.51234,false,'第13条device_nonAligned'),(131,14,140,5.39,135.51234,false,'第14条device_nonAligned'),(141,15,150,5.39,145.51234,false,'第15条'),(151,16,160,5.39,155.51234,false,'第16条'),(161,17,170,5.39,165.51234,false,'第17条'),(171,18,180,5.39,175.51234,false,'第18条'),(181,19,190,5.39,185.51234,false,'第19条'),(191,20,200,5.39,195.51234,false,'第20条'),(201,21,210,5.39,null,false,'第21条')", + "flush", + "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s0", + String.format( + "create pipe test with source ('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true') with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')", + receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()))); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*", + "count(timeseries),", + Collections.singleton("5,")); + + TestUtils.executeNonQueries( + senderEnv, Arrays.asList("drop pipe test_history", "drop pipe test_realtime")); + + TestUtils.executeNonQuery(receiverEnv, "drop database root.**"); + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s1", + String.format( + "create pipe test with source ('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true') with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')", + receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()))); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*", + "count(timeseries),", + Collections.singleton("4,"), + 15); + TestUtils.assertDataAlwaysOnEnv( + receiverEnv, + "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*", + "count(timeseries),", + Collections.singleton("4,")); + } finally { + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "drop pipe test_history", "drop pipe test_realtime", "drop database root.**")); + } + } } From 6dbfca8f4bc91fd5f742f847949b434036ea8463 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 27 Mar 2026 19:20:12 +0800 Subject: [PATCH 2/5] shop --- .../PipeHistoricalDataRegionTsFileAndDeletionSource.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index 32fcece41158..66f8d48ce281 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -904,7 +904,8 @@ private Event supplyTsFileEvent(final TsFileResource resource) { return isReferenceCountIncreased ? event : null; } finally { try { - PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource, pipeName); + PipeDataNodeResourceManager.tsfile() + .unpinTsFileResource(resource, shouldTransferModFile, pipeName); } catch (final IOException e) { LOGGER.warn( "Pipe {}@{}: failed to unpin TsFileResource after creating event, original path: {}", @@ -989,7 +990,8 @@ public synchronized void close() { if (resource instanceof TsFileResource) { try { PipeDataNodeResourceManager.tsfile() - .unpinTsFileResource((TsFileResource) resource, pipeName); + .unpinTsFileResource( + (TsFileResource) resource, shouldTransferModFile, pipeName); } catch (final IOException e) { LOGGER.warn( "Pipe {}@{}: failed to unpin TsFileResource after dropping pipe, original path: {}", From a005b0c5bbf75aa687ddf31f309b5c3706219db3 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 27 Mar 2026 19:23:50 +0800 Subject: [PATCH 3/5] fix --- .../tsfile/PipeTsFileResourceManager.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java index c1e4ff9ddf1b..c84504fa52b7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java @@ -355,13 +355,18 @@ public void pinTsFileResource( } } - public void unpinTsFileResource(final TsFileResource resource, final @Nullable String pipeName) + public void unpinTsFileResource( + final TsFileResource resource, + final boolean shouldTransferModFile, + final @Nullable String pipeName) throws IOException { - final File pinnedFile = getHardlinkOrCopiedFileInPipeDir(resource.getTsFile(), pipeName); - decreaseFileReference(pinnedFile, pipeName); + decreaseFileReference( + getHardlinkOrCopiedFileInPipeDir(resource.getTsFile(), pipeName), pipeName); - if (resource.sharedModFileExists()) { - decreaseFileReference(resource.getSharedModFile().getFile(), pipeName); + if (shouldTransferModFile && resource.exclusiveModFileExists()) { + decreaseFileReference( + getHardlinkOrCopiedFileInPipeDir(resource.getExclusiveModFile().getFile(), pipeName), + pipeName); } } From 4854c40322962f83b474af478c7a2ff0f21eb39d Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 30 Mar 2026 09:47:28 +0800 Subject: [PATCH 4/5] fix --- .../pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java index 212eff5d879e..dc2ba05c7d54 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java @@ -65,6 +65,7 @@ public void setUp() { protected void setupConfig() { super.setupConfig(); senderEnv.getConfig().getDataNodeConfig().setEnableRestService(true); + senderEnv.getConfig().getCommonConfig().setPipeAutoSplitFullEnabled(true); } @Test From aec8a7fd2adec2bc69c53fe686868466ee73dcce Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 30 Mar 2026 10:02:39 +0800 Subject: [PATCH 5/5] Update IoTDBPipeDataSinkIT.java --- .../treemodel/auto/basic/IoTDBPipeDataSinkIT.java | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java index dc2ba05c7d54..de1acfa32c74 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java @@ -106,9 +106,6 @@ public void testThriftSinkWithRealtimeFirstDisabled() throws Exception { Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); - // Do not fail if the failure has nothing to do with pipe // Because the failures will randomly generate due to resource limitation TestUtils.executeNonQueries( @@ -173,9 +170,6 @@ private void testSinkFormat(final String format) throws Exception { .setProcessorAttributes(processorAttributes)) .getCode()); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); - // Do not fail if the failure has nothing to do with pipe // Because the failures will randomly generate due to resource limitation TestUtils.executeNonQueries( @@ -237,9 +231,6 @@ public void testLegacySink() throws Exception { try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); - // Do not fail if the failure has nothing to do with pipe // Because the failures will randomly generate due to resource limitation TestUtils.executeNonQueries( @@ -417,9 +408,6 @@ private void testReceiverLoadTsFile(final String loadTsFileStrategy) throws Exce .setProcessorAttributes(processorAttributes)) .getCode()); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); - // Do not fail if the failure has nothing to do with pipe // Because the failures will randomly generate due to resource limitation TestUtils.executeNonQueries( @@ -513,9 +501,6 @@ private void testLoadTsFileWithoutVerify(final String loadTsFileStrategy) throws .setProcessorAttributes(processorAttributes)) .getCode()); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); - TestUtils.executeNonQueries( senderEnv, Arrays.asList("insert into root.vehicle.d0(time, s1) values (2, 1)", "flush"),