Skip to content

Commit 6c8e1d5

Browse files
authored
Pipe: Fixed the on committed hook square bug & Trimmed the raw tablet hook & Fixed the premature report for source event & Skipped the parsing of time-covered tsFile (apache#17360)
* fix-data * fix
1 parent 55611f1 commit 6c8e1d5

3 files changed

Lines changed: 36 additions & 11 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,14 @@ private PipeRawTabletInsertionEvent(
116116
this.allocatedMemoryBlock =
117117
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
118118

119-
addOnCommittedHook(
120-
() -> {
121-
if (shouldReportOnCommit) {
122-
eliminateProgressIndex();
123-
}
124-
});
119+
if (needToReport) {
120+
addOnCommittedHook(
121+
() -> {
122+
if (shouldReportOnCommit) {
123+
eliminateProgressIndex();
124+
}
125+
});
126+
}
125127
}
126128

127129
public PipeRawTabletInsertionEvent(
@@ -303,10 +305,8 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
303305
}
304306

305307
protected void eliminateProgressIndex() {
306-
if (needToReport) {
307-
if (sourceEvent instanceof PipeTsFileInsertionEvent) {
308-
((PipeTsFileInsertionEvent) sourceEvent).eliminateProgressIndex();
309-
}
308+
if (sourceEvent instanceof PipeTsFileInsertionEvent) {
309+
((PipeTsFileInsertionEvent) sourceEvent).eliminateProgressIndex();
310310
}
311311
}
312312

@@ -387,6 +387,14 @@ public boolean mayEventPathsOverlappedWithPattern() {
387387
}
388388

389389
public void markAsNeedToReport() {
390+
if (!needToReport) {
391+
addOnCommittedHook(
392+
() -> {
393+
if (shouldReportOnCommit) {
394+
eliminateProgressIndex();
395+
}
396+
});
397+
}
390398
this.needToReport = true;
391399
}
392400

@@ -404,6 +412,11 @@ public EnrichedEvent getSourceEvent() {
404412
return sourceEvent;
405413
}
406414

415+
@Override
416+
public boolean isShouldReportOnCommit() {
417+
return shouldReportOnCommit && needToReport;
418+
}
419+
407420
/////////////////////////// TabletInsertionEvent ///////////////////////////
408421

409422
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,17 @@ public boolean mayEventTimeOverlappedWithTimeRange() {
561561
|| startTime <= resource.getFileEndTime() && resource.getFileStartTime() <= endTime;
562562
}
563563

564+
@Override
565+
public boolean shouldParseTime() {
566+
if (!isTimeParsed
567+
&& Objects.nonNull(resource)
568+
&& startTime <= resource.getFileStartTime()
569+
&& resource.getFileEndTime() <= endTime) {
570+
isTimeParsed = true;
571+
}
572+
return !isTimeParsed;
573+
}
574+
564575
@Override
565576
public boolean mayEventPathsOverlappedWithPattern() {
566577
if (Objects.isNull(resource) || !resource.isClosed() || isTableModelEvent()) {

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
2525
import org.apache.iotdb.commons.pipe.datastructure.interval.Interval;
2626

27+
import java.util.ArrayList;
2728
import java.util.List;
2829
import java.util.Objects;
2930

@@ -43,7 +44,7 @@ public PipeCommitInterval(
4344
this.pipeTaskMeta = pipeTaskMeta;
4445
this.currentIndex =
4546
Objects.nonNull(currentIndex) ? currentIndex : MinimumProgressIndex.INSTANCE;
46-
this.onCommittedHooks = onCommittedHooks;
47+
this.onCommittedHooks = new ArrayList<>(onCommittedHooks);
4748
}
4849

4950
@Override

0 commit comments

Comments
 (0)