Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public void setUp() {
protected void setupConfig() {
super.setupConfig();
senderEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
senderEnv.getConfig().getCommonConfig().setPipeAutoSplitFullEnabled(true);
}

@Test
Expand Down Expand Up @@ -105,9 +106,6 @@ public void testThriftSinkWithRealtimeFirstDisabled() throws Exception {

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());

// Do not fail if the failure has nothing to do with pipe
// Because the failures will randomly generate due to resource limitation
TestUtils.executeNonQueries(
Expand Down Expand Up @@ -172,9 +170,6 @@ private void testSinkFormat(final String format) throws Exception {
.setProcessorAttributes(processorAttributes))
.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());

// Do not fail if the failure has nothing to do with pipe
// Because the failures will randomly generate due to resource limitation
TestUtils.executeNonQueries(
Expand Down Expand Up @@ -236,9 +231,6 @@ public void testLegacySink() throws Exception {

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());

// Do not fail if the failure has nothing to do with pipe
// Because the failures will randomly generate due to resource limitation
TestUtils.executeNonQueries(
Expand Down Expand Up @@ -416,9 +408,6 @@ private void testReceiverLoadTsFile(final String loadTsFileStrategy) throws Exce
.setProcessorAttributes(processorAttributes))
.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());

// Do not fail if the failure has nothing to do with pipe
// Because the failures will randomly generate due to resource limitation
TestUtils.executeNonQueries(
Expand Down Expand Up @@ -512,9 +501,6 @@ private void testLoadTsFileWithoutVerify(final String loadTsFileStrategy) throws
.setProcessorAttributes(processorAttributes))
.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList("insert into root.vehicle.d0(time, s1) values (2, 1)", "flush"),
Expand Down Expand Up @@ -576,4 +562,62 @@ public void testSpecialPartialInsert() throws Exception {
"1635232151960,null,null,2.0,2.1,null,",
"1635232143960,6.0,4.0,null,null,null,")));
}

@Test
public void testTransferMods() {
try {
TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"create database root.sg_nonAligned",
"create TIMESERIES root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s0 with datatype=boolean, encoding=RLE,compressor=snappy",
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s1 with datatype=int32, encoding=PLAIN,compressor=LZ4",
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s2 with datatype=int64,encoding=gorilla,compressor=uncompressed",
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s3 with datatype=float,encoding=chimp,compressor=gzip",
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s4 with datatype=double,encoding=ts_2diff,compressor=zstd",
"create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s5 with datatype=text,encoding=dictionary,compressor=lzma2",
"insert into root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`(time,s1, s2,s3,s4,s0,s5) values(1,1,10,5.39,5.51234,true,''),(11,null,20,5.39,15.51234,false,'第2条 device_nonAligned'),(21,3,null,5.39,25.51234,true,'第3条device_nonAligned'),(31,4,40,null,35.51234,true,'第4条device_nonAligned'),(41,5,50,5.39,null,false,'第5条device_nonAligned'),(51,6,60,5.39,55.51234,null,'第6条device_nonAligned'),(61,7,70,5.39,65.51234,false,null),(71,8,80,5.39,75.51234,false,'第8条device_nonAligned'),(81,9,90,5.39,85.51234,false,'第9条device_nonAligned'),(91,10,100,5.39,95.51234,false,'第10条device_nonAligned'),(101,11,110,5.39,105.51234,false,'第11条device_nonAligned'),(111,12,120,5.39,115.51234,false,'第12条device_nonAligned'),(121,13,130,5.39,125.51234,false,'第13条device_nonAligned'),(131,14,140,5.39,135.51234,false,'第14条device_nonAligned'),(141,15,150,5.39,145.51234,false,'第15条'),(151,16,160,5.39,155.51234,false,'第16条'),(161,17,170,5.39,165.51234,false,'第17条'),(171,18,180,5.39,175.51234,false,'第18条'),(181,19,190,5.39,185.51234,false,'第19条'),(191,20,200,5.39,195.51234,false,'第20条'),(201,21,210,5.39,null,false,'第21条')",
"flush",
"delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s0",
String.format(
"create pipe test with source ('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true') with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
"count(timeseries),",
Collections.singleton("5,"));

TestUtils.executeNonQueries(
senderEnv, Arrays.asList("drop pipe test_history", "drop pipe test_realtime"));

TestUtils.executeNonQuery(receiverEnv, "drop database root.**");

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s1",
String.format(
"create pipe test with source ('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true') with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
"count(timeseries),",
Collections.singleton("4,"),
15);
TestUtils.assertDataAlwaysOnEnv(
receiverEnv,
"count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
"count(timeseries),",
Collections.singleton("4,"));
} finally {
TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"drop pipe test_history", "drop pipe test_realtime", "drop database root.**"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -355,13 +355,18 @@ public void pinTsFileResource(
}
}

public void unpinTsFileResource(final TsFileResource resource, final @Nullable String pipeName)
public void unpinTsFileResource(
final TsFileResource resource,
final boolean shouldTransferModFile,
final @Nullable String pipeName)
throws IOException {
final File pinnedFile = getHardlinkOrCopiedFileInPipeDir(resource.getTsFile(), pipeName);
decreaseFileReference(pinnedFile, pipeName);
decreaseFileReference(
getHardlinkOrCopiedFileInPipeDir(resource.getTsFile(), pipeName), pipeName);

if (resource.sharedModFileExists()) {
decreaseFileReference(resource.getSharedModFile().getFile(), pipeName);
if (shouldTransferModFile && resource.exclusiveModFileExists()) {
decreaseFileReference(
getHardlinkOrCopiedFileInPipeDir(resource.getExclusiveModFile().getFile(), pipeName),
pipeName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,8 @@ private Event supplyTsFileEvent(final TsFileResource resource) {
return isReferenceCountIncreased ? event : null;
} finally {
try {
PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource, pipeName);
PipeDataNodeResourceManager.tsfile()
.unpinTsFileResource(resource, shouldTransferModFile, pipeName);
} catch (final IOException e) {
LOGGER.warn(
"Pipe {}@{}: failed to unpin TsFileResource after creating event, original path: {}",
Expand Down Expand Up @@ -989,7 +990,8 @@ public synchronized void close() {
if (resource instanceof TsFileResource) {
try {
PipeDataNodeResourceManager.tsfile()
.unpinTsFileResource((TsFileResource) resource, pipeName);
.unpinTsFileResource(
(TsFileResource) resource, shouldTransferModFile, pipeName);
} catch (final IOException e) {
LOGGER.warn(
"Pipe {}@{}: failed to unpin TsFileResource after dropping pipe, original path: {}",
Expand Down
Loading