Skip to content

Commit e48272a

Browse files
authored
Pipe: Fixed the log of disruptor queue & deleted the useless binary buffer (apache#17341)
* fix-disruptor * part * fix * fix * f * fix
1 parent 5f9cb66 commit e48272a

13 files changed

Lines changed: 47 additions & 242 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
5454
import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
5555
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
56-
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
5756
import org.apache.iotdb.pipe.api.access.Row;
5857
import org.apache.iotdb.pipe.api.collector.RowCollector;
5958
import org.apache.iotdb.pipe.api.collector.TabletCollector;
@@ -69,7 +68,8 @@
6968
import org.slf4j.Logger;
7069
import org.slf4j.LoggerFactory;
7170

72-
import java.nio.ByteBuffer;
71+
import javax.annotation.Nonnull;
72+
7373
import java.util.ArrayList;
7474
import java.util.Collection;
7575
import java.util.List;
@@ -159,18 +159,11 @@ public PipeInsertNodeTabletInsertionEvent(
159159
this.allocatedMemoryBlock = new AtomicReference<>();
160160
}
161161

162+
@Nonnull
162163
public InsertNode getInsertNode() {
163164
return insertNode;
164165
}
165166

166-
public ByteBuffer getByteBuffer() throws WALPipeException {
167-
final InsertNode node = insertNode;
168-
if (Objects.isNull(node)) {
169-
throw new PipeException("InsertNode has been released");
170-
}
171-
return node.serializeToByteBuffer();
172-
}
173-
174167
public String getDeviceId() {
175168
final InsertNode node = insertNode;
176169
if (Objects.isNull(node)) {
@@ -399,9 +392,6 @@ public boolean mayEventTimeOverlappedWithTimeRange() {
399392
public boolean mayEventPathsOverlappedWithPattern() {
400393
try {
401394
final InsertNode insertNode = getInsertNode();
402-
if (Objects.isNull(insertNode)) {
403-
return true;
404-
}
405395

406396
if (insertNode instanceof RelationalInsertRowNode
407397
|| insertNode instanceof RelationalInsertTabletNode

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,10 @@
5151

5252
public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
5353

54-
private final List<ByteBuffer> binaryBuffers = new ArrayList<>();
5554
private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
5655
private final List<ByteBuffer> tabletBuffers = new ArrayList<>();
5756

5857
private static final String TREE_MODEL_DATABASE_PLACEHOLDER = null;
59-
private final List<String> binaryDataBases = new ArrayList<>();
6058
private final List<String> insertNodeDataBases = new ArrayList<>();
6159
private final List<String> tabletDataBases = new ArrayList<>();
6260

@@ -90,11 +88,9 @@ protected boolean constructBatch(final TabletInsertionEvent event) throws IOExce
9088
public synchronized void onSuccess() {
9189
super.onSuccess();
9290

93-
binaryBuffers.clear();
9491
insertNodeBuffers.clear();
9592
tabletBuffers.clear();
9693

97-
binaryDataBases.clear();
9894
insertNodeDataBases.clear();
9995
tabletDataBases.clear();
10096
tableModelTabletMap.clear();
@@ -142,12 +138,7 @@ public PipeTransferTabletBatchReqV2 toTPipeTransferReq() throws IOException {
142138
tableModelTabletMap.clear();
143139

144140
return PipeTransferTabletBatchReqV2.toTPipeTransferReq(
145-
binaryBuffers,
146-
insertNodeBuffers,
147-
tabletBuffers,
148-
binaryDataBases,
149-
insertNodeDataBases,
150-
tabletDataBases);
141+
insertNodeBuffers, tabletBuffers, insertNodeDataBases, tabletDataBases);
151142
}
152143

153144
public Map<Pair<String, Long>, Long> deepCopyPipeName2BytesAccumulated() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java

Lines changed: 8 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545

4646
public class PipeTransferTabletBatchReq extends TPipeTransferReq {
4747

48-
private final transient List<PipeTransferTabletBinaryReq> binaryReqs = new ArrayList<>();
4948
private final transient List<PipeTransferTabletInsertNodeReq> insertNodeReqs = new ArrayList<>();
5049
private final transient List<PipeTransferTabletRawReq> tabletReqs = new ArrayList<>();
5150

@@ -61,26 +60,6 @@ public Pair<InsertRowsStatement, InsertMultiTabletsStatement> constructStatement
6160
final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
6261
final List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
6362

64-
for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) {
65-
final InsertBaseStatement statement = binaryReq.constructStatement();
66-
if (statement.isEmpty()) {
67-
continue;
68-
}
69-
if (statement instanceof InsertRowStatement) {
70-
insertRowStatementList.add((InsertRowStatement) statement);
71-
} else if (statement instanceof InsertTabletStatement) {
72-
insertTabletStatementList.add((InsertTabletStatement) statement);
73-
} else if (statement instanceof InsertRowsStatement) {
74-
insertRowStatementList.addAll(
75-
((InsertRowsStatement) statement).getInsertRowStatementList());
76-
} else {
77-
throw new UnsupportedOperationException(
78-
String.format(
79-
"unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReq.",
80-
binaryReq));
81-
}
82-
}
83-
8463
for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs) {
8564
final InsertBaseStatement statement = insertNodeReq.constructStatement();
8665
if (statement.isEmpty()) {
@@ -117,24 +96,19 @@ public Pair<InsertRowsStatement, InsertMultiTabletsStatement> constructStatement
11796
/////////////////////////////// Thrift ///////////////////////////////
11897

11998
public static PipeTransferTabletBatchReq toTPipeTransferReq(
120-
final List<ByteBuffer> binaryBuffers,
121-
final List<ByteBuffer> insertNodeBuffers,
122-
final List<ByteBuffer> tabletBuffers)
99+
final List<ByteBuffer> insertNodeBuffers, final List<ByteBuffer> tabletBuffers)
123100
throws IOException {
124101
final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();
125102

126-
// batchReq.binaryReqs, batchReq.insertNodeReqs, batchReq.tabletReqs are empty
103+
// batchReq.insertNodeReqs, batchReq.tabletReqs are empty
127104
// when this method is called from PipeTransferTabletBatchReqBuilder.toTPipeTransferReq()
128105

129106
batchReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
130107
batchReq.type = PipeRequestType.TRANSFER_TABLET_BATCH.getType();
131108
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
132109
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
133-
ReadWriteIOUtils.write(binaryBuffers.size(), outputStream);
134-
for (final ByteBuffer binaryBuffer : binaryBuffers) {
135-
ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream);
136-
outputStream.write(binaryBuffer.array(), 0, binaryBuffer.limit());
137-
}
110+
// Binary buffer, for rolling upgrade
111+
ReadWriteIOUtils.write(0, outputStream);
138112

139113
ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream);
140114
for (final ByteBuffer insertNodeBuffer : insertNodeBuffers) {
@@ -157,16 +131,9 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq(
157131
final TPipeTransferReq transferReq) {
158132
final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();
159133

134+
// Binary size, for rolling upgrade
135+
ReadWriteIOUtils.readInt(transferReq.body);
160136
int size = ReadWriteIOUtils.readInt(transferReq.body);
161-
for (int i = 0; i < size; ++i) {
162-
final int length = ReadWriteIOUtils.readInt(transferReq.body);
163-
final byte[] body = new byte[length];
164-
transferReq.body.get(body);
165-
batchReq.binaryReqs.add(
166-
PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body)));
167-
}
168-
169-
size = ReadWriteIOUtils.readInt(transferReq.body);
170137
for (int i = 0; i < size; ++i) {
171138
batchReq.insertNodeReqs.add(
172139
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
@@ -188,11 +155,6 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq(
188155

189156
/////////////////////////////// TestOnly ///////////////////////////////
190157

191-
@TestOnly
192-
public List<PipeTransferTabletBinaryReq> getBinaryReqs() {
193-
return binaryReqs;
194-
}
195-
196158
@TestOnly
197159
public List<PipeTransferTabletInsertNodeReq> getInsertNodeReqs() {
198160
return insertNodeReqs;
@@ -214,8 +176,7 @@ public boolean equals(final Object obj) {
214176
return false;
215177
}
216178
final PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
217-
return binaryReqs.equals(that.binaryReqs)
218-
&& insertNodeReqs.equals(that.insertNodeReqs)
179+
return insertNodeReqs.equals(that.insertNodeReqs)
219180
&& tabletReqs.equals(that.tabletReqs)
220181
&& version == that.version
221182
&& type == that.type
@@ -224,6 +185,6 @@ public boolean equals(final Object obj) {
224185

225186
@Override
226187
public int hashCode() {
227-
return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, body);
188+
return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
228189
}
229190
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java

Lines changed: 7 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@
4444
import java.util.Objects;
4545

4646
public class PipeTransferTabletBatchReqV2 extends TPipeTransferReq {
47-
48-
private final transient List<PipeTransferTabletBinaryReqV2> binaryReqs = new ArrayList<>();
4947
private final transient List<PipeTransferTabletInsertNodeReqV2> insertNodeReqs =
5048
new ArrayList<>();
5149
private final transient List<PipeTransferTabletRawReqV2> tabletReqs = new ArrayList<>();
@@ -66,45 +64,6 @@ public List<InsertBaseStatement> constructStatements() {
6664
final Map<String, List<InsertRowStatement>> tableModelDatabaseInsertRowStatementMap =
6765
new HashMap<>();
6866

69-
for (final PipeTransferTabletBinaryReqV2 binaryReq : binaryReqs) {
70-
final InsertBaseStatement statement = binaryReq.constructStatement();
71-
if (statement.isEmpty()) {
72-
continue;
73-
}
74-
if (statement.isWriteToTable()) {
75-
if (statement instanceof InsertRowStatement) {
76-
tableModelDatabaseInsertRowStatementMap
77-
.computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>())
78-
.add((InsertRowStatement) statement);
79-
} else if (statement instanceof InsertTabletStatement) {
80-
statements.add(statement);
81-
} else if (statement instanceof InsertRowsStatement) {
82-
tableModelDatabaseInsertRowStatementMap
83-
.computeIfAbsent(statement.getDatabaseName().get(), k -> new ArrayList<>())
84-
.addAll(((InsertRowsStatement) statement).getInsertRowStatementList());
85-
} else {
86-
throw new UnsupportedOperationException(
87-
String.format(
88-
"unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReqV2.",
89-
binaryReq));
90-
}
91-
continue;
92-
}
93-
if (statement instanceof InsertRowStatement) {
94-
insertRowStatementList.add((InsertRowStatement) statement);
95-
} else if (statement instanceof InsertTabletStatement) {
96-
insertTabletStatementList.add((InsertTabletStatement) statement);
97-
} else if (statement instanceof InsertRowsStatement) {
98-
insertRowStatementList.addAll(
99-
((InsertRowsStatement) statement).getInsertRowStatementList());
100-
} else {
101-
throw new UnsupportedOperationException(
102-
String.format(
103-
"unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReqV2.",
104-
binaryReq));
105-
}
106-
}
107-
10867
for (final PipeTransferTabletInsertNodeReqV2 insertNodeReq : insertNodeReqs) {
10968
final InsertBaseStatement statement = insertNodeReq.constructStatement();
11069
if (statement.isEmpty()) {
@@ -180,10 +139,8 @@ public List<InsertBaseStatement> constructStatements() {
180139
/////////////////////////////// Thrift ///////////////////////////////
181140

182141
public static PipeTransferTabletBatchReqV2 toTPipeTransferReq(
183-
final List<ByteBuffer> binaryBuffers,
184142
final List<ByteBuffer> insertNodeBuffers,
185143
final List<ByteBuffer> tabletBuffers,
186-
final List<String> binaryDataBases,
187144
final List<String> insertNodeDataBases,
188145
final List<String> tabletDataBases)
189146
throws IOException {
@@ -193,13 +150,8 @@ public static PipeTransferTabletBatchReqV2 toTPipeTransferReq(
193150
batchReq.type = PipeRequestType.TRANSFER_TABLET_BATCH_V2.getType();
194151
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
195152
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
196-
ReadWriteIOUtils.write(binaryBuffers.size(), outputStream);
197-
for (int i = 0; i < binaryBuffers.size(); i++) {
198-
final ByteBuffer binaryBuffer = binaryBuffers.get(i);
199-
ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream);
200-
outputStream.write(binaryBuffer.array(), 0, binaryBuffer.limit());
201-
ReadWriteIOUtils.write(binaryDataBases.get(i), outputStream);
202-
}
153+
// Binary buffer, for rolling upgrade
154+
ReadWriteIOUtils.write(0, outputStream);
203155

204156
ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream);
205157
for (int i = 0; i < insertNodeBuffers.size(); i++) {
@@ -226,17 +178,10 @@ public static PipeTransferTabletBatchReqV2 fromTPipeTransferReq(
226178
final org.apache.iotdb.service.rpc.thrift.TPipeTransferReq transferReq) {
227179
final PipeTransferTabletBatchReqV2 batchReq = new PipeTransferTabletBatchReqV2();
228180

229-
int size = ReadWriteIOUtils.readInt(transferReq.body);
230-
for (int i = 0; i < size; ++i) {
231-
final int length = ReadWriteIOUtils.readInt(transferReq.body);
232-
final byte[] body = new byte[length];
233-
transferReq.body.get(body);
234-
batchReq.binaryReqs.add(
235-
PipeTransferTabletBinaryReqV2.toTPipeTransferBinaryReq(
236-
ByteBuffer.wrap(body), ReadWriteIOUtils.readString(transferReq.body)));
237-
}
181+
// Binary req, for rolling upgrade
182+
ReadWriteIOUtils.readInt(transferReq.body);
238183

239-
size = ReadWriteIOUtils.readInt(transferReq.body);
184+
int size = ReadWriteIOUtils.readInt(transferReq.body);
240185
for (int i = 0; i < size; ++i) {
241186
batchReq.insertNodeReqs.add(
242187
PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(
@@ -258,11 +203,6 @@ public static PipeTransferTabletBatchReqV2 fromTPipeTransferReq(
258203

259204
/////////////////////////////// TestOnly ///////////////////////////////
260205

261-
@TestOnly
262-
public List<PipeTransferTabletBinaryReqV2> getBinaryReqs() {
263-
return binaryReqs;
264-
}
265-
266206
@TestOnly
267207
public List<PipeTransferTabletInsertNodeReqV2> getInsertNodeReqs() {
268208
return insertNodeReqs;
@@ -284,8 +224,7 @@ public boolean equals(final Object obj) {
284224
return false;
285225
}
286226
final PipeTransferTabletBatchReqV2 that = (PipeTransferTabletBatchReqV2) obj;
287-
return Objects.equals(binaryReqs, that.binaryReqs)
288-
&& Objects.equals(insertNodeReqs, that.insertNodeReqs)
227+
return Objects.equals(insertNodeReqs, that.insertNodeReqs)
289228
&& Objects.equals(tabletReqs, that.tabletReqs)
290229
&& version == that.version
291230
&& type == that.type
@@ -294,6 +233,6 @@ public boolean equals(final Object obj) {
294233

295234
@Override
296235
public int hashCode() {
297-
return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, body);
236+
return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
298237
}
299238
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
3232
import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
3333
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
34-
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
3534
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
3635
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReqV2;
3736
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceReq;
@@ -234,20 +233,14 @@ private void doTransferWrapper(
234233
private void doTransfer(
235234
final AirGapSocket socket,
236235
final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
237-
throws PipeException, WALPipeException, IOException {
236+
throws PipeException, IOException {
238237
final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode();
239238
final byte[] bytes =
240-
Objects.isNull(insertNode)
241-
? PipeTransferTabletBinaryReqV2.toTPipeTransferBytes(
242-
pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
243-
pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
244-
? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
245-
: null)
246-
: PipeTransferTabletInsertNodeReqV2.toTPipeTransferBytes(
247-
insertNode,
248-
pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
249-
? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
250-
: null);
239+
PipeTransferTabletInsertNodeReqV2.toTPipeTransferBytes(
240+
insertNode,
241+
pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
242+
? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
243+
: null);
251244

252245
if (!send(
253246
pipeInsertNodeTabletInsertionEvent.getPipeName(),

0 commit comments

Comments
 (0)