Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 3 additions & 21 deletions paimon-python/pypaimon/read/reader/data_file_batch_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from pypaimon.read.reader.format_blob_reader import FormatBlobReader
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
from pypaimon.table.row.blob import Blob, BlobDescriptor
from pypaimon.table.row.blob import Blob
from pypaimon.table.special_fields import SpecialFields


Expand Down Expand Up @@ -178,28 +178,10 @@ def _blob_cell_to_data(self, value):
value = self._normalize_blob_cell(value)
if value is None:
return None

if not isinstance(value, bytes):
return value

descriptor = self._deserialize_descriptor_or_none(value)
if descriptor is None:
return value

try:
uri_reader = self.file_io.uri_reader_factory.create(descriptor.uri)
blob = Blob.from_descriptor(uri_reader, descriptor)
return blob.to_data()
except Exception as e:
raise RuntimeError(
"Failed to read blob bytes from descriptor URI while converting blob value."
) from e

@staticmethod
def _deserialize_descriptor_or_none(raw: bytes):
if not BlobDescriptor.is_blob_descriptor(raw):
return None
return BlobDescriptor.deserialize(raw)
blob = Blob.from_bytes(value, self.file_io)
return blob.to_data() if blob is not None else None

def _assign_row_tracking(self, record_batch: RecordBatch) -> RecordBatch:
"""Assign row tracking meta fields (_ROW_ID and _SEQUENCE_NUMBER)."""
Expand Down
10 changes: 7 additions & 3 deletions paimon-python/pypaimon/read/reader/iface/record_batch_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,21 @@ def tuple_iterator(self) -> Optional[Iterator[tuple]]:
return None
return df.iter_rows()

def read_batch(self) -> Optional[RecordIterator[InternalRow]]:
def read_batch(self, file_io=None, blob_field_indices=None) -> Optional[RecordIterator[InternalRow]]:
df = self.read_next_df()
if df is None:
return None
return InternalRowWrapperIterator(df.iter_rows(), df.width)
return InternalRowWrapperIterator(
df.iter_rows(), df.width, file_io, blob_field_indices)


class InternalRowWrapperIterator(RecordIterator[InternalRow]):
def __init__(self, iterator: Iterator[tuple], width: int):
def __init__(self, iterator: Iterator[tuple], width: int,
file_io=None, blob_field_indices=None):
self._iterator = iterator
self._reused_row = OffsetRow(None, 0, width)
if file_io is not None and blob_field_indices:
self._reused_row.with_blob_context(file_io, blob_field_indices)

def next(self) -> Optional[InternalRow]:
row_tuple = next(self._iterator, None)
Expand Down
49 changes: 49 additions & 0 deletions paimon-python/pypaimon/read/table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,55 @@ def _record_generator():

return _record_generator()

def to_blob_iterator(self, splits: List[Split]) -> Iterator:
    """Iterator where blob fields are accessible via row.get_blob(pos).

    Unlike to_iterator() which eagerly resolves blobs to bytes,
    this returns rows with lazy Blob access supporting streaming.

    While the returned generator is being consumed, the table option
    ``CoreOptions.BLOB_AS_DESCRIPTOR`` is forced to True; the previous value
    is restored when the generator finishes, is closed, or raises.

    Args:
        splits: The splits to read, in order.

    Returns:
        A generator yielding rows; ``self.limit`` (when not None) caps the
        total number of rows yielded across all splits.
    """
    from pypaimon.common.options.core_options import CoreOptions

    blob_field_indices = {
        i for i, field in enumerate(self.read_type)
        if hasattr(field.type, 'type') and field.type.type == 'BLOB'
    }
    file_io = self.table.file_io
    limit = self.limit

    def _blob_record_generator():
        # The option is saved and forced here, inside the generator, rather
        # than in the enclosing call: a generator that is created but never
        # started never runs its ``finally`` block, which would leave the
        # shared table option forced to True forever.
        options = self.table.options
        original_value = options.blob_as_descriptor()
        # Force blob-as-descriptor=true so descriptors are preserved
        options.set(CoreOptions.BLOB_AS_DESCRIPTOR, True)
        try:
            count = 0
            for split in splits:
                if limit is not None and count >= limit:
                    return
                reader = self._create_split_read(split).create_reader()
                try:
                    # read_batch() returns None once the reader is exhausted.
                    for batch in iter(
                        lambda: reader.read_batch(file_io, blob_field_indices),
                        None
                    ):
                        for row in iter(batch.next, None):
                            yield row
                            count += 1
                            if limit is not None and count >= limit:
                                return
                finally:
                    reader.close()
        finally:
            # Restore original option
            # NOTE(review): if blob_as_descriptor() returns a default rather
            # than None for an unset key, this writes that default back
            # explicitly instead of removing the key - confirm.
            if original_value is not None:
                options.set(CoreOptions.BLOB_AS_DESCRIPTOR, original_value)
            else:
                options.options.data.pop(
                    CoreOptions.BLOB_AS_DESCRIPTOR.key(), None)

    return _blob_record_generator()

def to_arrow_batch_reader(self, splits: List[Split]) -> pyarrow.ipc.RecordBatchReader:
schema = PyarrowFieldParser.from_paimon_schema(self.read_type)
if self.include_row_kind:
Expand Down
15 changes: 15 additions & 0 deletions paimon-python/pypaimon/table/row/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,21 @@ def from_file(file_io, file_path: str, offset: int, length: int) -> 'Blob':
def from_descriptor(uri_reader: UriReader, descriptor: BlobDescriptor) -> 'Blob':
    """Create a BlobRef from the given URI reader and blob descriptor."""
    blob_ref = BlobRef(uri_reader, descriptor)
    return blob_ref

@staticmethod
def from_bytes(data: Optional[bytes], file_io=None, allow_blob_data: bool = True) -> Optional['Blob']:
    """Build a Blob from raw bytes that hold either blob content or a serialized descriptor.

    Args:
        data: ``bytes``/``bytearray`` to interpret, or None.
        file_io: Used to create the URI reader when ``data`` is treated as a
            serialized BlobDescriptor.
        allow_blob_data: When False, ``data`` is always treated as a
            serialized descriptor, never as inline blob content.

    Returns:
        None for None input, a BlobRef for descriptor bytes, otherwise a
        BlobData wrapping a ``bytes`` copy of the input.

    Raises:
        TypeError: if ``data`` is neither None nor bytes/bytearray.
        ValueError: if a descriptor must be resolved but ``file_io`` is None.
    """
    if data is None:
        return None
    if not isinstance(data, (bytes, bytearray)):
        raise TypeError(f"Blob.from_bytes expects bytes, got {type(data)}")

    payload = bytes(data)
    is_descriptor = BlobDescriptor.is_blob_descriptor(payload)
    if not is_descriptor and allow_blob_data:
        # Plain content: keep it in memory.
        return BlobData(payload)

    if file_io is None:
        raise ValueError("file_io is required to resolve BlobDescriptor bytes")
    descriptor = BlobDescriptor.deserialize(payload)
    reader = file_io.uri_reader_factory.create(descriptor.uri)
    return BlobRef(reader, descriptor)


class BlobData(Blob):

Expand Down
9 changes: 9 additions & 0 deletions paimon-python/pypaimon/table/row/generic_row.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ def get_field(self, pos: int) -> Any:
raise IndexError(f"Position {pos} is out of bounds for row arity {len(self.values)}")
return self.values[pos]

def get_blob(self, pos: int):
    """Return the Blob stored at ``pos``, or None when the field is null.

    Raises:
        TypeError: if the stored value is neither None nor a Blob instance.
    """
    from pypaimon.table.row.blob import Blob

    candidate = self.get_field(pos)
    if candidate is None or isinstance(candidate, Blob):
        return candidate
    raise TypeError(f"Cannot get Blob from {type(candidate)} at position {pos}")

def get_row_kind(self) -> RowKind:
return self.row_kind

Expand Down
11 changes: 10 additions & 1 deletion paimon-python/pypaimon/table/row/internal_row.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.

from abc import ABC, abstractmethod
from typing import Any
from typing import Any, Optional

from pypaimon.table.row.row_kind import RowKind

Expand Down Expand Up @@ -45,6 +45,15 @@ def __len__(self) -> int:
The number does not include RowKind. It is kept separately.
"""

def get_blob(self, pos: int) -> Optional[Any]:
    """Return the Blob at position ``pos``, or None if the field is null.

    This default implementation always raises NotImplementedError; a
    blob-aware row is required. Use TableRead.to_blob_iterator() to obtain one.
    """
    message = "get_blob() requires a blob-aware row. Use TableRead.to_blob_iterator()."
    raise NotImplementedError(message)

def __str__(self) -> str:
fields = []
for pos in range(self.__len__()):
Expand Down
27 changes: 26 additions & 1 deletion paimon-python/pypaimon/table/row/offset_row.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

from typing import Optional
from typing import Optional, Set

from pypaimon.table.row.internal_row import InternalRow, RowKind

Expand All @@ -28,6 +28,13 @@ def __init__(self, row_tuple: Optional[tuple], offset: int, arity: int):
self.offset = offset
self.arity = arity
self.row_kind_byte: int = 1
self._file_io = None
self._blob_field_indices: Optional[Set[int]] = None

def with_blob_context(self, file_io, blob_field_indices: Set[int]) -> 'OffsetRow':
    """Store the file_io and blob field positions on this row and return the row itself.

    The same set object is kept by reference (not copied), and returning
    ``self`` allows the call to be chained.
    """
    self._file_io, self._blob_field_indices = file_io, blob_field_indices
    return self

def replace(self, row_tuple: tuple) -> 'OffsetRow':
self.row_tuple = row_tuple
Expand All @@ -46,6 +53,24 @@ def get_field(self, pos: int):
raise IndexError(f"Position {pos} is out of bounds for row arity {self.arity}")
return self.row_tuple[self.offset + pos]

def get_blob(self, pos: int):
    """Return the field at ``pos`` as a Blob, or None when the field is null.

    Bytes that form a serialized BlobDescriptor become a descriptor-backed
    Blob created through this row's ``file_io``; any other bytes value is
    wrapped with ``Blob.from_data``.

    Raises:
        TypeError: if blob field indices are set and ``pos`` is not among
            them, or if the stored value is not bytes/bytearray.
        ValueError: if the value is a serialized descriptor but the row has
            no ``file_io`` (no blob context was attached).
        IndexError: propagated from ``get_field`` for an out-of-range ``pos``.
    """
    blob_positions = self._blob_field_indices
    if blob_positions is not None and pos not in blob_positions:
        raise TypeError(f"Field at position {pos} is not a BLOB field")

    value = self.get_field(pos)
    if value is None:
        return None
    if not isinstance(value, (bytes, bytearray)):
        raise TypeError(f"Cannot convert {type(value)} to Blob")

    # Function-local import kept from the original; only needed once there
    # is an actual bytes value to convert.
    from pypaimon.table.row.blob import Blob, BlobDescriptor

    raw = bytes(value)
    if not BlobDescriptor.is_blob_descriptor(raw):
        return Blob.from_data(raw)

    # Without a file_io the descriptor cannot be resolved; fail with a clear
    # message instead of an AttributeError on None.
    if self._file_io is None:
        raise ValueError("file_io is required to resolve BlobDescriptor bytes")
    descriptor = BlobDescriptor.deserialize(raw)
    uri_reader = self._file_io.uri_reader_factory.create(descriptor.uri)
    return Blob.from_descriptor(uri_reader, descriptor)

def get_row_kind(self) -> RowKind:
return RowKind(self.row_kind_byte)

Expand Down
91 changes: 91 additions & 0 deletions paimon-python/pypaimon/tests/blob_table_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3184,5 +3184,96 @@ def test_rename_blob_column_should_fail(self):
self.assertIn('Cannot rename BLOB column', str(ctx.exception))


class GetBlobTest(unittest.TestCase):
    """End-to-end tests for row.get_blob() on rows from TableRead.to_blob_iterator()."""

    @classmethod
    def setUpClass(cls):
        """Create a temporary warehouse and a table with three rows (id, name, picture)."""
        cls.temp_dir = tempfile.mkdtemp()
        cls.warehouse = os.path.join(cls.temp_dir, 'warehouse')
        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
        cls.catalog.create_database('test_db', False)

        pa_schema = pa.schema([
            ('id', pa.int32()),
            ('name', pa.string()),
            ('picture', pa.large_binary()),
        ])
        schema = Schema.from_pyarrow_schema(pa_schema, options={
            'row-tracking.enabled': 'true',
            'data-evolution.enabled': 'true',
        })
        cls.catalog.create_table('test_db.get_blob_test', schema, False)
        cls.table = cls.catalog.get_table('test_db.get_blob_test')

        data = pa.Table.from_pydict({
            'id': [1, 2, 3],
            'name': ['a', 'b', 'c'],
            'picture': [b'img_data_1', b'img_data_2', b'img_data_3'],
        }, schema=pa_schema)

        write_builder = cls.table.new_batch_write_builder()
        writer = write_builder.new_write()
        try:
            writer.write_arrow(data)
            commit_messages = writer.prepare_commit()
            commit = write_builder.new_commit()
            commit.commit(commit_messages)
        finally:
            # Close the writer even when the write or commit fails.
            writer.close()

    @classmethod
    def tearDownClass(cls):
        """Remove the temporary warehouse directory."""
        shutil.rmtree(cls.temp_dir, ignore_errors=True)

    def _new_read_and_splits(self):
        """Return a fresh ``(read, splits)`` pair for the shared test table."""
        read_builder = self.table.new_read_builder()
        splits = read_builder.new_scan().plan().splits()
        return read_builder.new_read(), splits

    def test_get_blob_lazy_access(self):
        """Every row exposes its picture column as a Blob with the written bytes."""
        read, splits = self._new_read_and_splits()

        results = []
        for row in read.to_blob_iterator(splits):
            blob = row.get_blob(2)
            self.assertIsNotNone(blob)
            results.append((row.get_field(0), blob.to_data()))

        self.assertEqual(len(results), 3)
        results.sort(key=lambda x: x[0])
        self.assertEqual(results[0], (1, b'img_data_1'))
        self.assertEqual(results[1], (2, b'img_data_2'))
        self.assertEqual(results[2], (3, b'img_data_3'))

    def test_get_blob_streaming(self):
        """The blob of the first row can be read through new_input_stream()."""
        read, splits = self._new_read_and_splits()

        rows = read.to_blob_iterator(splits)
        try:
            row = next(rows, None)
            # Fail loudly instead of passing vacuously when nothing is read.
            self.assertIsNotNone(row, "to_blob_iterator yielded no rows")
            blob = row.get_blob(2)
            with blob.new_input_stream() as stream:
                data = stream.read()
            self.assertTrue(data.startswith(b'img_data_'))
        finally:
            # Only the first row is consumed; close the generator explicitly
            # so its cleanup runs now rather than at garbage collection.
            rows.close()

    def test_get_blob_non_blob_field_raises(self):
        """get_blob() on a non-blob column (position 0) raises TypeError."""
        read, splits = self._new_read_and_splits()

        rows = read.to_blob_iterator(splits)
        try:
            row = next(rows, None)
            self.assertIsNotNone(row, "to_blob_iterator yielded no rows")
            with self.assertRaises(TypeError):
                row.get_blob(0)
        finally:
            rows.close()

    def test_to_iterator_unchanged(self):
        """to_iterator() still yields all three rows with non-null id and name."""
        read, splits = self._new_read_and_splits()

        count = 0
        for row in read.to_iterator(splits):
            self.assertIsNotNone(row.get_field(0))
            self.assertIsNotNone(row.get_field(1))
            count += 1
        self.assertEqual(count, 3)


if __name__ == '__main__':
unittest.main()
37 changes: 37 additions & 0 deletions paimon-python/pypaimon/tests/blob_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,43 @@ def test_from_http(self):
self.assertEqual(descriptor.offset, 0)
self.assertEqual(descriptor.length, -1)

def test_from_bytes_with_raw_data(self):
    """Non-descriptor bytes come back as a BlobData holding the same bytes."""
    payload = b"hello blob"
    result = Blob.from_bytes(payload)
    self.assertIsInstance(result, BlobData)
    self.assertEqual(result.to_data(), payload)

def test_from_bytes_with_none(self):
    """None input yields None rather than a Blob."""
    result = Blob.from_bytes(None)
    self.assertIsNone(result)

def test_from_bytes_with_descriptor(self):
    """Serialized descriptor bytes plus a file_io yield a BlobRef that reads the file content."""
    import os
    import tempfile

    from pypaimon.common.file_io import FileIO

    data = b"actual blob content"
    # delete=False: the file must outlive this handle so it can be read by path.
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(data)

    try:
        descriptor = BlobDescriptor(tmp.name, 0, len(data))
        serialized = descriptor.serialize()

        file_io = FileIO.get(f"file://{os.path.dirname(tmp.name)}", {})
        blob = Blob.from_bytes(serialized, file_io)
        self.assertIsInstance(blob, BlobRef)
        self.assertEqual(blob.to_data(), data)
    finally:
        # Remove the temp file even when an assertion above fails.
        os.unlink(tmp.name)

def test_from_bytes_descriptor_without_file_io_raises(self):
    """Descriptor bytes without a file_io are rejected with ValueError."""
    payload = BlobDescriptor("/tmp/fake", 0, 10).serialize()
    with self.assertRaises(ValueError):
        Blob.from_bytes(payload)

def test_from_bytes_invalid_type_raises(self):
    """A non-bytes argument (here an int) is rejected with TypeError."""
    self.assertRaises(TypeError, Blob.from_bytes, 12345)

def test_blob_data_interface_compliance(self):
"""Test that BlobData properly implements Blob interface."""
test_data = b"interface test data"
Expand Down
Loading