reader : groupReaders) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ if (exception == null) {
+ exception = e;
+ } else {
+ exception.addSuppressed(e);
+ }
+ }
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ }
+
+ /**
+ * Reads one blob sequence group (all blob files with the same max_seq_num) and emits
+ * placeholder rows for row id gaps. For example, if the full row range is [0, 100], but there's
+ * only one blob file with row range [20, 80], then the rows with row id [0, 19] and [81, 100]
+ * will be emitted as placeholder rows.
+ *
+ * This reader should always be fully consumed, or the internal states may be broken.
+ *
+ *
+ * <p>Note that we cannot simply concat all data files and read them, even though we guarantee
+ * writing the full-range data during data-evolution. The complexity is introduced by row-level
+ * compaction. For example, if we execute the following operations:
+ *
+ *
+ * Write [0, 100], generate files [0, 50], [51, 100]
+ * Update [0, 100] files, generate files [0, 25], [26, 75], [76, 100]
+ * Insert new blobs for range [101, 200], generate files [101, 200]
+ * Update new blobs for range [101, 200], generate files [101, 150], [151, 200]
+ * Compact, merge [0, 100], [101, 200] to a single range
+ * Update the compacted files, generate files [0, 200]
+ *
+ *
+ * The data files layout would be:
+ *
+ *
+ * |<----------------------- merged range: row 0 ~ 200 --------------------------------->|
+ * | |
+ * ┌─────────────────────────┐┌──────────────────────┐
+ * seq1: │ file1 (0~50) ││ file2 (51~100) │ (empty on 101~200)
+ * └─────────────────────────┘└──────────────────────┘
+ * ┌─────────────┐┌──────────────────────┐┌──────────┐
+ * seq2: │ file3(0~25) ││ file4 (26~75) ││f5(76~100)│ (empty on 101~200)
+ * └─────────────┘└──────────────────────┘└──────────┘
+ * ┌────────────────────────────────────┐
+ * seq3: (empty on 0~100) │ file6 (101~200) │
+ * └────────────────────────────────────┘
+ * ┌─────────────────┐┌─────────────────┐
+ * seq4: (empty on 0~100) │ file7 (101~150) ││ file8 (151~200) │
+ * └─────────────────┘└─────────────────┘
+ * ┌───────────────────────────────────────────────────────────────────────────────────────┐
+ * seq6: │ file9 (0~200) │
+ * └───────────────────────────────────────────────────────────────────────────────────────┘
+ *
+ *
+ * We treat all gaps as full-placeholders, and correctly resolve pushed-ranges.
+ */
+ public static class BlobSequenceGroupRecordReader implements RecordReader {
+
+ private final List files; // assumed sorted by first row id — TODO confirm at the call site
+ private final BlobFileReaderFactory readerFactory;
+ // pushed-down row ranges, sorted and merged in the constructor; null means no row filter
+ private final List rowRanges;
+ private final RowType readRowType;
+ private final int blobIndex;
+ private final long lastRowId; // inclusive upper bound of the row ids this reader emits
+
+ private RecordReader currentReader;
+ private DataFileMeta currentFile;
+ private int nextFileIndex;
+ private int nextRowRangeIndex;
+ // next row id to emit; setNextRowId already advances it to a selected row
+ private long nextRowId;
+
+ private InternalRow placeholderRow; // created lazily and reused for every placeholder row
+
+ BlobSequenceGroupRecordReader(
+ List files,
+ BlobFileReaderFactory readerFactory,
+ List rowRanges,
+ RowType readRowType,
+ int blobIndex,
+ long firstRowId,
+ long lastRowId) {
+ this.files = files;
+ this.readerFactory = readerFactory;
+ this.rowRanges = rowRanges == null ? null : Range.sortAndMergeOverlap(rowRanges);
+ this.readRowType = readRowType;
+ this.blobIndex = blobIndex;
+ this.lastRowId = lastRowId;
+
+ this.nextFileIndex = 0;
+ this.nextRowRangeIndex = 0;
+ setNextRowId(firstRowId); // must run after rowRanges/lastRowId are assigned: it reads both
+
+ this.placeholderRow = null;
+ }
+
+ @Nullable
+ @Override
+ public RecordIterator readBatch() throws IOException {
+ while (true) {
+ if (currentReader != null) {
+ RecordIterator batch = currentReader.readBatch();
+ if (batch != null) {
+ return batch;
+ }
+ // the current file is exhausted; row ranges have been pushed to the file readers,
+ // so directly set nextRowId to the file's last row id + 1
+ setNextRowId(lastRowId(currentFile) + 1);
+ closeCurrentFileReader();
+ continue;
+ }
+
+ if (nextRowId > lastRowId) {
+ return null; // the whole row range has been emitted
+ }
+
+ // skip files that end before nextRowId
+ while (nextFileIndex < files.size()
+ && lastRowId(files.get(nextFileIndex)) < nextRowId) {
+ nextFileIndex++;
+ }
+ if (nextFileIndex >= files.size()) {
+ return placeHolderBatch(lastRowId); // no file left: placeholders up to the end
+ }
+
+ DataFileMeta nextFile = files.get(nextFileIndex);
+ if (nextFile.nonNullFirstRowId() > nextRowId) {
+ return placeHolderBatch(nextFile.nonNullFirstRowId() - 1); // gap before next file
+ }
+
+ createReader(nextFile);
+ }
+ }
+
+ /**
+ * Set nextRowId and try to move to the next selected row id. So the final nextRowId may be
+ * greater than the input value.
+ */
+ private void setNextRowId(long nextRowId) {
+ this.nextRowId = nextRowId;
+ tryMoveToSelectedRow();
+ }
+
+ private void tryMoveToSelectedRow() {
+ if (nextRowId > lastRowId || rowRanges == null) {
+ return;
+ }
+
+ while (nextRowRangeIndex < rowRanges.size()) {
+ Range range = rowRanges.get(nextRowRangeIndex);
+ if (nextRowId >= range.from && nextRowId <= range.to) {
+ // nextRowId is already inside a selected range, nothing to move
+ return;
+ } else if (nextRowId < range.from) {
+ // nextRowId is before this range, jump forward to the range's `from`
+ nextRowId = range.from;
+ return;
+ }
+ // nextRowId is past this range (nextRowId > range.to), try the next range
+ nextRowRangeIndex++;
+ }
+
+ // all ranges consumed: mark the reader as finished
+ nextRowId = lastRowId + 1;
+ }
+
+ private RecordIterator placeHolderBatch(long endRowId) {
+ return new RecordIterator() {
+ long rowId;
+
+ @Nullable
+ @Override
+ public InternalRow next() {
+ rowId = nextRowId; // the enclosing reader's cursor, not iterator-local state
+ if (rowId > endRowId) {
+ return null;
+ }
+ setNextRowId(rowId + 1);
+ return placeHolderRow();
+ }
+
+ @Override
+ public void releaseBatch() {
+ // nothing to release, placeholder rows hold no resources
+ }
+ };
+ }
+
+ private InternalRow placeHolderRow() {
+ if (placeholderRow == null) {
+ GenericRow row = new GenericRow(readRowType.getFieldCount());
+ row.setField(blobIndex, BlobPlaceholder.INSTANCE);
+ placeholderRow = row;
+ }
+ return placeholderRow;
+ }
+
+ private long lastRowId(DataFileMeta file) {
+ return file.nonNullFirstRowId() + file.rowCount() - 1; // inclusive
+ }
+
+ private void closeCurrentFileReader() throws IOException {
+ if (currentReader != null) {
+ currentReader.close();
+ currentReader = null;
+ }
+ currentFile = null;
+ }
+
+ private void createReader(DataFileMeta nextFile) throws IOException {
+ currentFile = nextFile;
+ currentReader = readerFactory.create(nextFile);
+ nextFileIndex++;
+ }
+
+ @Override
+ public void close() throws IOException {
+ closeCurrentFileReader();
+ }
+ }
+
+ /** Creates the {@link RecordReader} for one blob {@link DataFileMeta}. */
+ interface BlobFileReaderFactory {
+ RecordReader create(DataFileMeta file) throws IOException;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index 0df1b1428ebb..0761a3cb3a4e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -46,10 +46,12 @@
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FormatReaderMapping;
import org.apache.paimon.utils.FormatReaderMapping.Builder;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RangeHelper;
import org.apache.paimon.utils.RoaringBitmap32;
@@ -228,7 +230,8 @@ private DataEvolutionFileReader createUnionReader(
|| isVectorStoreFile(file.fileName()),
"Only blob/vector-store files need to call this method.");
return schemaFetcher.apply(file.schemaId()).logicalRowType();
- });
+ },
+ rowRanges != null);
long rowCount = fieldsFiles.get(0).rowCount();
long firstRowId = fieldsFiles.get(0).files().get(0).nonNullFirstRowId();
@@ -302,15 +305,16 @@ private DataEvolutionFileReader createUnionReader(
dataSchema,
readFields,
false));
+ RowType partialReadRowType = new RowType(readFields);
fileRecordReaders[i] =
new ForceSingleBatchReader(
- createFileReader(
+ createFieldBunchReader(
partition,
bunch,
dataFilePathFactory,
formatReaderMapping,
rowRanges,
- readRowType));
+ partialReadRowType));
}
}
@@ -327,6 +331,83 @@ private DataEvolutionFileReader createUnionReader(
return new DataEvolutionFileReader(rowOffsets, fieldOffsets, fileRecordReaders);
}
+ private RecordReader createFieldBunchReader(
+ BinaryRow partition,
+ FieldBunch bunch,
+ DataFilePathFactory dataFilePathFactory,
+ FormatReaderMapping formatReaderMapping,
+ List rowRanges,
+ RowType readRowType)
+ throws IOException {
+ if (bunch instanceof DataBunch) {
+ // for a data bunch, directly read its single file
+ return createFileReader(
+ partition,
+ bunch.files().get(0),
+ dataFilePathFactory,
+ formatReaderMapping,
+ rowRanges,
+ readRowType);
+ } else if (bunch instanceof VectorFileBunch) {
+ // for a vector bunch, read all data files sequentially and concatenate them
+ List> readerSuppliers = new ArrayList<>();
+ for (DataFileMeta file : bunch.files()) {
+ RoaringBitmap32 selection = file.toFileSelection(rowRanges);
+ FormatReaderContext formatReaderContext =
+ new FormatReaderContext(
+ fileIO,
+ dataFilePathFactory.toPath(file),
+ file.fileSize(),
+ selection);
+ readerSuppliers.add(
+ () ->
+ new DataFileRecordReader(
+ readRowType,
+ formatReaderMapping.getReaderFactory(),
+ formatReaderContext,
+ coreOptions.scanIgnoreCorruptFile(),
+ coreOptions.scanIgnoreLostFile(),
+ formatReaderMapping.getIndexMapping(),
+ formatReaderMapping.getCastMapping(),
+ PartitionUtils.create(
+ formatReaderMapping.getPartitionPair(), partition),
+ true,
+ file.firstRowId(),
+ file.maxSequenceNumber(),
+ formatReaderMapping.getSystemFields()));
+ }
+ return ConcatRecordReader.create(readerSuppliers);
+ } else if (bunch instanceof BlobFileBunch) {
+ // for a blob bunch, fall back on placeholders
+ int blobIndex = findBlobFieldIndex(readRowType);
+ checkArgument(blobIndex >= 0, "Blob bunch read type should contain a blob field.");
+ return new BlobFallbackRecordReader(
+ bunch.files(),
+ file -> // per-file reader factory handed to the fallback reader
+ createFileReader(
+ partition,
+ file,
+ dataFilePathFactory,
+ formatReaderMapping,
+ rowRanges,
+ readRowType),
+ rowRanges,
+ readRowType,
+ blobIndex);
+ } else {
+ throw new UnsupportedOperationException("Unsupported bunch type: " + bunch);
+ }
+ }
+
+ private static int findBlobFieldIndex(RowType rowType) {
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) {
+ return i; // the first BLOB-typed field wins
+ }
+ }
+ return -1; // no BLOB field in the row type
+ }
+
private FileRecordReader createFileReader(
BinaryRow partition,
DataFilePathFactory dataFilePathFactory,
@@ -351,49 +432,6 @@ private FileRecordReader createFileReader(
partition, file, dataFilePathFactory, formatReaderMapping, rowRanges, readRowType);
}
- private RecordReader createFileReader(
- BinaryRow partition,
- FieldBunch bunch,
- DataFilePathFactory dataFilePathFactory,
- FormatReaderMapping formatReaderMapping,
- List rowRanges,
- RowType readRowType)
- throws IOException {
- if (bunch.files().size() == 1) {
- return createFileReader(
- partition,
- bunch.files().get(0),
- dataFilePathFactory,
- formatReaderMapping,
- rowRanges,
- readRowType);
- }
- List> readerSuppliers = new ArrayList<>();
- for (DataFileMeta file : bunch.files()) {
- RoaringBitmap32 selection = file.toFileSelection(rowRanges);
- FormatReaderContext formatReaderContext =
- new FormatReaderContext(
- fileIO, dataFilePathFactory.toPath(file), file.fileSize(), selection);
- readerSuppliers.add(
- () ->
- new DataFileRecordReader(
- readRowType,
- formatReaderMapping.getReaderFactory(),
- formatReaderContext,
- coreOptions.scanIgnoreCorruptFile(),
- coreOptions.scanIgnoreLostFile(),
- formatReaderMapping.getIndexMapping(),
- formatReaderMapping.getCastMapping(),
- PartitionUtils.create(
- formatReaderMapping.getPartitionPair(), partition),
- true,
- file.firstRowId(),
- file.maxSequenceNumber(),
- formatReaderMapping.getSystemFields()));
- }
- return ConcatRecordReader.create(readerSuppliers);
- }
-
private FileRecordReader createFileReader(
BinaryRow partition,
DataFileMeta file,
@@ -433,8 +471,8 @@ public static List splitFieldBunches(
Function fileToRowType,
boolean rowIdPushDown) {
List fieldsFiles = new ArrayList<>();
- Map blobBunchMap = new HashMap<>();
- Map vectorStoreBunchMap = new TreeMap<>();
+ Map blobBunchMap = new HashMap<>();
+ Map vectorStoreBunchMap = new TreeMap<>();
long rowCount = -1;
for (DataFileMeta file : needMergeFiles) {
if (isBlobFile(file.fileName())) {
@@ -443,8 +481,7 @@ public static List splitFieldBunches(
final long expectedRowCount = rowCount;
blobBunchMap
.computeIfAbsent(
- fieldId,
- key -> new SpecialFieldBunch(expectedRowCount, rowIdPushDown))
+ fieldId, key -> new BlobFileBunch(expectedRowCount, rowIdPushDown))
.add(file);
} else if (isVectorStoreFile(file.fileName())) {
RowType rowType = fileToRowType.apply(file);
@@ -456,7 +493,7 @@ public static List splitFieldBunches(
vectorStoreBunchMap
.computeIfAbsent(
vectorStoreKey,
- key -> new SpecialFieldBunch(expectedRowCount, rowIdPushDown))
+ key -> new VectorFileBunch(expectedRowCount, rowIdPushDown))
.add(file);
} else {
// Normal file, just add it to the current merge split
@@ -496,8 +533,69 @@ public List files() {
}
}
+ /**
+ * The {@link FieldBunch} for blobs. Unlike {@link VectorFileBunch}, which only keeps the data
+ * files with the largest max sequence number, {@link BlobFileBunch} keeps all blob files.
+ */
@VisibleForTesting
- static class SpecialFieldBunch implements FieldBunch {
+ static class BlobFileBunch implements FieldBunch {
+
+ final List files;
+ final List ranges; // row id range of each added file, in insertion order
+ final long expectedRowCount; // only checked in rowCount() when >= 0
+ final boolean rowIdPushdown;
+
+ BlobFileBunch(long expectedRowCount, boolean rowIdPushdown) {
+ this.files = new ArrayList<>();
+ this.expectedRowCount = expectedRowCount;
+ this.ranges = new ArrayList<>();
+ this.rowIdPushdown = rowIdPushdown;
+ }
+
+ void add(DataFileMeta file) {
+ if (!isBlobFile(file.fileName())) {
+ throw new IllegalArgumentException("Only blob file can be added to this bunch.");
+ }
+ if (!files.isEmpty()) {
+ checkArgument(
+ file.writeCols().equals(files.get(0).writeCols()),
+ "All files in this bunch should have the same write columns.");
+ }
+
+ files.add(file);
+ ranges.add(file.nonNullRowIdRange());
+ }
+
+ @Override
+ public long rowCount() {
+ List merged = Range.sortAndMergeOverlap(ranges, true); // NOTE(review): `true` presumably also merges adjacent ranges — confirm
+ if (!rowIdPushdown) { // contiguity and row count are only enforced without row-id push-down
+ Preconditions.checkState(
+ merged.size() == 1,
+ "Blob file bunch should always contain a contiguous row range.");
+
+ long rowCount = merged.get(0).count();
+ if (expectedRowCount >= 0) {
+ Preconditions.checkState(
+ rowCount == expectedRowCount,
+ "The merged rowCount %s of blob file bunch should be aligned with normal files %s.",
+ rowCount,
+ expectedRowCount);
+ }
+ }
+
+ return merged.stream().mapToLong(Range::count).sum(); // sum of the counts of all merged ranges
+ }
+
+ @Override
+ public List files() {
+ return files;
+ }
+ }
+
+ /** {@link FieldBunch} for vector-store files. */
+ @VisibleForTesting
+ static class VectorFileBunch implements FieldBunch {
final List files;
final long expectedRowCount;
@@ -508,72 +606,67 @@ static class SpecialFieldBunch implements FieldBunch {
long latestMaxSequenceNumber = -1;
long rowCount;
- SpecialFieldBunch(long expectedRowCount, boolean rowIdPushDown) {
+ VectorFileBunch(long expectedRowCount, boolean rowIdPushDown) {
this.files = new ArrayList<>();
- this.rowCount = 0;
this.expectedRowCount = expectedRowCount;
this.rowIdPushDown = rowIdPushDown;
}
void add(DataFileMeta file) {
- if (!isBlobFile(file.fileName()) && !isVectorStoreFile(file.fileName())) {
+ if (!isVectorStoreFile(file.fileName())) {
throw new IllegalArgumentException(
- "Only blob/vector-store file can be added to this bunch.");
+ "Only vector-store file can be added to this bunch.");
}
-
if (file.nonNullFirstRowId() == latestFistRowId) {
if (file.maxSequenceNumber() >= latestMaxSequenceNumber) {
throw new IllegalArgumentException(
- "Blob/vector-store file with same first row id should have decreasing sequence number.");
+ "Vector file with same first row id should have decreasing sequence number.");
}
return;
}
+
if (!files.isEmpty()) {
long firstRowId = file.nonNullFirstRowId();
- if (rowIdPushDown) {
- if (firstRowId < expectedNextFirstRowId) {
- if (file.maxSequenceNumber() > latestMaxSequenceNumber) {
- DataFileMeta lastFile = files.remove(files.size() - 1);
- rowCount -= lastFile.rowCount();
- } else {
- return;
- }
- }
- } else {
- if (firstRowId < expectedNextFirstRowId) {
- checkArgument(
- file.maxSequenceNumber() < latestMaxSequenceNumber,
- "Blob/vector-store file with overlapping row id should have decreasing sequence number.");
+ if (rowIdPushDown && firstRowId < expectedNextFirstRowId) { // overlap under push-down: keep the file with the larger sequence number
+ if (file.maxSequenceNumber() > latestMaxSequenceNumber) {
+ DataFileMeta lastFile = files.remove(files.size() - 1);
+ rowCount -= lastFile.rowCount();
+ } else {
return;
- } else if (firstRowId > expectedNextFirstRowId) {
- throw new IllegalArgumentException(
- "Blob/vector-store file first row id should be continuous, expect "
- + expectedNextFirstRowId
- + " but got "
- + firstRowId);
}
+ } else if (firstRowId < expectedNextFirstRowId) {
+ checkArgument(
+ file.maxSequenceNumber() < latestMaxSequenceNumber,
+ "Vector file with overlapping row id should have decreasing sequence number.");
+ return;
+ } else if (!rowIdPushDown && firstRowId > expectedNextFirstRowId) { // gaps are only tolerated with row-id push-down
+ throw new IllegalArgumentException(
+ "Vector file first row id should be continuous, expect "
+ + expectedNextFirstRowId
+ + " but got "
+ + firstRowId);
}
+
if (!files.isEmpty()) {
- if (!isBlobFile(file.fileName())) {
- checkArgument(
- file.schemaId() == files.get(0).schemaId(),
- "All files in this bunch should have the same schema id.");
- }
+ checkArgument(
+ file.schemaId() == files.get(0).schemaId(),
+ "All files in this bunch should have the same schema id.");
checkArgument(
file.writeCols().equals(files.get(0).writeCols()),
"All files in this bunch should have the same write columns.");
}
}
+
files.add(file);
rowCount += file.rowCount();
- if (expectedRowCount >= 0) {
+ if (expectedRowCount > 0) { // NOTE(review): was ">= 0" before; an expected count of 0 is no longer checked, and BlobFileBunch still uses ">= 0" — confirm intent
checkArgument(
rowCount <= expectedRowCount,
- "Blob/vector-store files row count exceed the expect " + expectedRowCount);
+ "Vector files row count exceed the expect " + expectedRowCount);
}
- this.latestMaxSequenceNumber = file.maxSequenceNumber();
- this.latestFistRowId = file.nonNullFirstRowId();
- this.expectedNextFirstRowId = latestFistRowId + file.rowCount();
+ latestMaxSequenceNumber = file.maxSequenceNumber();
+ latestFistRowId = file.nonNullFirstRowId();
+ expectedNextFirstRowId = latestFistRowId + file.rowCount();
}
@Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobUpdateTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobUpdateTest.java
new file mode 100644
index 000000000000..2afb262c0c04
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobUpdateTest.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobPlaceholder;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.blob.BlobFileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.globalindex.IndexedSplit;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for blob update scenarios. */
+public class BlobUpdateTest extends TableTestBase {
+
+ private final byte[] blobBytes = randomBytes();
+
+ @Test
+ public void testReadBlobPlaceHolderFallback() throws Exception {
+ createTableDefault();
+ writeDataDefault(
+ Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("first"), new BlobData(blobBytes)),
+ GenericRow.of(
+ 2, BinaryString.fromString("second"), new BlobData(blobBytes)),
+ GenericRow.of(
+ 3, BinaryString.fromString("third"), new BlobData(blobBytes))));
+
+ FileStoreTable table = getTableDefault();
+ byte[] updatedBytes = "updated-blob".getBytes(); // NOTE(review): platform default charset; consider StandardCharsets.UTF_8
+ DataFilePathFactory pathFactory =
+ table.store().pathFactory().createDataFilePathFactory(BinaryRow.EMPTY_ROW, 0);
+ DataFileMeta newBlobFile =
+ writeBlobFile(
+ table.fileIO(),
+ pathFactory.newBlobPath(),
+ Arrays.asList(BlobPlaceholder.INSTANCE, new BlobData(updatedBytes)), // row 0 stays a placeholder, row 1 is updated
+ 0, // firstRowId
+ 2, // maxSequenceNumber
+ table.schema().id(),
+ Collections.singletonList("f2"));
+ commitBlobFiles(newBlobFile);
+
+ List actual = new ArrayList<>();
+ ReadBuilder readBuilder = getTableDefault().newReadBuilder();
+ RecordReader reader =
+ readBuilder.newRead().createReader(readBuilder.newScan().plan());
+ reader.forEachRemaining(row -> actual.add(row.getBlob(2).toData()));
+
+ assertThat(actual.size()).isEqualTo(3);
+ assertThat(actual.get(0)).isEqualTo(blobBytes); // placeholder falls back to the original blob
+ assertThat(actual.get(1)).isEqualTo(updatedBytes);
+ assertThat(actual.get(2)).isEqualTo(blobBytes); // row 2 is not covered by the new blob file
+ }
+
+ @Test
+ public void testReadBlobPlaceHolderFallbackWithRowIdPushDown() throws Exception {
+ createTableDefault();
+
+ List originalBlobs = new ArrayList<>();
+ List rows = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ byte[] bytes = fixedBlobBytes(i);
+ originalBlobs.add(bytes);
+ rows.add(GenericRow.of(i, BinaryString.fromString("row-" + i), new BlobData(bytes)));
+ }
+ writeDataDefault(rows);
+
+ FileStoreTable table = getTableDefault();
+ assertBlobFileRowIdRanges(
+ table,
+ Arrays.asList(
+ new Range(0L, 2L),
+ new Range(3L, 5L),
+ new Range(6L, 8L),
+ new Range(9L, 9L)));
+
+ DataFilePathFactory pathFactory =
+ table.store().pathFactory().createDataFilePathFactory(BinaryRow.EMPTY_ROW, 0);
+ byte[] updated4 = fixedBlobBytes(44);
+ byte[] updated9 = fixedBlobBytes(99);
+ DataFileMeta updateFile0 =
+ writeBlobFile(
+ table.fileIO(),
+ pathFactory.newBlobPath(),
+ Arrays.asList(
+ BlobPlaceholder.INSTANCE,
+ BlobPlaceholder.INSTANCE,
+ BlobPlaceholder.INSTANCE,
+ BlobPlaceholder.INSTANCE,
+ new BlobData(updated4)),
+ 0,
+ 2,
+ table.schema().id(),
+ Collections.singletonList("f2"));
+ DataFileMeta updateFile1 =
+ writeBlobFile(
+ table.fileIO(),
+ pathFactory.newBlobPath(),
+ Arrays.asList(
+ BlobPlaceholder.INSTANCE,
+ BlobPlaceholder.INSTANCE,
+ BlobPlaceholder.INSTANCE,
+ BlobPlaceholder.INSTANCE,
+ new BlobData(updated9)),
+ 5,
+ 2,
+ table.schema().id(),
+ Collections.singletonList("f2"));
+ commitBlobFiles(updateFile0, updateFile1);
+
+ FileStoreTable readTable = getTableDefault();
+ ReadBuilder readBuilder =
+ readTable
+ .newReadBuilder()
+ .withReadType(readTable.rowType().project(Collections.singletonList("f2")))
+ .withRowRanges(Arrays.asList(new Range(5L, 5L), new Range(9L, 9L)));
+ TableScan.Plan plan = readBuilder.newScan().plan();
+ assertThat(plan.splits().size()).isEqualTo(1);
+ DataSplit dataSplit = ((IndexedSplit) plan.splits().get(0)).dataSplit();
+ assertThat(dataSplit.dataFiles().size()).isEqualTo(3);
+ RecordReader reader = readBuilder.newRead().createReader(plan);
+
+ List actual = new ArrayList<>();
+ reader.forEachRemaining(row -> actual.add(row.getBlob(0).toData()));
+
+ assertThat(actual.size()).isEqualTo(2);
+ assertThat(actual.get(0)).isEqualTo(originalBlobs.get(5));
+ assertThat(actual.get(1)).isEqualTo(updated9);
+ }
+
+ /**
+ * This test manually simulates the compacted layout described in BlobSequenceGroupRecordReader,
+ * scaled down to row id [0, 9].
+ *
+ *
+ * row id: 0 1 2 3 4 5 6 7 8 9
+ * seq1: [b0 b1 b2] [b3 b4] . . . . .
+ * seq2: [u20 P] [P u23][u24] . . . . .
+ * seq3: . . . . . [b5 b6 b7 b8 b9]
+ * seq4: . . . . . [P u46 P] [u48 P]
+ * seq6: [P u61 P P P P P P P u69]
+ *
+ * RESULT: u20 u61 b2 u23 u24 b5 u46 b7 u48 u69
+ *
+ */
+ @Test
+ public void testReadCompactedBlobSequenceGroups() throws Exception {
+ createTableDefault();
+
+ List baseBlobs = new ArrayList<>();
+ List rows = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ byte[] bytes = fixedBlobBytes(i);
+ baseBlobs.add(bytes);
+ rows.add(GenericRow.of(i, BinaryString.fromString("row-" + i), new BlobData(bytes)));
+ }
+ writeDataDefault(rows);
+
+ FileStoreTable table = getTableDefault();
+ List oldBlobFiles = blobFiles(table);
+ assertThat(oldBlobFiles.size()).isEqualTo(4);
+
+ byte[] seq2Row0 = fixedBlobBytes(20);
+ byte[] seq2Row3 = fixedBlobBytes(23);
+ byte[] seq2Row4 = fixedBlobBytes(24);
+ byte[] seq4Row6 = fixedBlobBytes(46);
+ byte[] seq4Row8 = fixedBlobBytes(48);
+ byte[] seq6Row1 = fixedBlobBytes(61);
+ byte[] seq6Row9 = fixedBlobBytes(69);
+
+ DataFilePathFactory pathFactory =
+ table.store().pathFactory().createDataFilePathFactory(BinaryRow.EMPTY_ROW, 0);
+ List compactedLayout =
+ Arrays.asList(
+ writeBlobFile(
+ table.fileIO(),
+ pathFactory.newBlobPath(),
+ Arrays.asList(
+ new BlobData(baseBlobs.get(0)),
+ new BlobData(baseBlobs.get(1)),
+ new BlobData(baseBlobs.get(2))),
+ 0,
+ 1,
+ table.schema().id(),
+ Collections.singletonList("f2")),
+ writeBlobFile(
+ table.fileIO(),
+ pathFactory.newBlobPath(),
+ Arrays.asList(
+ new BlobData(baseBlobs.get(3)),
+ new BlobData(baseBlobs.get(4))),
+ 3,
+ 1,
+ table.schema().id(),
+ Collections.singletonList("f2")),
+ writeBlobFile(
+ table.fileIO(),
+ pathFactory.newBlobPath(),
+ Arrays.asList(new BlobData(seq2Row0), BlobPlaceholder.INSTANCE),
+ 0,
+ 2,
+ table.schema().id(),
+ Collections.singletonList("f2")),
+ writeBlobFile(
+ table.fileIO(),
+ pathFactory.newBlobPath(),
+ Arrays.asList(BlobPlaceholder.INSTANCE, new BlobData(seq2Row3)),
+ 2,
+ 2,
+ table.schema().id(),
+ Collections.singletonList("f2")),
+ writeBlobFile(
+ table.fileIO(),
+ pathFactory.newBlobPath(),
+ Collections.singletonList(new BlobData(seq2Row4)),
+ 4,
+ 2,
+ table.schema().id(),
+ Collections.singletonList("f2")),
+ writeBlobFile(
+ table.fileIO(),
+ pathFactory.newBlobPath(),
+ Arrays.asList(
+ new BlobData(baseBlobs.get(5)),
+ new BlobData(baseBlobs.get(6)),
+ new BlobData(baseBlobs.get(7)),
+ new BlobData(baseBlobs.get(8)),
+ new BlobData(baseBlobs.get(9))),
+ 5,
+ 3,
+ table.schema().id(),
+ Collections.singletonList("f2")),
+ writeBlobFile(
+ table.fileIO(),
+ pathFactory.newBlobPath(),
+ Arrays.asList(
+ BlobPlaceholder.INSTANCE,
+ new BlobData(seq4Row6),
+ BlobPlaceholder.INSTANCE),
+ 5,
+ 4,
+ table.schema().id(),
+ Collections.singletonList("f2")),
+ writeBlobFile(
+ table.fileIO(),
+ pathFactory.newBlobPath(),
+ Arrays.asList(new BlobData(seq4Row8), BlobPlaceholder.INSTANCE),
+ 8,
+ 4,
+ table.schema().id(),
+ Collections.singletonList("f2")),
+ writeBlobFile(
+ table.fileIO(),
+ pathFactory.newBlobPath(),
+ Arrays.asList(
+ BlobPlaceholder.INSTANCE,
+ new BlobData(seq6Row1),
+ BlobPlaceholder.INSTANCE,
+ BlobPlaceholder.INSTANCE,
+ BlobPlaceholder.INSTANCE,
+ BlobPlaceholder.INSTANCE,
+ BlobPlaceholder.INSTANCE,
+ BlobPlaceholder.INSTANCE,
+ BlobPlaceholder.INSTANCE,
+ new BlobData(seq6Row9)),
+ 0,
+ 6,
+ table.schema().id(),
+ Collections.singletonList("f2")));
+
+ commitDataFiles(compactedLayout, oldBlobFiles);
+
+ FileStoreTable readTable = getTableDefault();
+ assertBlobFileLayout(
+ readTable,
+ Arrays.asList(
+ "seq1:0-2",
+ "seq1:3-4",
+ "seq2:0-1",
+ "seq2:2-3",
+ "seq2:4-4",
+ "seq3:5-9",
+ "seq4:5-7",
+ "seq4:8-9",
+ "seq6:0-9"));
+
+ ReadBuilder readBuilder = readTable.newReadBuilder();
+ RecordReader reader =
+ readBuilder.newRead().createReader(readBuilder.newScan().plan());
+
+ List actual = new ArrayList<>();
+ reader.forEachRemaining(row -> actual.add(row.getBlob(2).toData()));
+
+ List expected =
+ Arrays.asList(
+ seq2Row0,
+ seq6Row1,
+ baseBlobs.get(2),
+ seq2Row3,
+ seq2Row4,
+ baseBlobs.get(5),
+ seq4Row6,
+ baseBlobs.get(7),
+ seq4Row8,
+ seq6Row9);
+ assertThat(actual.size()).isEqualTo(expected.size());
+ for (int i = 0; i < expected.size(); i++) {
+ assertThat(actual.get(i)).isEqualTo(expected.get(i));
+ }
+ }
+
+ private void commitBlobFiles(DataFileMeta... dataFiles) throws Exception {
+ commitDataFiles(Arrays.asList(dataFiles), Collections.emptyList());
+ }
+
+ private void commitDataFiles(List newFiles, List deletedFiles)
+ throws Exception {
+ commitDefault(
+ Collections.singletonList(
+ new CommitMessageImpl(
+ BinaryRow.EMPTY_ROW,
+ 0,
+ null,
+ new DataIncrement(newFiles, deletedFiles, Collections.emptyList()),
+ CompactIncrement.emptyIncrement())));
+ }
+
+ private static List blobFiles(FileStoreTable table) {
+ List actual = new ArrayList<>();
+ for (ManifestEntry entry : table.store().newScan().plan().files()) {
+ DataFileMeta file = entry.file();
+ if (BlobFileFormat.isBlobFile(file.fileName())) {
+ actual.add(file);
+ }
+ }
+ return actual;
+ }
+
+ private static void assertBlobFileRowIdRanges(FileStoreTable table, List expected) {
+ List actual = new ArrayList<>();
+ for (DataFileMeta file : blobFiles(table)) {
+ actual.add(file.nonNullRowIdRange());
+ }
+ Collections.sort(actual, (left, right) -> Long.compare(left.from, right.from));
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ private static void assertBlobFileLayout(FileStoreTable table, List expected) {
+ List files = blobFiles(table);
+ Collections.sort(
+ files,
+ (left, right) -> {
+ int seqCompare =
+ Long.compare(left.maxSequenceNumber(), right.maxSequenceNumber());
+ if (seqCompare != 0) {
+ return seqCompare;
+ }
+ int fromCompare =
+ Long.compare(left.nonNullFirstRowId(), right.nonNullFirstRowId());
+ if (fromCompare != 0) {
+ return fromCompare;
+ }
+ return Long.compare(left.nonNullRowIdRange().to, right.nonNullRowIdRange().to);
+ });
+ List actual = new ArrayList<>();
+ for (DataFileMeta file : files) {
+ Range range = file.nonNullRowIdRange();
+ actual.add("seq" + file.maxSequenceNumber() + ":" + range.from + "-" + range.to);
+ }
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ private static byte[] fixedBlobBytes(int value) {
+ byte[] bytes = new byte[2 * 1024 * 124]; // 253,952 bytes, same length as randomBytes()
+ Arrays.fill(bytes, (byte) value); // every byte carries the (truncated) value, so blobs are easy to tell apart
+ return bytes;
+ }
+
+ private static DataFileMeta writeBlobFile(
+ FileIO fileIO,
+ Path path,
+ List blobs,
+ long firstRowId,
+ long maxSequenceNumber,
+ long schemaId,
+ List writeCols)
+ throws IOException {
+ try (PositionOutputStream out = fileIO.newOutputStream(path, false)) {
+ FormatWriter writer =
+ new BlobFileFormat()
+ .createWriterFactory(RowType.of(DataTypes.BLOB()))
+ .create(out, "none");
+ for (Blob blob : blobs) {
+ writer.addElement(GenericRow.of(blob));
+ }
+ writer.close(); // NOTE(review): not reached if addElement throws; only `out` is closed by try-with-resources
+ }
+ return DataFileMeta.create(
+ path.getName(),
+ fileIO.getFileSize(path),
+ blobs.size(),
+ DataFileMeta.EMPTY_MIN_KEY,
+ DataFileMeta.EMPTY_MAX_KEY,
+ SimpleStats.EMPTY_STATS,
+ SimpleStats.EMPTY_STATS,
+ maxSequenceNumber, // presumably the min sequence number slot — TODO confirm against DataFileMeta.create
+ maxSequenceNumber,
+ schemaId,
+ DataFileMeta.DUMMY_LEVEL,
+ Collections.emptyList(),
+ Timestamp.fromEpochMillis(System.currentTimeMillis()),
+ 0L,
+ null,
+ FileSource.APPEND,
+ null,
+ null,
+ firstRowId,
+ writeCols);
+ }
+
+ @Override
+ protected Schema schemaDefault() {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.column("f2", DataTypes.BLOB());
+ schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+ schemaBuilder.option(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "700 KB"); // presumably small so blob files roll every few blobs; the tests expect 3 rows per blob file
+ schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ return schemaBuilder.build();
+ }
+
+ @Override
+ protected InternalRow dataDefault(int time, int size) {
+ return GenericRow.of(
+ RANDOM.nextInt(), BinaryString.fromBytes(randomBytes()), new BlobData(blobBytes));
+ }
+
+ @Override
+ protected byte[] randomBytes() {
+ byte[] binary = new byte[2 * 1024 * 124];
+ RANDOM.nextBytes(binary);
+ return binary;
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java
new file mode 100644
index 000000000000..dcea0a6ff130
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobPlaceholder;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.operation.BlobFallbackRecordReader.BlobSequenceGroupRecordReader;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordReader.RecordIterator;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link BlobFallbackRecordReader}. */
+public class BlobFallbackRecordReaderTest {
+
+ private static final int BLOB_INDEX = 0;
+ private static final String BLOB_FIELD = "blob_col";
+ private static final RowType READ_ROW_TYPE =
+ new RowType(
+ Arrays.asList(
+ new DataField(BLOB_INDEX, BLOB_FIELD, DataTypes.BLOB()),
+ new DataField(1, SpecialFields.ROW_ID.name(), DataTypes.BIGINT()),
+ new DataField(
+ 2, SpecialFields.SEQUENCE_NUMBER.name(), DataTypes.BIGINT())));
+
+    /**
+     * Reads one sequence group made of two blob files (rows 0-2 and rows 5-6) over the group
+     * range [0, 6], restricted to the row ranges [1, 1] and [3, 5]. Rows 1 and 5 are backed by a
+     * file; the other two selected rows are expected to be emitted as placeholders.
+     */
+    @Test
+    public void testBlobSequenceGroupReaderWithRowRanges() throws Exception {
+        List<DataFileMeta> files =
+                Arrays.asList(blobFile("blob1", 0, 3, 10), blobFile("blob2", 5, 2, 10));
+        List<Range> rowRanges = ranges(1, 1, 3, 5);
+
+        ReadResult rows = readSequenceGroup(files, rowRanges, 0, 6, 10);
+
+        assertThat(rows.rowIds).containsExactly(1L, 5L);
+        assertThat(rows.placeholderRowCount).isEqualTo(2);
+        assertThat(rows.batchSizes).containsExactly(1, 2, 1);
+    }
+
+    /**
+     * Covers two layouts of a single sequence group read with many row ranges:
+     *
+     * <ul>
+     *   <li>one file (rows 20-50) inside the group range [10, 60], where several ranges fall
+     *       inside the file and others fall before or after it;
+     *   <li>two files (rows 0-10 and rows 50-60) inside the group range [0, 62], with a wide
+     *       gap between them and a short tail after the second file.
+     * </ul>
+     *
+     * In both cases the selected rows outside any file are expected to be placeholders.
+     */
+    @Test
+    public void testBlobSequenceGroupReaderWithMultipleRangesInFileAndGap() throws Exception {
+        DataFileMeta wideFile = blobFile("wide-file", 20, 31, 10);
+        List<Range> wideFileRanges =
+                ranges(9, 10, 19, 20, 25, 30, 35, 40, 43, 45, 48, 52, 55, 56);
+
+        ReadResult wideFileRows =
+                readSequenceGroup(Collections.singletonList(wideFile), wideFileRanges, 10, 60, 10);
+
+        List<Long> wideFileActualRowIds = rowIdsInRanges(20, 50, wideFileRanges);
+        assertThat(wideFileRows.rowIds).containsExactlyElementsOf(wideFileActualRowIds);
+        assertThat(wideFileRows.placeholderRowCount)
+                .isEqualTo(
+                        rowIdsInRanges(10, 19, wideFileRanges).size()
+                                + rowIdsInRanges(51, 60, wideFileRanges).size());
+        assertThat(wideFileRows.batchSizes)
+                .containsExactlyElementsOf(batchSizes(2, wideFileActualRowIds.size(), 1, 4));
+
+        DataFileMeta firstFile = blobFile("first-file", 0, 11, 10);
+        DataFileMeta secondFile = blobFile("second-file", 50, 11, 10);
+        List<Range> gapRanges = ranges(0, 0, 9, 12, 25, 30, 35, 40, 43, 45, 48, 52, 58, 62);
+
+        ReadResult gapRows =
+                readSequenceGroup(Arrays.asList(firstFile, secondFile), gapRanges, 0, 62, 10);
+
+        assertThat(gapRows.rowIds)
+                .containsExactlyElementsOf(
+                        concat(
+                                rowIdsInRanges(0, 10, gapRanges),
+                                rowIdsInRanges(50, 60, gapRanges)));
+        assertThat(gapRows.placeholderRowCount)
+                .isEqualTo(
+                        rowIdsInRanges(11, 49, gapRanges).size()
+                                + rowIdsInRanges(61, 62, gapRanges).size());
+        assertThat(gapRows.batchSizes).containsExactly(1, 1, 1, 19, 1, 1, 1, 1, 1, 1, 2);
+    }
+
+    /**
+     * The newer file (sequence 2, rows 0-2) stores a placeholder at row 1, and the older file
+     * (sequence 1) covers rows 0-4. Row 1 and rows 3-4 are therefore expected to be served with
+     * sequence number 1, the remaining rows with sequence number 2.
+     */
+    @Test
+    public void testBlobFallbackRecordReader() throws Exception {
+        DataFileMeta seq2File = blobFile("new-file", 0, 3, 2);
+        DataFileMeta seq1File = blobFile("old-file", 0, 5, 1);
+
+        ReadResult result =
+                readFallback(
+                        Arrays.asList(seq2File, seq1File), null, placeholderRows(seq2File, 1));
+
+        assertThat(result.rowIds).containsExactly(0L, 1L, 2L, 3L, 4L);
+        assertThat(result.sequenceNumbers).containsExactly(2L, 1L, 2L, 1L, 1L);
+    }
+
+    /**
+     * No row ranges are supplied and the files do not start at row 0: the older file covers
+     * rows 8-13 and the newer file rows 10-11, with a placeholder at row 10. The reader is
+     * expected to return exactly rows 8-13, taking only row 11 from the newer file.
+     */
+    @Test
+    public void testBlobFallbackRecordReaderDerivesRowIdBoundsFromFiles() throws Exception {
+        DataFileMeta seq2File = blobFile("new-file", 10, 2, 2);
+        DataFileMeta seq1File = blobFile("old-file", 8, 6, 1);
+
+        ReadResult result =
+                readFallback(
+                        Arrays.asList(seq2File, seq1File), null, placeholderRows(seq2File, 10));
+
+        assertThat(result.rowIds).containsExactly(8L, 9L, 10L, 11L, 12L, 13L);
+        assertThat(result.sequenceNumbers).containsExactly(1L, 1L, 1L, 2L, 1L, 1L);
+    }
+
+    /**
+     * Both files hold a single row (row 0) and both store a placeholder there, so there is no
+     * real value to fall back to. Reading is expected to fail with an
+     * {@link IllegalStateException}.
+     */
+    @Test
+    public void testBlobFallbackRecordReaderThrowsIfAllRowsArePlaceholders() {
+        DataFileMeta seq2File = blobFile("new-placeholder-file", 0, 1, 2);
+        DataFileMeta seq1File = blobFile("old-placeholder-file", 0, 1, 1);
+
+        assertThatThrownBy(
+                        () ->
+                                readFallback(
+                                        Arrays.asList(seq2File, seq1File),
+                                        null,
+                                        placeholderRows(seq2File, 0, seq1File, 0)))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("all blob files at the same row id store a placeholder");
+    }
+
+    /**
+     * Combines row ranges with fallback across sequence groups. The older file (sequence 1)
+     * covers rows 20-45, while the newer sequence (2) is split into two files covering rows
+     * 20-30 and rows 40-45. The files are passed in an order that is sorted neither by row id
+     * nor by sequence number. Selected rows covered by a newer file are expected with sequence
+     * number 2, the rest with sequence number 1.
+     */
+    @Test
+    public void testBlobFallbackRecordReaderWithRowRanges() throws Exception {
+        DataFileMeta oldFile = blobFile("old-file", 20, 26, 1);
+        DataFileMeta newFile1 = blobFile("new-file-1", 20, 11, 2);
+        DataFileMeta newFile2 = blobFile("new-file-2", 40, 6, 2);
+        List<Range> rowRanges = ranges(15, 20, 25, 26, 29, 33, 35, 41);
+
+        ReadResult rows =
+                readFallback(
+                        Arrays.asList(newFile2, oldFile, newFile1),
+                        rowRanges,
+                        Collections.emptySet());
+
+        assertThat(rows.rowIds)
+                .containsExactly(
+                        20L, 25L, 26L, 29L, 30L, 31L, 32L, 33L, 35L, 36L, 37L, 38L, 39L, 40L, 41L);
+        assertThat(rows.sequenceNumbers)
+                .containsExactly(2L, 2L, 2L, 2L, 2L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 2L, 2L);
+    }
+
+    /**
+     * Runs a {@link BlobFallbackRecordReader} over {@code files} and collects its output.
+     *
+     * <p>Each file is served by an in-memory reader that yields one row per batch; the rows are
+     * generated by {@link #fileRows(DataFileMeta, List, Set)}.
+     *
+     * @param files blob files handed to the reader, in the given order
+     * @param rowRanges row id ranges to read, or {@code null} to read every row
+     * @param placeholderRows keys (see {@code rowKey}) of the rows that store a placeholder
+     * @return the rows, sequence numbers and batch sizes observed while draining the reader
+     */
+    private static ReadResult readFallback(
+            List<DataFileMeta> files, List<Range> rowRanges, Set<String> placeholderRows)
+            throws Exception {
+        return ReadResult.read(
+                new BlobFallbackRecordReader(
+                        files,
+                        file -> oneRowPerBatchReader(fileRows(file, rowRanges, placeholderRows)),
+                        rowRanges,
+                        READ_ROW_TYPE,
+                        BLOB_INDEX));
+    }
+
+    /**
+     * Runs a {@link BlobSequenceGroupRecordReader} over {@code files} and collects its output.
+     *
+     * <p>Each file is served by an in-memory reader that yields one row per batch, with no
+     * placeholder rows stored in the files themselves.
+     *
+     * @param files blob files of the sequence group
+     * @param rowRanges row id ranges to read
+     * @param firstRowId first row id of the full range the group reader has to cover
+     * @param lastRowId last row id (inclusive) of that range
+     * @param sequenceNumber not used by this helper; the sequence number of the generated rows
+     *     comes from each file's {@code maxSequenceNumber()}
+     * @return the rows, placeholder count and batch sizes observed while draining the reader
+     */
+    private static ReadResult readSequenceGroup(
+            List<DataFileMeta> files,
+            List<Range> rowRanges,
+            long firstRowId,
+            long lastRowId,
+            long sequenceNumber)
+            throws Exception {
+        return ReadResult.read(
+                new BlobSequenceGroupRecordReader(
+                        files,
+                        file -> oneRowPerBatchReader(fileRows(file, rowRanges)),
+                        rowRanges,
+                        READ_ROW_TYPE,
+                        BLOB_INDEX,
+                        firstRowId,
+                        lastRowId));
+    }
+
+    /**
+     * Creates the metadata of a fake blob file that writes only {@code BLOB_FIELD}.
+     *
+     * @param fileName file name without extension; {@code ".blob"} is appended
+     * @param firstRowId row id of the first row in the file
+     * @param rowCount number of rows in the file
+     * @param maxSequenceNumber maximum sequence number recorded for the file
+     */
+    private static DataFileMeta blobFile(
+            String fileName, long firstRowId, long rowCount, long maxSequenceNumber) {
+        String blobFileName = fileName + ".blob";
+        List<String> writtenColumns = Arrays.asList(BLOB_FIELD);
+        return DataFileMeta.create(
+                blobFileName,
+                // No real file exists; the row count is reused where a size/count is expected.
+                rowCount,
+                rowCount,
+                DataFileMeta.EMPTY_MIN_KEY,
+                DataFileMeta.EMPTY_MAX_KEY,
+                SimpleStats.EMPTY_STATS,
+                SimpleStats.EMPTY_STATS,
+                0,
+                maxSequenceNumber,
+                0L,
+                DataFileMeta.DUMMY_LEVEL,
+                Collections.emptyList(),
+                Timestamp.fromEpochMillis(System.currentTimeMillis()),
+                rowCount,
+                null,
+                FileSource.APPEND,
+                null,
+                null,
+                firstRowId,
+                writtenColumns);
+    }
+
+    /**
+     * Builds a list of ranges from a flat sequence of bounds: {@code from1, to1, from2, to2, ...}.
+     *
+     * @param bounds pairs of range bounds, in the order the ranges should appear
+     * @return one {@link Range} per pair, in input order
+     * @throws IllegalArgumentException if an odd number of bounds is given
+     */
+    private static List<Range> ranges(long... bounds) {
+        if (bounds.length % 2 != 0) {
+            throw new IllegalArgumentException("Range bounds should be paired.");
+        }
+
+        List<Range> ranges = new ArrayList<>();
+        for (int i = 0; i < bounds.length; i += 2) {
+            ranges.add(new Range(bounds[i], bounds[i + 1]));
+        }
+        return ranges;
+    }
+
+    /**
+     * Generates the selected rows of {@code file} with no placeholder rows.
+     *
+     * @param file file whose row id span and sequence number drive the generated rows
+     * @param rowRanges row id ranges to keep, or {@code null} to keep every row
+     */
+    private static List<InternalRow> fileRows(DataFileMeta file, List<Range> rowRanges) {
+        return fileRows(file, rowRanges, Collections.emptySet());
+    }
+
+    /**
+     * Generates the rows a reader of {@code file} would return: one row per row id in the
+     * file's span that is selected by {@code rowRanges}, carrying the file's max sequence number.
+     *
+     * @param file file whose row id span and sequence number drive the generated rows
+     * @param rowRanges row id ranges to keep, or {@code null} to keep every row
+     * @param placeholderRows keys (see {@code rowKey}) of rows whose blob is the placeholder
+     * @return the generated rows in ascending row id order
+     */
+    private static List<InternalRow> fileRows(
+            DataFileMeta file, List<Range> rowRanges, Set<String> placeholderRows) {
+        List<InternalRow> rows = new ArrayList<>();
+        long lastRowId = file.nonNullFirstRowId() + file.rowCount() - 1;
+        for (long rowId = file.nonNullFirstRowId(); rowId <= lastRowId; rowId++) {
+            if (selected(rowId, rowRanges)) {
+                rows.add(
+                        blobRow(
+                                rowId,
+                                file.maxSequenceNumber(),
+                                placeholderRows.contains(rowKey(file, rowId))));
+            }
+        }
+        return rows;
+    }
+
+    /**
+     * Tells whether {@code rowId} is covered by {@code rowRanges}.
+     *
+     * @param rowId row id to test
+     * @param rowRanges ranges with inclusive bounds, sorted by ascending {@code from}; {@code
+     *     null} means "no restriction" and selects every row
+     * @return {@code true} if the row id lies inside one of the ranges (or no ranges are given)
+     */
+    private static boolean selected(long rowId, List<Range> rowRanges) {
+        if (rowRanges == null) {
+            return true;
+        }
+        for (Range range : rowRanges) {
+            // Ranges are sorted, so once a range starts after rowId no later range can match.
+            if (rowId < range.from) {
+                return false;
+            }
+            if (rowId <= range.to) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Lists the row ids in {@code [firstRowId, lastRowId]} that are selected by {@code ranges}.
+     *
+     * @param firstRowId first candidate row id (inclusive)
+     * @param lastRowId last candidate row id (inclusive)
+     * @param ranges sorted row ranges, or {@code null} to select every candidate
+     * @return the selected row ids in ascending order
+     */
+    private static List<Long> rowIdsInRanges(long firstRowId, long lastRowId, List<Range> ranges) {
+        List<Long> rowIds = new ArrayList<>();
+        for (long rowId = firstRowId; rowId <= lastRowId; rowId++) {
+            if (selected(rowId, ranges)) {
+                rowIds.add(rowId);
+            }
+        }
+        return rowIds;
+    }
+
+    /**
+     * Returns a new list holding the elements of {@code first} followed by those of {@code
+     * second}. Neither input list is modified.
+     */
+    private static List<Long> concat(List<Long> first, List<Long> second) {
+        List<Long> all = new ArrayList<>(first);
+        all.addAll(second);
+        return all;
+    }
+
+    /**
+     * Builds an expected batch-size list of the form {@code [first, repeated x count, last]}.
+     *
+     * @param firstBatchSize size of the first batch
+     * @param repeatedBatchCount how many times {@code repeatedBatchSize} appears in the middle
+     * @param repeatedBatchSize size of each middle batch
+     * @param lastBatchSize size of the last batch
+     */
+    private static List<Integer> batchSizes(
+            int firstBatchSize, int repeatedBatchCount, int repeatedBatchSize, int lastBatchSize) {
+        List<Integer> batchSizes = new ArrayList<>();
+        batchSizes.add(firstBatchSize);
+        for (int i = 0; i < repeatedBatchCount; i++) {
+            batchSizes.add(repeatedBatchSize);
+        }
+        batchSizes.add(lastBatchSize);
+        return batchSizes;
+    }
+
+    /**
+     * Returns a mutable set marking a single row of {@code file} as a placeholder row.
+     *
+     * @param file file that stores the placeholder
+     * @param rowId row id of the placeholder row
+     */
+    private static Set<String> placeholderRows(DataFileMeta file, long rowId) {
+        Set<String> keys = new HashSet<>();
+        keys.add(rowKey(file, rowId));
+        return keys;
+    }
+
+    /**
+     * Returns a set marking one row in each of two files as a placeholder row.
+     *
+     * @param firstFile first file that stores a placeholder
+     * @param firstRowId row id of the placeholder row in {@code firstFile}
+     * @param secondFile second file that stores a placeholder
+     * @param secondRowId row id of the placeholder row in {@code secondFile}
+     */
+    private static Set<String> placeholderRows(
+            DataFileMeta firstFile, long firstRowId, DataFileMeta secondFile, long secondRowId) {
+        Set<String> keys = placeholderRows(firstFile, firstRowId);
+        keys.add(rowKey(secondFile, secondRowId));
+        return keys;
+    }
+
+    /** Builds the key {@code <fileName>#<rowId>} that identifies one row of one file. */
+    private static String rowKey(DataFileMeta file, long rowId) {
+        StringBuilder key = new StringBuilder();
+        key.append(file.fileName());
+        key.append("#");
+        key.append(rowId);
+        return key.toString();
+    }
+
+    /**
+     * Creates a three-field row: the blob at {@code BLOB_INDEX}, the row id at position 1 and
+     * the sequence number at position 2.
+     *
+     * @param rowId row id stored in the row; its low byte is also used as the blob content
+     * @param sequenceNumber sequence number stored in the row
+     * @param placeholder whether the blob field holds {@link BlobPlaceholder#INSTANCE} instead
+     *     of real data
+     */
+    private static InternalRow blobRow(long rowId, long sequenceNumber, boolean placeholder) {
+        GenericRow result = new GenericRow(3);
+        if (placeholder) {
+            result.setField(BLOB_INDEX, BlobPlaceholder.INSTANCE);
+        } else {
+            byte[] content = new byte[] {(byte) rowId};
+            result.setField(BLOB_INDEX, new BlobData(content));
+        }
+        result.setField(1, rowId);
+        result.setField(2, sequenceNumber);
+        return result;
+    }
+
+    /**
+     * Wraps {@code rows} in a reader that returns every row in its own batch, then {@code null}
+     * once all rows have been handed out.
+     *
+     * @param rows rows to serve, in order
+     * @return an in-memory reader whose {@code close()} and {@code releaseBatch()} do nothing
+     */
+    private static RecordReader<InternalRow> oneRowPerBatchReader(List<InternalRow> rows) {
+        return new RecordReader<InternalRow>() {
+
+            // Position of the next row to hand out.
+            int index;
+
+            @Override
+            public RecordIterator<InternalRow> readBatch() {
+                if (index >= rows.size()) {
+                    return null;
+                }
+                InternalRow row = rows.get(index++);
+                return new RecordIterator<InternalRow>() {
+
+                    // Whether the single row of this batch was already returned.
+                    boolean returned;
+
+                    @Override
+                    public InternalRow next() {
+                        if (returned) {
+                            return null;
+                        }
+                        returned = true;
+                        return row;
+                    }
+
+                    @Override
+                    public void releaseBatch() {}
+                };
+            }
+
+            @Override
+            public void close() {}
+        };
+    }
+
+    /** Everything observed while draining a reader: real rows, placeholder count, batch sizes. */
+    private static class ReadResult {
+        /** Row ids (field 1) of the rows whose blob is not the placeholder, in read order. */
+        final List<Long> rowIds = new ArrayList<>();
+
+        /** Sequence numbers (field 2) of the same rows, aligned with {@link #rowIds}. */
+        final List<Integer> batchSizes = new ArrayList<>();
+
+        /** Number of rows (real and placeholder) returned by each batch, in read order. */
+        final List<Long> sequenceNumbers = new ArrayList<>();
+
+        /** Number of rows whose blob field is {@link BlobPlaceholder#INSTANCE}. */
+        int placeholderRowCount;
+
+        /**
+         * Drains {@code reader} batch by batch, records what it returns and always closes it.
+         *
+         * @param reader reader to consume until {@code readBatch()} returns {@code null}
+         * @return the collected result
+         */
+        static ReadResult read(RecordReader<InternalRow> reader) throws Exception {
+            try {
+                ReadResult result = new ReadResult();
+                RecordIterator<InternalRow> batch;
+                while ((batch = reader.readBatch()) != null) {
+                    int batchSize = 0;
+                    InternalRow row;
+                    while ((row = batch.next()) != null) {
+                        result.add(row);
+                        batchSize++;
+                    }
+                    batch.releaseBatch();
+                    result.batchSizes.add(batchSize);
+                }
+                return result;
+            } finally {
+                reader.close();
+            }
+        }
+
+        /** Counts a placeholder row, or records the row id and sequence number of a real row. */
+        private void add(InternalRow row) {
+            // Identity comparison: placeholders are represented by the shared singleton.
+            if (row.getBlob(BLOB_INDEX) == BlobPlaceholder.INSTANCE) {
+                placeholderRowCount++;
+            } else {
+                rowIds.add(row.getLong(1));
+                sequenceNumbers.add(row.getLong(2));
+            }
+        }
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
index d6e6998e6c03..51a0a93aa233 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
@@ -21,8 +21,9 @@
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.operation.DataEvolutionSplitRead.BlobFileBunch;
import org.apache.paimon.operation.DataEvolutionSplitRead.FieldBunch;
-import org.apache.paimon.operation.DataEvolutionSplitRead.SpecialFieldBunch;
+import org.apache.paimon.operation.DataEvolutionSplitRead.VectorFileBunch;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
@@ -42,150 +43,149 @@
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Tests for {@link SpecialFieldBunch}. */
+/** Tests for blob and vector field bunches. */
public class DataEvolutionReadTest {
- private SpecialFieldBunch blobBunch;
+ private VectorFileBunch vectorBunch;
@BeforeEach
public void setUp() {
- blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, false);
+ vectorBunch = new VectorFileBunch(Long.MAX_VALUE, false);
}
@Test
- public void testAddSingleBlobEntry() {
- DataFileMeta blobEntry = createBlobFile("blob1", 0L, 100L, 1L);
+ public void testAddSingleVectorEntry() {
+ DataFileMeta vectorEntry = createVectorFile("vector1", 0L, 100L, 1L);
- blobBunch.add(blobEntry);
+ vectorBunch.add(vectorEntry);
- assertThat(blobBunch.files).hasSize(1);
- assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry);
- assertThat(blobBunch.rowCount()).isEqualTo(100);
- assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
- assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
+ assertThat(vectorBunch.files).hasSize(1);
+ assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry);
+ assertThat(vectorBunch.rowCount()).isEqualTo(100);
+ assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0);
+ assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col"));
}
@Test
- public void testAddBlobEntryAndTail() {
- DataFileMeta blobEntry = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobTail = createBlobFile("blob2", 100, 200, 1);
-
- blobBunch.add(blobEntry);
- blobBunch.add(blobTail);
-
- assertThat(blobBunch.files).hasSize(2);
- assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry);
- assertThat(blobBunch.files.get(1)).isEqualTo(blobTail);
- assertThat(blobBunch.rowCount()).isEqualTo(300);
- assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
- assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
- assertThat(blobBunch.files.get(0).schemaId()).isEqualTo(0L);
+ public void testAddVectorEntryAndTail() {
+ DataFileMeta vectorEntry = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorTail = createVectorFile("vector2", 100, 200, 1);
+
+ vectorBunch.add(vectorEntry);
+ vectorBunch.add(vectorTail);
+
+ assertThat(vectorBunch.files).hasSize(2);
+ assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry);
+ assertThat(vectorBunch.files.get(1)).isEqualTo(vectorTail);
+ assertThat(vectorBunch.rowCount()).isEqualTo(300);
+ assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0);
+ assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col"));
+ assertThat(vectorBunch.files.get(0).schemaId()).isEqualTo(0L);
}
@Test
- public void testAddNonBlobFileThrowsException() {
+ public void testAddNonVectorFileThrowsException() {
DataFileMeta normalFile = createNormalFile("normal1.parquet", 0, 100, 1, 0L);
- assertThatThrownBy(() -> blobBunch.add(normalFile))
+ assertThatThrownBy(() -> vectorBunch.add(normalFile))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Only blob/vector-store file can be added to this bunch.");
+ .hasMessage("Only vector-store file can be added to this bunch.");
}
@Test
- public void testAddBlobFileWithSameFirstRowId() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 0, 50, 2);
+ public void testAddVectorFileWithSameFirstRowId() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 0, 50, 2);
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with same firstRowId but higher sequence number should throw exception
- assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+ assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
- "Blob/vector-store file with same first row id should have decreasing sequence number.");
+ "Vector file with same first row id should have decreasing sequence number.");
}
@Test
- public void testAddBlobFileWithSameFirstRowIdAndLowerSequenceNumber() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 2);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 0, 50, 1);
+ public void testAddVectorFileWithSameFirstRowIdAndLowerSequenceNumber() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 2);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 0, 50, 1);
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with same firstRowId and lower sequence number should be ignored
- blobBunch.add(blobEntry2);
+ vectorBunch.add(vectorEntry2);
- assertThat(blobBunch.files).hasSize(1);
- assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry1);
+ assertThat(vectorBunch.files).hasSize(1);
+ assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry1);
}
@Test
- public void testAddBlobFileWithOverlappingRowId() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 2);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 50, 150, 1);
+ public void testAddVectorFileWithOverlappingRowId() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 2);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 50, 150, 1);
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with overlapping row id and lower sequence number should be ignored
- blobBunch.add(blobEntry2);
+ vectorBunch.add(vectorEntry2);
- assertThat(blobBunch.files).hasSize(1);
- assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry1);
+ assertThat(vectorBunch.files).hasSize(1);
+ assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry1);
}
@Test
- public void testAddBlobFileWithOverlappingRowIdAndHigherSequenceNumber() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 50, 150, 2);
+ public void testAddVectorFileWithOverlappingRowIdAndHigherSequenceNumber() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 50, 150, 2);
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with overlapping row id and higher sequence number should throw exception
- assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+ assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
- "Blob/vector-store file with overlapping row id should have decreasing sequence number.");
+ "Vector file with overlapping row id should have decreasing sequence number.");
}
@Test
- public void testAddBlobFileWithNonContinuousRowId() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1);
+ public void testAddVectorFileWithNonContinuousRowId() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 200, 300, 1);
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with non-continuous row id should throw exception
- assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+ assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
- "Blob/vector-store file first row id should be continuous, expect 100 but got 200");
+ "Vector file first row id should be continuous, expect 100 but got 200");
}
@Test
- public void testAddBlobFileWithDifferentWriteCols() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 =
- createBlobFileWithCols("blob2", 100, 200, 1, Arrays.asList("different_col"));
+ public void testAddVectorFileWithDifferentWriteCols() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 =
+ createVectorFileWithCols("vector2", 100, 200, 1, Arrays.asList("different_col"));
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with different write columns should throw exception
- assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+ assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("All files in this bunch should have the same write columns.");
}
@Test
- public void testComplexBlobBunchScenario() {
- // Create a complex scenario with multiple blob entries and a tail
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 100, 200, 1);
- DataFileMeta blobEntry3 = createBlobFile("blob3", 300, 300, 1);
- DataFileMeta blobTail = createBlobFile("blob4", 600, 400, 1);
-
- blobBunch.add(blobEntry1);
- blobBunch.add(blobEntry2);
- blobBunch.add(blobEntry3);
- blobBunch.add(blobTail);
-
- assertThat(blobBunch.files).hasSize(4);
- assertThat(blobBunch.rowCount()).isEqualTo(1000);
- assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
- assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
+ public void testComplexVectorBunchScenario() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 100, 200, 1);
+ DataFileMeta vectorEntry3 = createVectorFile("vector3", 300, 300, 1);
+ DataFileMeta vectorTail = createVectorFile("vector4", 600, 400, 1);
+
+ vectorBunch.add(vectorEntry1);
+ vectorBunch.add(vectorEntry2);
+ vectorBunch.add(vectorEntry3);
+ vectorBunch.add(vectorTail);
+
+ assertThat(vectorBunch.files).hasSize(4);
+ assertThat(vectorBunch.rowCount()).isEqualTo(1000);
+ assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0);
+ assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col"));
}
@Test
@@ -209,26 +209,31 @@ public void testComplexBlobBunchScenario2() {
List batch = batches.get(0);
- assertThat(batch.get(1).fileName()).contains("blob5"); // pick
- assertThat(batch.get(2).fileName()).contains("blob2"); // skip
- assertThat(batch.get(3).fileName()).contains("blob1"); // skip
- assertThat(batch.get(4).fileName()).contains("blob9"); // pick
- assertThat(batch.get(5).fileName()).contains("blob6"); // skip
- assertThat(batch.get(6).fileName()).contains("blob3"); // skip
- assertThat(batch.get(7).fileName()).contains("blob7"); // pick
- assertThat(batch.get(8).fileName()).contains("blob4"); // skip
- assertThat(batch.get(9).fileName()).contains("blob8"); // pick
+ assertThat(batch.get(1).fileName()).contains("blob5");
+ assertThat(batch.get(2).fileName()).contains("blob2");
+ assertThat(batch.get(3).fileName()).contains("blob1");
+ assertThat(batch.get(4).fileName()).contains("blob9");
+ assertThat(batch.get(5).fileName()).contains("blob6");
+ assertThat(batch.get(6).fileName()).contains("blob3");
+ assertThat(batch.get(7).fileName()).contains("blob7");
+ assertThat(batch.get(8).fileName()).contains("blob4");
+ assertThat(batch.get(9).fileName()).contains("blob8");
List fieldBunches =
splitFieldBunches(batch, file -> makeBlobRowType(file.writeCols(), f -> 0));
assertThat(fieldBunches.size()).isEqualTo(2);
- SpecialFieldBunch blobBunch = (SpecialFieldBunch) fieldBunches.get(1);
- assertThat(blobBunch.files).hasSize(4);
+ BlobFileBunch blobBunch = (BlobFileBunch) fieldBunches.get(1);
+ assertThat(blobBunch.files).hasSize(9);
assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
- assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
- assertThat(blobBunch.files.get(2).fileName()).contains("blob7");
- assertThat(blobBunch.files.get(3).fileName()).contains("blob8");
+ assertThat(blobBunch.files.get(1).fileName()).contains("blob2");
+ assertThat(blobBunch.files.get(2).fileName()).contains("blob1");
+ assertThat(blobBunch.files.get(3).fileName()).contains("blob9");
+ assertThat(blobBunch.files.get(4).fileName()).contains("blob6");
+ assertThat(blobBunch.files.get(5).fileName()).contains("blob3");
+ assertThat(blobBunch.files.get(6).fileName()).contains("blob7");
+ assertThat(blobBunch.files.get(7).fileName()).contains("blob4");
+ assertThat(blobBunch.files.get(8).fileName()).contains("blob8");
}
@Test
@@ -275,19 +280,29 @@ public void testComplexBlobBunchScenario3() {
batch, file -> makeBlobRowType(file.writeCols(), String::hashCode));
assertThat(fieldBunches.size()).isEqualTo(3);
- SpecialFieldBunch blobBunch = (SpecialFieldBunch) fieldBunches.get(1);
- assertThat(blobBunch.files).hasSize(4);
+ BlobFileBunch blobBunch = (BlobFileBunch) fieldBunches.get(1);
+ assertThat(blobBunch.files).hasSize(9);
assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
- assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
- assertThat(blobBunch.files.get(2).fileName()).contains("blob7");
- assertThat(blobBunch.files.get(3).fileName()).contains("blob8");
-
- blobBunch = (SpecialFieldBunch) fieldBunches.get(2);
- assertThat(blobBunch.files).hasSize(4);
+ assertThat(blobBunch.files.get(1).fileName()).contains("blob2");
+ assertThat(blobBunch.files.get(2).fileName()).contains("blob1");
+ assertThat(blobBunch.files.get(3).fileName()).contains("blob9");
+ assertThat(blobBunch.files.get(4).fileName()).contains("blob6");
+ assertThat(blobBunch.files.get(5).fileName()).contains("blob3");
+ assertThat(blobBunch.files.get(6).fileName()).contains("blob7");
+ assertThat(blobBunch.files.get(7).fileName()).contains("blob4");
+ assertThat(blobBunch.files.get(8).fileName()).contains("blob8");
+
+ blobBunch = (BlobFileBunch) fieldBunches.get(2);
+ assertThat(blobBunch.files).hasSize(9);
assertThat(blobBunch.files.get(0).fileName()).contains("blob15");
- assertThat(blobBunch.files.get(1).fileName()).contains("blob19");
- assertThat(blobBunch.files.get(2).fileName()).contains("blob17");
- assertThat(blobBunch.files.get(3).fileName()).contains("blob18");
+ assertThat(blobBunch.files.get(1).fileName()).contains("blob12");
+ assertThat(blobBunch.files.get(2).fileName()).contains("blob11");
+ assertThat(blobBunch.files.get(3).fileName()).contains("blob19");
+ assertThat(blobBunch.files.get(4).fileName()).contains("blob16");
+ assertThat(blobBunch.files.get(5).fileName()).contains("blob13");
+ assertThat(blobBunch.files.get(6).fileName()).contains("blob17");
+ assertThat(blobBunch.files.get(7).fileName()).contains("blob14");
+ assertThat(blobBunch.files.get(8).fileName()).contains("blob18");
}
@Test
@@ -372,8 +387,81 @@ private DataFileMeta createBlobFileWithCols(
writeCols);
}
+    /** Creates a vector file meta with schema id 0 that writes the single column vector_col. */
+    private DataFileMeta createVectorFile(
+            String fileName, long firstRowId, long rowCount, long maxSequenceNumber) {
+        List<String> defaultCols = Arrays.asList("vector_col");
+        return createVectorFileWithCols(
+                fileName, firstRowId, rowCount, maxSequenceNumber, defaultCols);
+    }
+
+    /**
+     * Creates a vector file meta that writes the single column vector_col under the given
+     * schema id. The suffix {@code ".vector.avro"} is appended to {@code fileName}.
+     */
+    private DataFileMeta createVectorFileWithSchema(
+            String fileName,
+            long firstRowId,
+            long rowCount,
+            long maxSequenceNumber,
+            long schemaId) {
+        String vectorFileName = fileName + ".vector.avro";
+        List<String> cols = Arrays.asList("vector_col");
+        return createFile(
+                vectorFileName, firstRowId, rowCount, maxSequenceNumber, schemaId, cols);
+    }
+
+    /**
+     * Creates a vector file meta with schema id 0 and the given write columns. The suffix
+     * {@code ".vector.avro"} is appended to {@code fileName}.
+     *
+     * @param writeCols names of the columns the file is declared to write
+     */
+    private DataFileMeta createVectorFileWithCols(
+            String fileName,
+            long firstRowId,
+            long rowCount,
+            long maxSequenceNumber,
+            List<String> writeCols) {
+        return createFile(
+                fileName + ".vector.avro", firstRowId, rowCount, maxSequenceNumber, 0L, writeCols);
+    }
+
+    /**
+     * Creates a {@link DataFileMeta} for tests with empty keys and stats.
+     *
+     * @param fileName full file name, including any extension
+     * @param firstRowId row id of the first row in the file
+     * @param rowCount number of rows; also reused for the other size/count arguments
+     * @param maxSequenceNumber maximum sequence number recorded for the file
+     * @param schemaId schema id recorded for the file
+     * @param writeCols names of the columns the file is declared to write
+     */
+    private DataFileMeta createFile(
+            String fileName,
+            long firstRowId,
+            long rowCount,
+            long maxSequenceNumber,
+            long schemaId,
+            List<String> writeCols) {
+        return DataFileMeta.create(
+                fileName,
+                rowCount,
+                rowCount,
+                DataFileMeta.EMPTY_MIN_KEY,
+                DataFileMeta.EMPTY_MAX_KEY,
+                SimpleStats.EMPTY_STATS,
+                SimpleStats.EMPTY_STATS,
+                0,
+                maxSequenceNumber,
+                schemaId,
+                DataFileMeta.DUMMY_LEVEL,
+                Collections.emptyList(),
+                Timestamp.fromEpochMillis(System.currentTimeMillis()),
+                rowCount,
+                null,
+                FileSource.APPEND,
+                null,
+                null,
+                firstRowId,
+                writeCols);
+    }
+
+    /** Adding a vector file whose schema id differs from the bunch's first file must fail. */
+    @Test
+    void testAddVectorFilesWithDifferentSchemaId() {
+        DataFileMeta schema0File = createVectorFileWithSchema("vector1", 0, 100, 1, 0L);
+        DataFileMeta schema1File = createVectorFileWithSchema("vector2", 100, 200, 1, 1L);
+        vectorBunch.add(schema0File);
+
+        assertThatThrownBy(() -> vectorBunch.add(schema1File))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("All files in this bunch should have the same schema id.");
+    }
+
@Test
void testAddBlobFilesWithDifferentSchemaId() {
+ BlobFileBunch blobBunch = new BlobFileBunch(300, false);
DataFileMeta blobEntry1 = createBlobFileWithSchema("blob1", 0, 100, 1, 0L);
DataFileMeta blobEntry2 = createBlobFileWithSchema("blob2", 100, 200, 1, 1L);
@@ -388,24 +476,24 @@ void testAddBlobFilesWithDifferentSchemaId() {
@Test
public void testRowIdPushDown() {
- SpecialFieldBunch blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, true);
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1);
- blobBunch.add(blobEntry1);
- SpecialFieldBunch finalBlobBunch = blobBunch;
- DataFileMeta finalBlobEntry = blobEntry2;
- assertThatCode(() -> finalBlobBunch.add(finalBlobEntry)).doesNotThrowAnyException();
-
- blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, true);
- blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- blobEntry2 = createBlobFile("blob2", 50, 200, 2);
- blobBunch.add(blobEntry1);
- blobBunch.add(blobEntry2);
- assertThat(blobBunch.files).containsExactlyInAnyOrder(blobEntry2);
-
- SpecialFieldBunch finalBlobBunch2 = blobBunch;
- DataFileMeta blobEntry3 = createBlobFile("blob2", 250, 100, 2);
- assertThatCode(() -> finalBlobBunch2.add(blobEntry3)).doesNotThrowAnyException();
+ VectorFileBunch vectorBunch = new VectorFileBunch(Long.MAX_VALUE, true);
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 200, 300, 1);
+ vectorBunch.add(vectorEntry1);
+ VectorFileBunch finalVectorBunch = vectorBunch;
+ DataFileMeta finalVectorEntry = vectorEntry2;
+ assertThatCode(() -> finalVectorBunch.add(finalVectorEntry)).doesNotThrowAnyException();
+
+ vectorBunch = new VectorFileBunch(Long.MAX_VALUE, true);
+ vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ vectorEntry2 = createVectorFile("vector2", 50, 200, 2);
+ vectorBunch.add(vectorEntry1);
+ vectorBunch.add(vectorEntry2);
+ assertThat(vectorBunch.files).containsExactlyInAnyOrder(vectorEntry2);
+
+ VectorFileBunch finalVectorBunch2 = vectorBunch;
+ DataFileMeta vectorEntry3 = createVectorFile("vector2", 250, 100, 2);
+ assertThatCode(() -> finalVectorBunch2.add(vectorEntry3)).doesNotThrowAnyException();
}
/** Creates a normal (non-blob) file for testing. */
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
index 27a1ed56170a..d6d87542315f 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
@@ -42,7 +42,7 @@ public BlobFileMeta(SeekableInputStream in, long fileSize, @Nullable RoaringBitm
byte[] header = new byte[5];
IOUtils.readFully(in, header);
byte version = header[4];
- if (version != 1) {
+ if (version != 1 && version != 2) {
throw new IOException("Unsupported version: " + version);
}
int indexLength = BytesUtils.getInt(header, 0);
@@ -55,7 +55,7 @@ public BlobFileMeta(SeekableInputStream in, long fileSize, @Nullable RoaringBitm
long[] blobOffsets = new long[blobLengths.length];
long offset = 0;
for (int i = 0; i < blobLengths.length; i++) {
- if (blobLengths[i] == -1) {
+ if (blobLengths[i] < 0) {
blobOffsets[i] = -1;
} else {
blobOffsets[i] = offset;
@@ -86,7 +86,11 @@ public BlobFileMeta(SeekableInputStream in, long fileSize, @Nullable RoaringBitm
}
public boolean isNull(int i) {
- return blobLengths[i] == -1;
+ return blobLengths[i] == BlobFormatWriter.NULL_LENGTH; // same sentinel as the old literal: NULL_LENGTH is -1L
+ }
+
+ public boolean isPlaceHolder(int i) { // true when entry i holds the placeholder sentinel instead of a real blob length
+ return blobLengths[i] == BlobFormatWriter.PLACE_HOLDER_LENGTH; // PLACE_HOLDER_LENGTH is -2L, so it never collides with NULL_LENGTH
}
public long blobLength(int i) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
index 801964b86239..73222ea575c9 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.blob;
import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobPlaceholder;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
@@ -94,6 +95,8 @@ public InternalRow next() {
Blob blob;
if (fileMeta.isNull(currentPosition)) {
blob = null;
+ } else if (fileMeta.isPlaceHolder(currentPosition)) {
+ blob = BlobPlaceholder.INSTANCE;
} else {
long offset = fileMeta.blobOffset(currentPosition) + 4;
long length = fileMeta.blobLength(currentPosition) - 16;
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
index 83d6c0eb915d..0ae9fbb43700 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
@@ -21,6 +21,7 @@
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobConsumer;
import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.data.BlobPlaceholder;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileAwareFormatWriter;
import org.apache.paimon.format.FormatWriter;
@@ -43,9 +44,11 @@
/** {@link FormatWriter} for blob file. */
public class BlobFormatWriter implements FileAwareFormatWriter {
- public static final byte VERSION = 1;
+ public static final byte VERSION = 2;
public static final int MAGIC_NUMBER = 1481511375;
public static final byte[] MAGIC_NUMBER_BYTES = intToLittleEndian(MAGIC_NUMBER);
+ public static final long NULL_LENGTH = -1L;
+ public static final long PLACE_HOLDER_LENGTH = -2L;
private final PositionOutputStream out;
@Nullable private final BlobConsumer writeConsumer;
@@ -81,13 +84,17 @@ public boolean deleteFileUponAbort() {
public void addElement(InternalRow element) throws IOException {
checkArgument(element.getFieldCount() == 1, "BlobFormatWriter only support one field.");
if (element.isNullAt(0)) {
- lengths.add(-1L);
+ lengths.add(NULL_LENGTH);
if (writeConsumer != null) {
writeConsumer.accept(blobFieldName, null);
}
return;
}
Blob blob = element.getBlob(0);
+ if (blob == BlobPlaceholder.INSTANCE) {
+ lengths.add(PLACE_HOLDER_LENGTH);
+ return;
+ }
long previousPos = out.getPos();
crc32.reset();
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
index 0b66e15c2b88..c2928639f341 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
@@ -20,6 +20,7 @@
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobPlaceholder;
import org.apache.paimon.data.BlobRef;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -33,6 +34,7 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DeltaVarintCompressor;
import org.apache.paimon.utils.RoaringBitmap32;
import org.junit.jupiter.api.BeforeEach;
@@ -44,7 +46,10 @@
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
+import java.util.zip.CRC32;
+import static org.apache.paimon.utils.StreamUtils.intToLittleEndian;
+import static org.apache.paimon.utils.StreamUtils.longToLittleEndian;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link BlobFileFormat}. */
@@ -73,22 +78,53 @@ public void testReadBlobInlineBytes() throws IOException {
innerTest(false);
}
+ @Test
+ public void testReadLegacyVersionOneBlobFile() throws IOException { // a file stamped with version byte 1 must still be readable by the current reader
+ BlobFileFormat format = new BlobFileFormat(false);
+ RowType rowType = RowType.of(DataTypes.BLOB());
+ List blobs = Arrays.asList("hello".getBytes(), null, "world".getBytes()); // the null entry exercises the NULL_LENGTH index slot
+
+ try (PositionOutputStream out = fileIO.newOutputStream(file, false)) {
+ writeLegacyVersionOneBlobFile(out, blobs); // hand-written file whose trailer ends with version byte 1
+ }
+
+ FormatReaderFactory readerFactory = format.createReaderFactory(null, rowType, null);
+ FormatReaderContext context =
+ new FormatReaderContext(fileIO, file, fileIO.getFileSize(file));
+ List result = new ArrayList<>();
+ readerFactory
+ .createReader(context)
+ .forEachRemaining(
+ row -> result.add(row.isNullAt(0) ? null : row.getBlob(0).toData())); // null rows stay null; others are collected via toData()
+
+ assertThat(result).hasSize(blobs.size()); // one row per written entry, including the null one
+ assertThat((byte[]) result.get(0)).isEqualTo(blobs.get(0));
+ assertThat(result.get(1)).isNull();
+ assertThat((byte[]) result.get(2)).isEqualTo(blobs.get(2));
+ }
+
private void innerTest(boolean blobAsDescriptor) throws IOException {
BlobFileFormat format = new BlobFileFormat(blobAsDescriptor);
RowType rowType = RowType.of(DataTypes.BLOB());
// write
FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
- List blobs =
- Arrays.asList("hello".getBytes(), null, "world".getBytes(), new byte[0]);
+ List blobs =
+ Arrays.asList(
+ "hello".getBytes(),
+ null,
+ BlobPlaceholder.INSTANCE,
+ "world".getBytes(),
+ new byte[0]);
try (PositionOutputStream out = fileIO.newOutputStream(file, false)) {
FormatWriter formatWriter = writerFactory.create(out, null);
- for (byte[] bytes : blobs) {
- if (bytes == null) {
+ for (Object blob : blobs) {
+ if (blob == null) {
formatWriter.addElement(GenericRow.of((Object) null));
- continue;
+ } else if (blob == BlobPlaceholder.INSTANCE) {
+ formatWriter.addElement(GenericRow.of(BlobPlaceholder.INSTANCE));
} else {
- formatWriter.addElement(GenericRow.of(new BlobData(bytes)));
+ formatWriter.addElement(GenericRow.of(new BlobData((byte[]) blob)));
}
}
formatWriter.close();
@@ -98,7 +134,7 @@ private void innerTest(boolean blobAsDescriptor) throws IOException {
FormatReaderFactory readerFactory = format.createReaderFactory(null, rowType, null);
FormatReaderContext context =
new FormatReaderContext(fileIO, file, fileIO.getFileSize(file));
- List result = new ArrayList<>();
+ List result = new ArrayList<>();
readerFactory
.createReader(context)
.forEachRemaining(
@@ -107,7 +143,10 @@ private void innerTest(boolean blobAsDescriptor) throws IOException {
result.add(null);
} else {
Blob blob = row.getBlob(0);
- if (blobAsDescriptor) {
+ if (blob == BlobPlaceholder.INSTANCE) {
+ result.add(BlobPlaceholder.INSTANCE);
+ return;
+ } else if (blobAsDescriptor) {
assertThat(blob).isInstanceOf(BlobRef.class);
} else {
assertThat(blob).isInstanceOf(BlobData.class);
@@ -117,19 +156,59 @@ private void innerTest(boolean blobAsDescriptor) throws IOException {
});
// assert
- assertThat(result).containsExactlyElementsOf(blobs);
+ assertThat(result).hasSize(blobs.size());
+ assertThat((byte[]) result.get(0)).isEqualTo((byte[]) blobs.get(0));
+ assertThat(result.get(1)).isNull();
+ assertThat(result.get(2)).isSameAs(BlobPlaceholder.INSTANCE);
+ assertThat((byte[]) result.get(3)).isEqualTo((byte[]) blobs.get(3));
+ assertThat((byte[]) result.get(4)).isEqualTo((byte[]) blobs.get(4));
// read with selection
RoaringBitmap32 selection = new RoaringBitmap32();
selection.add(2);
context = new FormatReaderContext(fileIO, file, fileIO.getFileSize(file), selection);
result.clear();
- readerFactory
- .createReader(context)
- .forEachRemaining(row -> result.add(row.getBlob(0).toData()));
+ readerFactory.createReader(context).forEachRemaining(row -> result.add(row.getBlob(0)));
// assert
- assertThat(result).containsOnly(blobs.get(2));
+ assertThat(result).hasSize(1);
+ assertThat(result.get(0)).isSameAs(BlobPlaceholder.INSTANCE);
+ }
+
+ private void writeLegacyVersionOneBlobFile(PositionOutputStream out, List blobs)
+ throws IOException { // test helper: writes each blob record, then the length index, then a trailer stamped with version 1
+ CRC32 crc32 = new CRC32();
+ long[] lengths = new long[blobs.size()];
+ for (int i = 0; i < blobs.size(); i++) {
+ byte[] blob = blobs.get(i);
+ if (blob == null) {
+ lengths[i] = BlobFormatWriter.NULL_LENGTH; // null entries only occupy an index slot; no bytes are written for them
+ continue;
+ }
+
+ long previousPos = out.getPos(); // start of this record, used below to measure its length
+ crc32.reset(); // the checksum is computed per record
+
+ crc32.update(
+ BlobFormatWriter.MAGIC_NUMBER_BYTES,
+ 0,
+ BlobFormatWriter.MAGIC_NUMBER_BYTES.length);
+ out.write(BlobFormatWriter.MAGIC_NUMBER_BYTES);
+ crc32.update(blob, 0, blob.length);
+ out.write(blob);
+
+ long binLength = out.getPos() - previousPos + 12; // + 12 accounts for the length field and CRC written next (presumably 8 + 4 bytes)
+ lengths[i] = binLength;
+ byte[] lengthBytes = longToLittleEndian(binLength);
+ crc32.update(lengthBytes, 0, lengthBytes.length); // the length field is included in the checksum
+ out.write(lengthBytes);
+ out.write(intToLittleEndian((int) crc32.getValue())); // CRC goes last in the record and is not itself checksummed
+ }
+
+ byte[] indexBytes = DeltaVarintCompressor.compress(lengths); // index: per-entry lengths, compressed
+ out.write(indexBytes);
+ out.write(intToLittleEndian(indexBytes.length)); // trailer part 1: size of the index
+ out.write(1); // trailer part 2: version byte, 1 = the legacy format this test targets
}
@Test
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index e7e65394aa6d..8dd4e2fbd6bf 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -32,6 +32,8 @@
class FormatBlobReader(RecordBatchReader):
+ NULL_LENGTH = -1
+ PLACE_HOLDER_LENGTH = -2
def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool,
@@ -97,6 +99,10 @@ def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch
for field_name in self._fields:
if blob is None:
pydict_data[field_name].append(None)
+ elif blob is Blob.PLACE_HOLDER:
+ raise RuntimeError(
+ "Blob placeholder is not supported by FormatBlobReader yet."
+ )
elif self._blob_as_descriptor:
pydict_data[field_name].append(blob.to_descriptor().serialize())
else:
@@ -148,7 +154,7 @@ def _read_index(self) -> None:
index_length = struct.unpack(' None:
blob_offsets = []
offset = 0
for length in blob_lengths:
- if length == -1:
+ if length < 0:
blob_offsets.append(-1)
else:
blob_offsets.append(offset)
@@ -175,6 +181,8 @@ def _read_index(self) -> None:
class BlobRecordIterator:
MAGIC_NUMBER_SIZE = 4
METADATA_OVERHEAD = 16
+ NULL_LENGTH = -1
+ PLACE_HOLDER_LENGTH = -2
def __init__(self, file_io: FileIO, file_path: str, blob_lengths: List[int],
blob_offsets: List[int], field_name: str):
@@ -192,13 +200,17 @@ def __next__(self) -> GenericRow:
if self.current_position >= len(self.blob_lengths):
raise StopIteration
fields = [DataField(0, self.field_name, AtomicType("BLOB"))]
- if self.blob_lengths[self.current_position] == -1:
+ length = self.blob_lengths[self.current_position]
+ if length == self.NULL_LENGTH:
self.current_position += 1
return GenericRow([None], fields, RowKind.INSERT)
+ if length == self.PLACE_HOLDER_LENGTH:
+ self.current_position += 1
+ return GenericRow([Blob.PLACE_HOLDER], fields, RowKind.INSERT)
# Create blob reference for the current blob
# Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4 bytes) = 12 bytes
blob_offset = self.blob_offsets[self.current_position] + self.MAGIC_NUMBER_SIZE # Skip magic number
- blob_length = self.blob_lengths[self.current_position] - self.METADATA_OVERHEAD
+ blob_length = length - self.METADATA_OVERHEAD
blob = Blob.from_file(self.file_io, self.file_path, blob_offset, blob_length)
self.current_position += 1
return GenericRow([blob], fields, RowKind.INSERT)
diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py
index b619b6a76aec..73974d73e8e4 100644
--- a/paimon-python/pypaimon/table/row/blob.py
+++ b/paimon-python/pypaimon/table/row/blob.py
@@ -277,6 +277,21 @@ def from_descriptor(uri_reader: UriReader, descriptor: BlobDescriptor) -> 'Blob'
return BlobRef(uri_reader, descriptor)
+class _PlaceholderBlob(Blob):  # sentinel Blob subtype: carries no data, every accessor below raises
+
+ def to_data(self) -> bytes:
+ raise RuntimeError("Should never call this method for placeholder blob.")  # a placeholder has no bytes to return
+
+ def to_descriptor(self) -> BlobDescriptor:
+ raise RuntimeError("Should never call this method for placeholder blob.")  # a placeholder has no descriptor
+
+ def new_input_stream(self) -> BinaryIO:
+ raise RuntimeError("Should never call this method for placeholder blob.")  # a placeholder has no stream
+
+
+Blob.PLACE_HOLDER = _PlaceholderBlob()  # single shared instance attached to Blob; reader and writer detect it with `is`
+
+
class BlobData(Blob):
def __init__(self, data: Optional[Union[bytes, bytearray]] = None):
diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py
index d9cf64210d61..1883e9dbbb3c 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -29,7 +29,7 @@
from pypaimon.common.file_io import FileIO
from pypaimon.filesystem.local_file_io import LocalFileIO
from pypaimon.common.options import Options
-from pypaimon.read.reader.format_blob_reader import FormatBlobReader
+from pypaimon.read.reader.format_blob_reader import BlobRecordIterator, FormatBlobReader
from pypaimon.schema.data_types import AtomicType, DataField
from pypaimon.table.row.blob import Blob, BlobData, BlobRef, BlobDescriptor
from pypaimon.table.row.generic_row import GenericRowDeserializer, GenericRowSerializer, GenericRow
@@ -1264,6 +1264,79 @@ def test_null_blob_read_as_descriptor(self):
self.assertEqual(desc2.uri, blob_file_path)
reader.close()
+ def test_placeholder_blob_write_read(self):
+ from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+ file_io = LocalFileIO(self.temp_dir, Options({}))
+ blob_file_path = os.path.join(self.temp_dir, "placeholder_blob.blob")
+
+ output = open(blob_file_path, 'wb')
+ writer = BlobFormatWriter(output)
+ fields = [DataField(0, "blob_field", AtomicType("BLOB"))]
+ writer.add_element(GenericRow([BlobData(b"hello")], fields, RowKind.INSERT))
+ writer.add_element(GenericRow([Blob.PLACE_HOLDER], fields, RowKind.INSERT))
+ writer.add_element(GenericRow([None], fields, RowKind.INSERT))
+ writer.add_element(GenericRow([BlobData(b"world")], fields, RowKind.INSERT))
+ self.assertEqual(
+ writer.lengths[1:3],
+ [BlobFormatWriter.PLACE_HOLDER_LENGTH, BlobFormatWriter.NULL_LENGTH])
+ writer.close()
+
+ with open(blob_file_path, 'rb') as blob_file:
+ blob_file.seek(-1, os.SEEK_END)
+ self.assertEqual(blob_file.read(1), struct.pack(' None:
blob_value = row.values[0]
if blob_value is None:
- self.lengths.append(-1)
+ self.lengths.append(self.NULL_LENGTH)
return
if not isinstance(blob_value, Blob):
raise ValueError("Field must be Blob/BlobData instance")
+ if blob_value is Blob.PLACE_HOLDER:
+ self.lengths.append(self.PLACE_HOLDER_LENGTH)
+ return
+
previous_pos = self.position
crc32 = 0 # Initialize CRC32
@@ -97,7 +103,7 @@ def write_value(self, col_data, fields, uri_reader_factory=None) -> None:
if col_data is None:
if not is_blob:
raise RuntimeError("Null values are only supported for BLOB type fields")
- self.lengths.append(-1)
+ self.lengths.append(self.NULL_LENGTH)
return
if is_blob: