# NOTE: to_dict / from_dict are methods of DataFileMeta (the class header lies
# outside this excerpt); indent them one level when placed in the class body.
def to_dict(self) -> Dict[str, Any]:
    """Serialize to a JSON-friendly dict for cross-process transport (e.g. Ray task payloads).

    Rows and stats keep their field types via tagged objects (see
    encode_value / decode_value); ``embedded_index`` becomes base64 text and
    ``creation_time`` a ``{"ms", "ns"}`` pair.

    Note that a ``None`` ``extra_files`` is emitted as ``[]``, whereas
    ``value_stats_cols`` and ``write_cols`` keep ``None``.

    Returns:
        A dict containing only JSON-native values.

    Raises:
        TypeError: if a row value has a type encode_value does not support.
    """
    return {
        "file_name": self.file_name,
        "file_size": self.file_size,
        "row_count": self.row_count,
        "min_key": _generic_row_to_dict(self.min_key),
        "max_key": _generic_row_to_dict(self.max_key),
        "key_stats": _simple_stats_to_dict(self.key_stats),
        "value_stats": _simple_stats_to_dict(self.value_stats),
        "min_sequence_number": self.min_sequence_number,
        "max_sequence_number": self.max_sequence_number,
        "schema_id": self.schema_id,
        "level": self.level,
        "extra_files": list(self.extra_files) if self.extra_files is not None else [],
        "creation_time": _timestamp_to_dict(self.creation_time),
        "delete_row_count": self.delete_row_count,
        "embedded_index": _bytes_to_str(self.embedded_index),
        "file_source": self.file_source,
        "value_stats_cols": list(self.value_stats_cols) if self.value_stats_cols is not None else None,
        "external_path": self.external_path,
        "first_row_id": self.first_row_id,
        "write_cols": list(self.write_cols) if self.write_cols is not None else None,
        "file_path": self.file_path,
    }


@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "DataFileMeta":
    """Rebuild a DataFileMeta from the dict produced by to_dict.

    Args:
        data: payload in the to_dict layout. Keys read with ``data[...]``
            below are mandatory; the others fall back to ``None`` (or an
            empty list for ``extra_files``) when absent.

    Raises:
        KeyError: if a mandatory key is missing.
        ValueError: if base64 text is malformed or a tagged value carries an
            unknown tag.
    """
    return cls(
        file_name=data["file_name"],
        file_size=data["file_size"],
        row_count=data["row_count"],
        min_key=_generic_row_from_dict(data.get("min_key")),
        max_key=_generic_row_from_dict(data.get("max_key")),
        key_stats=_simple_stats_from_dict(data.get("key_stats")),
        value_stats=_simple_stats_from_dict(data.get("value_stats")),
        min_sequence_number=data["min_sequence_number"],
        max_sequence_number=data["max_sequence_number"],
        schema_id=data["schema_id"],
        level=data["level"],
        extra_files=list(data.get("extra_files") or []),
        creation_time=_timestamp_from_dict(data.get("creation_time")),
        delete_row_count=data.get("delete_row_count"),
        embedded_index=_bytes_from_str(data.get("embedded_index")),
        file_source=data.get("file_source"),
        value_stats_cols=list(data["value_stats_cols"]) if data.get("value_stats_cols") is not None else None,
        external_path=data.get("external_path"),
        first_row_id=data.get("first_row_id"),
        write_cols=list(data["write_cols"]) if data.get("write_cols") is not None else None,
        file_path=data.get("file_path"),
    )


def _bytes_to_str(value: Optional[bytes]) -> Optional[str]:
    """Return ``value`` as base64 ASCII text; ``None`` passes through."""
    if value is None:
        return None
    return b64encode(value).decode("ascii")


def _bytes_from_str(value: Optional[str]) -> Optional[bytes]:
    """Inverse of _bytes_to_str; ``None`` passes through.

    Raises:
        ValueError: (binascii.Error) if ``value`` is not valid base64.
    """
    if value is None:
        return None
    # validate=True: without it, characters outside the base64 alphabet are
    # silently dropped and a corrupted payload decodes to different bytes.
    return b64decode(value.encode("ascii"), validate=True)


def _timestamp_to_dict(ts: Optional["Timestamp"]) -> Optional[Dict[str, int]]:
    """Split a Timestamp into its millisecond / nano-of-millisecond parts."""
    if ts is None:
        return None
    return {"ms": ts.get_millisecond(), "ns": ts.get_nano_of_millisecond()}


def _timestamp_from_dict(data: Optional[Dict[str, int]]) -> Optional["Timestamp"]:
    """Inverse of _timestamp_to_dict; a missing ``"ns"`` is read as 0."""
    if data is None:
        return None
    return Timestamp(data["ms"], data.get("ns", 0))


def encode_value(value: Any) -> Any:
    """Encode a GenericRow / SimpleStats / partition field value into a JSON-friendly form.

    Tagged dicts mark non-JSON-native types so decode_value can round-trip them.
    Public so that callers serializing other field-bearing structures (e.g. partitions
    in CommitMessage) can reuse the same tagged encoding.

    Returns:
        ``value`` itself for None / bool / int / float / str, otherwise a dict
        whose ``"__t__"`` entry names the original type.

    Raises:
        TypeError: if ``value`` is of a type with no tagged representation.
    """
    if value is None or isinstance(value, (bool, int, float, str)):
        return value
    if isinstance(value, bytes):
        return {"__t__": "bytes", "v": b64encode(value).decode("ascii")}
    if isinstance(value, Decimal):
        return {"__t__": "decimal", "v": str(value)}
    if isinstance(value, Timestamp):
        return {"__t__": "ts", "ms": value.get_millisecond(), "ns": value.get_nano_of_millisecond()}
    # datetime is a subclass of date, so it has to be tested first.
    if isinstance(value, datetime):
        return {"__t__": "datetime", "v": value.isoformat()}
    if isinstance(value, date):
        return {"__t__": "date", "v": value.isoformat()}
    if isinstance(value, dt_time):
        return {"__t__": "time", "v": value.isoformat()}
    raise TypeError(
        f"Unsupported value type for DataFileMeta serialization: {type(value).__name__}"
    )


def decode_value(value: Any) -> Any:
    """Inverse of encode_value.

    Anything that is not a dict carrying a ``"__t__"`` tag is returned as is.

    Raises:
        ValueError: if the tag is unknown or a ``bytes`` payload is not valid
            base64 (binascii.Error is a ValueError subclass).
    """
    if not isinstance(value, dict) or "__t__" not in value:
        return value
    tag = value["__t__"]
    if tag == "bytes":
        # Strict decoding: reject corrupted text instead of silently skipping
        # characters outside the base64 alphabet.
        return b64decode(value["v"].encode("ascii"), validate=True)
    if tag == "decimal":
        return Decimal(value["v"])
    if tag == "ts":
        return Timestamp(value["ms"], value.get("ns", 0))
    if tag == "datetime":
        return datetime.fromisoformat(value["v"])
    if tag == "date":
        return date.fromisoformat(value["v"])
    if tag == "time":
        return dt_time.fromisoformat(value["v"])
    raise ValueError(f"Unknown tagged value type: {tag}")


def _generic_row_to_dict(row) -> Optional[Dict[str, Any]]:
    """Convert a row into ``{"values", "fields", "row_kind"}``; ``None`` passes through."""
    if row is None:
        return None
    # GenericRow exposes .values directly; BinaryRow lazily decodes per field
    # via get_field(i). Normalize both into a list of decoded Python values
    # so the dict format stays uniform.
    if hasattr(row, "values"):
        values = row.values
    else:
        values = [row.get_field(i) for i in range(len(row))]
    fields = getattr(row, "fields", None)
    if hasattr(row, "get_row_kind"):
        row_kind = row.get_row_kind().value
    else:
        # Same default the decoder applies when "row_kind" is absent.
        row_kind = RowKind.INSERT.value
    return {
        "values": [encode_value(v) for v in values],
        "fields": [f.to_dict() for f in fields] if fields else [],
        "row_kind": row_kind,
    }


def _generic_row_from_dict(data: Optional[Dict[str, Any]]) -> Optional["GenericRow"]:
    """Inverse of _generic_row_to_dict; always rebuilds a GenericRow."""
    if data is None:
        return None
    fields = [DataField.from_dict(f) for f in data.get("fields", [])]
    values = [decode_value(v) for v in data.get("values", [])]
    row_kind = RowKind(data.get("row_kind", RowKind.INSERT.value))
    return GenericRow(values, fields, row_kind)


def _simple_stats_to_dict(stats: Optional["SimpleStats"]) -> Optional[Dict[str, Any]]:
    """Convert SimpleStats into a JSON-friendly dict; ``None`` passes through."""
    if stats is None:
        return None
    # null_counts may be a Python list (writer path) or a pyarrow Array-like
    # (manifest reader path). Normalize to a plain list of ints.
    nc = stats.null_counts
    if nc is None:
        null_counts = []
    elif hasattr(nc, "to_pylist"):
        null_counts = nc.to_pylist()
    else:
        null_counts = list(nc)
    return {
        "min_values": _generic_row_to_dict(stats.min_values),
        "max_values": _generic_row_to_dict(stats.max_values),
        "null_counts": null_counts,
    }


def _simple_stats_from_dict(data: Optional[Dict[str, Any]]) -> Optional["SimpleStats"]:
    """Inverse of _simple_stats_to_dict; missing null counts become ``[]``."""
    if data is None:
        return None
    return SimpleStats(
        min_values=_generic_row_from_dict(data.get("min_values")),
        max_values=_generic_row_from_dict(data.get("max_values")),
        null_counts=list(data.get("null_counts") or []),
    )
def _key_field(idx: int, name: str, type_str: str) -> DataField:
    """Return a DataField whose type is the AtomicType built from ``type_str``."""
    atomic_type = AtomicType(type_str)
    return DataField(idx, name, atomic_type)


def _build_data_file_meta(file_name: str = "data-1.parquet") -> DataFileMeta:
    """Create a DataFileMeta fixture named ``file_name``.

    Every keyword passed to DataFileMeta.create gets a concrete value so the
    round-trip tests touch each serialized slot.
    """
    pk_fields = [_key_field(0, "id", "BIGINT"), _key_field(1, "name", "STRING")]

    def lowest() -> GenericRow:
        # A fresh row per call: keys and stats must not share one instance.
        return GenericRow([1, "alice"], pk_fields)

    def highest() -> GenericRow:
        return GenericRow([99, "zoe"], pk_fields)

    min_key = lowest()
    max_key = highest()
    key_stats = SimpleStats(
        min_values=lowest(),
        max_values=highest(),
        null_counts=[0, 0],
    )
    value_stats = SimpleStats(
        min_values=GenericRow([], []),
        max_values=GenericRow([], []),
        null_counts=[],
    )
    return DataFileMeta.create(
        file_name=file_name,
        file_size=4096,
        row_count=99,
        min_key=min_key,
        max_key=max_key,
        key_stats=key_stats,
        value_stats=value_stats,
        min_sequence_number=10,
        max_sequence_number=200,
        schema_id=0,
        level=0,
        extra_files=["index-1.idx"],
        creation_time=Timestamp.from_epoch_millis(1_700_000_000_000, 123_456),
        delete_row_count=2,
        embedded_index=b"\x00\x01\x02\x03embedded",
        file_source=1,
        value_stats_cols=["c1"],
        external_path="oss://bucket/path/to/file",
        first_row_id=1000,
        write_cols=["id", "name"],
        file_path="/abs/path/data-1.parquet",
    )


class DataFileMetaSerdeTest(unittest.TestCase):
    """Round-trip checks for DataFileMeta.to_dict / DataFileMeta.from_dict."""

    def test_to_from_dict_roundtrip(self):
        original = _build_data_file_meta()
        payload = original.to_dict()
        rebuilt = DataFileMeta.from_dict(payload)

        self.assertEqual(original, rebuilt)
        # spot check of complex sub-fields that use tagged encoding
        self.assertEqual(original.embedded_index, rebuilt.embedded_index)
        self.assertEqual(original.creation_time, rebuilt.creation_time)
        self.assertEqual(original.min_key.values, rebuilt.min_key.values)
        self.assertEqual(original.min_key.row_kind, rebuilt.min_key.row_kind)
        original_fields = [f.to_dict() for f in original.min_key.fields]
        rebuilt_fields = [f.to_dict() for f in rebuilt.min_key.fields]
        self.assertEqual(original_fields, rebuilt_fields)

    def test_value_encoding_supports_decimal_and_temporal_types(self):
        fields = [
            _key_field(0, "amount", "DECIMAL(10, 2)"),
            _key_field(1, "ts", "TIMESTAMP(6)"),
            _key_field(2, "d", "DATE"),
            _key_field(3, "t", "TIME"),
            _key_field(4, "blob", "BYTES"),
        ]
        expected = [
            Decimal("12.34"),
            datetime(2024, 1, 2, 3, 4, 5, 678901),
            date(2024, 1, 2),
            dt_time(13, 45, 30, 250000),
            b"binary-payload",
        ]
        row = GenericRow(
            values=list(expected),
            fields=fields,
            row_kind=RowKind.UPDATE_AFTER,
        )
        # Reuse the GenericRow encode path through SimpleStats
        stats = SimpleStats(min_values=row, max_values=row, null_counts=[0, 0, 0, 0, 0])
        meta = _build_data_file_meta()
        meta.key_stats = stats

        rebuilt = DataFileMeta.from_dict(meta.to_dict())

        for position, wanted in enumerate(expected):
            self.assertEqual(rebuilt.key_stats.min_values.values[position], wanted)
        self.assertEqual(rebuilt.key_stats.min_values.row_kind, RowKind.UPDATE_AFTER)


class CommitMessageSerializerTest(unittest.TestCase):
    """Round-trip checks for the CommitMessageSerializer wire format."""

    def test_serialize_deserialize_roundtrip_for_compact_message(self):
        before_files = [_build_data_file_meta(f"old-{i}.parquet") for i in range(3)]
        after_files = [_build_data_file_meta("new-merged.parquet")]
        increment = CompactIncrement(
            compact_before=before_files,
            compact_after=after_files,
        )
        message = CommitMessage(
            partition=("2024-01-01", "us"),
            bucket=2,
            total_buckets=8,
            compact_increment=increment,
            check_from_snapshot=42,
        )

        payload = CommitMessageSerializer.serialize(message)
        rebuilt = CommitMessageSerializer.deserialize(payload)

        self.assertIsInstance(payload, bytes)
        self.assertEqual(message.partition, rebuilt.partition)
        self.assertEqual(message.bucket, rebuilt.bucket)
        self.assertEqual(message.total_buckets, rebuilt.total_buckets)
        self.assertEqual(message.new_files, rebuilt.new_files)
        self.assertEqual(message.compact_before, rebuilt.compact_before)
        self.assertEqual(message.compact_after, rebuilt.compact_after)
        self.assertEqual(message.check_from_snapshot, rebuilt.check_from_snapshot)

    def test_serialize_deserialize_roundtrip_for_append_message(self):
        appended = DataIncrement(new_files=[_build_data_file_meta("append-1.parquet")])
        message = CommitMessage(
            partition=(),
            bucket=0,
            data_increment=appended,
        )

        wire = CommitMessageSerializer.serialize(message)
        rebuilt = CommitMessageSerializer.deserialize(wire)

        self.assertEqual(message.partition, rebuilt.partition)
        self.assertEqual(message.bucket, rebuilt.bucket)
        self.assertEqual(message.new_files, rebuilt.new_files)
        self.assertEqual([], rebuilt.compact_before)
        self.assertEqual([], rebuilt.compact_after)

    def test_unsupported_version_is_rejected(self):
        message = CommitMessage(
            partition=(),
            bucket=0,
            data_increment=DataIncrement(new_files=[_build_data_file_meta()]),
        )
        payload_dict = CommitMessageSerializer.to_dict(message)
        payload_dict["version"] = CommitMessageSerializer.VERSION + 1

        with self.assertRaises(ValueError):
            CommitMessageSerializer.from_dict(payload_dict)

    def test_serialize_supports_partition_with_non_json_native_types(self):
        # Partitions can carry DATE/DECIMAL/bytes columns; serializer must round-trip them.
        message = CommitMessage(
            partition=(date(2024, 1, 2), Decimal("99.50"), b"raw"),
            bucket=0,
            compact_increment=CompactIncrement(compact_after=[_build_data_file_meta()]),
        )

        wire = CommitMessageSerializer.serialize(message)
        rebuilt = CommitMessageSerializer.deserialize(wire)

        self.assertEqual((date(2024, 1, 2), Decimal("99.50"), b"raw"), rebuilt.partition)

    def test_serialize_supports_timestamp_partition(self):
        ts = Timestamp.from_epoch_millis(1_700_000_000_000, 500_000)
        message = CommitMessage(
            partition=(ts,),
            bucket=0,
            compact_increment=CompactIncrement(compact_after=[_build_data_file_meta()]),
        )

        wire = CommitMessageSerializer.serialize(message)
        rebuilt = CommitMessageSerializer.deserialize(wire)

        self.assertEqual((ts,), rebuilt.partition)

    def test_serialize_list_round_trip(self):
        messages = []
        for i in range(3):
            increment = DataIncrement(new_files=[_build_data_file_meta(f"f{i}.parquet")])
            messages.append(
                CommitMessage(partition=(f"p{i}",), bucket=i, data_increment=increment)
            )
        payloads = CommitMessageSerializer.serialize_list(messages)
        rebuilt = CommitMessageSerializer.deserialize_list(payloads)

        self.assertEqual(len(messages), len(rebuilt))
        for sent, received in zip(messages, rebuilt):
            self.assertEqual(sent.partition, received.partition)
            self.assertEqual(sent.bucket, received.bucket)
            self.assertEqual(sent.new_files, received.new_files)


if __name__ == "__main__":
    unittest.main()
def _make_file(name: str, *, first_row_id=None) -> DataFileMeta:
    """Create a minimal DataFileMeta named ``name`` with empty keys and stats."""
    return DataFileMeta.create(
        file_name=name,
        file_size=4096,
        row_count=10,
        min_key=GenericRow([], []),
        max_key=GenericRow([], []),
        key_stats=SimpleStats.empty_stats(),
        value_stats=SimpleStats.empty_stats(),
        min_sequence_number=1,
        max_sequence_number=10,
        schema_id=0,
        level=0,
        extra_files=[],
        creation_time=Timestamp.from_local_date_time(datetime(2024, 1, 15, 10, 30, 0)),
        first_row_id=first_row_id,
    )


@patch('pypaimon.write.file_store_commit.ManifestFileManager')
@patch('pypaimon.write.file_store_commit.ManifestListManager')
class TestFileStoreCommitCompact(unittest.TestCase):
    """Phase 1 protocol-level tests: verify compact_before/after entries flow correctly through commit().

    These tests stub _try_commit so we only verify the entry-construction and commit_kind selection.
    Full e2e (with real manifest writes / scans) is covered in Phase 2 once the rewriter exists.
    """

    def setUp(self):
        self.mock_table = Mock()
        self.mock_table.partition_keys = ['dt']
        self.mock_table.partition_keys_fields = [DataField(0, 'dt', AtomicType('STRING'))]
        self.mock_table.total_buckets = 4
        self.mock_table.current_branch.return_value = 'main'
        self.mock_table.identifier = 'default.t'
        self.mock_snapshot_commit = Mock()

    def _create_commit(self):
        """Build a FileStoreCommit wired to the mocked table and snapshot commit."""
        return FileStoreCommit(
            snapshot_commit=self.mock_snapshot_commit,
            table=self.mock_table,
            commit_user='test_user',
        )

    def test_build_entries_emits_add_for_new_files(self, *_):
        commit = self._create_commit()
        msg = CommitMessage(
            partition=('2024-01-15',),
            bucket=2,
            data_increment=DataIncrement(new_files=[_make_file('a.parquet')]),
        )

        entries = commit._build_commit_entries([msg])

        self.assertEqual(1, len(entries))
        self.assertEqual(0, entries[0].kind)
        self.assertEqual(2, entries[0].bucket)
        self.assertEqual('a.parquet', entries[0].file.file_name)
        self.assertEqual(['2024-01-15'], list(entries[0].partition.values))

    def test_build_entries_emits_delete_for_compact_before_and_add_for_compact_after(self, *_):
        commit = self._create_commit()
        msg = CommitMessage(
            partition=('2024-01-15',),
            bucket=1,
            compact_increment=CompactIncrement(
                compact_before=[_make_file('old-1.parquet'), _make_file('old-2.parquet')],
                compact_after=[_make_file('merged.parquet')],
            ),
        )

        entries = commit._build_commit_entries([msg])

        kinds = [e.kind for e in entries]
        names = [e.file.file_name for e in entries]
        self.assertEqual([1, 1, 0], kinds)
        self.assertEqual(['old-1.parquet', 'old-2.parquet', 'merged.parquet'], names)
        self.assertTrue(all(e.bucket == 1 for e in entries))

    def test_commit_compact_uses_compact_kind_and_no_conflict_detection(self, *_):
        commit = self._create_commit()
        commit._try_commit = Mock()
        msg = CommitMessage(
            partition=('p1',),
            bucket=3,
            compact_increment=CompactIncrement(
                compact_before=[_make_file('old.parquet')],
                compact_after=[_make_file('new.parquet')],
            ),
        )

        commit.commit_compact([msg], commit_identifier=300)

        commit._try_commit.assert_called_once()
        # call_args[1] is the keyword dict on every Python 3 release, whereas
        # call_args.kwargs only exists from 3.8; this also matches how
        # partition_predicate_test reads the recorded call.
        kwargs = commit._try_commit.call_args[1]
        self.assertEqual('COMPACT', kwargs['commit_kind'])
        self.assertEqual(300, kwargs['commit_identifier'])
        self.assertFalse(kwargs['detect_conflicts'])
        self.assertFalse(kwargs['allow_rollback'])

    def test_commit_compact_rejects_messages_with_new_files(self, *_):
        commit = self._create_commit()
        msg = CommitMessage(
            partition=('p1',),
            bucket=0,
            data_increment=DataIncrement(new_files=[_make_file('append.parquet')]),
            compact_increment=CompactIncrement(
                compact_before=[_make_file('old.parquet')],
                compact_after=[_make_file('new.parquet')],
            ),
        )

        with self.assertRaises(ValueError):
            commit.commit_compact([msg], commit_identifier=400)

    def test_commit_compact_skips_when_no_messages(self, *_):
        commit = self._create_commit()
        commit._try_commit = Mock()

        commit.commit_compact([], commit_identifier=500)

        commit._try_commit.assert_not_called()

    def test_commit_compact_skips_when_messages_have_no_files(self, *_):
        commit = self._create_commit()
        commit._try_commit = Mock()
        empty_msg = CommitMessage(partition=('p1',), bucket=0)

        commit.commit_compact([empty_msg], commit_identifier=600)

        commit._try_commit.assert_not_called()

    # _build_commit_entries must reject increment fields it does not yet
    # wire up so a future writer cannot silently drop them at commit time.

    def test_build_entries_rejects_data_increment_deleted_files(self, *_):
        commit = self._create_commit()
        msg = CommitMessage(
            partition=('2024-01-15',),
            bucket=0,
            data_increment=DataIncrement(deleted_files=[_make_file('d.parquet')]),
        )
        with self.assertRaises(NotImplementedError):
            commit._build_commit_entries([msg])

    def test_build_entries_rejects_data_increment_changelog_files(self, *_):
        commit = self._create_commit()
        msg = CommitMessage(
            partition=('2024-01-15',),
            bucket=0,
            data_increment=DataIncrement(changelog_files=[_make_file('c.parquet')]),
        )
        with self.assertRaises(NotImplementedError):
            commit._build_commit_entries([msg])

    def test_build_entries_rejects_compact_increment_changelog_files(self, *_):
        commit = self._create_commit()
        msg = CommitMessage(
            partition=('2024-01-15',),
            bucket=0,
            compact_increment=CompactIncrement(changelog_files=[_make_file('c.parquet')]),
        )
        with self.assertRaises(NotImplementedError):
            commit._build_commit_entries([msg])

    def test_build_entries_uses_message_total_buckets_when_set(self, *_):
        """A stale plan whose bucket count has been rescaled should keep the
        message's own total_buckets, not be silently overwritten with the
        table's current value.
        """
        commit = self._create_commit()
        # Table claims 4 buckets; message was planned when there were 8.
        self.mock_table.total_buckets = 4
        msg = CommitMessage(
            partition=('2024-01-15',),
            bucket=0,
            total_buckets=8,
            data_increment=DataIncrement(new_files=[_make_file('a.parquet')]),
        )

        entries = commit._build_commit_entries([msg])

        self.assertEqual(1, len(entries))
        self.assertEqual(8, entries[0].total_buckets)

    def test_commit_rejects_messages_with_compact_increment(self, *_):
        """commit() is the write-side entry; compaction results must go through
        commit_compact(). A caller that mixes them is almost certainly wrong
        (would otherwise silently flatten two snapshot kinds into one)."""
        commit = self._create_commit()
        commit._try_commit = Mock()
        msg = CommitMessage(
            partition=('p1',),
            bucket=0,
            compact_increment=CompactIncrement(compact_after=[_make_file('a.parquet')]),
        )

        with self.assertRaises(ValueError):
            commit.commit([msg], commit_identifier=700)

        commit._try_commit.assert_not_called()

    def test_build_entries_falls_back_to_table_total_buckets_when_message_has_none(self, *_):
        commit = self._create_commit()
        self.mock_table.total_buckets = 4
        msg = CommitMessage(
            partition=('2024-01-15',),
            bucket=0,
            total_buckets=None,
            data_increment=DataIncrement(new_files=[_make_file('a.parquet')]),
        )

        entries = commit._build_commit_entries([msg])

        self.assertEqual(4, entries[0].total_buckets)


if __name__ == '__main__':
    unittest.main()
data_increment=DataIncrement(new_files=[file_meta_1, file_meta_2]) ) # Test method @@ -223,13 +224,13 @@ def test_generate_partition_statistics_multiple_partitions( commit_message_1 = CommitMessage( partition=('2024-01-15', 'us-east-1'), bucket=0, - new_files=[file_meta_1] + data_increment=DataIncrement(new_files=[file_meta_1]) ) commit_message_2 = CommitMessage( partition=('2024-01-15', 'us-west-2'), bucket=0, - new_files=[file_meta_2] + data_increment=DataIncrement(new_files=[file_meta_2]) ) # Test method @@ -292,7 +293,7 @@ def test_generate_partition_statistics_unpartitioned_table( commit_message = CommitMessage( partition=(), # Empty partition for unpartitioned table bucket=0, - new_files=[file_meta] + data_increment=DataIncrement(new_files=[file_meta]) ) # Test method @@ -331,7 +332,7 @@ def test_generate_partition_statistics_no_creation_time( commit_message = CommitMessage( partition=('2024-01-15', 'us-east-1'), bucket=0, - new_files=[file_meta] + data_increment=DataIncrement(new_files=[file_meta]) ) # Test method @@ -373,7 +374,7 @@ def test_generate_partition_statistics_mismatched_partition_keys( commit_message = CommitMessage( partition=('2024-01-15', 'us-east-1', 'extra-value'), # 3 values but table has 2 keys bucket=0, - new_files=[file_meta] + data_increment=DataIncrement(new_files=[file_meta]) ) # Test method diff --git a/paimon-python/pypaimon/tests/partition_predicate_test.py b/paimon-python/pypaimon/tests/partition_predicate_test.py index abf5212d0e75..005303217724 100644 --- a/paimon-python/pypaimon/tests/partition_predicate_test.py +++ b/paimon-python/pypaimon/tests/partition_predicate_test.py @@ -28,6 +28,7 @@ from pypaimon.table.row.offset_row import OffsetRow from pypaimon.write.commit.commit_scanner import CommitScanner from pypaimon.write.commit_message import CommitMessage +from pypaimon.write.data_increment import DataIncrement from pypaimon.write.file_store_commit import FileStoreCommit PARTITION_FIELDS = [ @@ -170,7 +171,11 @@ def 
_create_commit(self, stub_commit=True): @staticmethod def _msg(partition): - return CommitMessage(partition=partition, bucket=0, new_files=[Mock(row_count=10)]) + return CommitMessage( + partition=partition, + bucket=0, + data_increment=DataIncrement(new_files=[Mock(row_count=10)]), + ) def _extract_partition_predicate(self, commit): entries_plan = commit._try_commit.call_args[1]['commit_entries_plan'] diff --git a/paimon-python/pypaimon/tests/table_commit_test.py b/paimon-python/pypaimon/tests/table_commit_test.py index d0b9d62762df..d84a7c417d4a 100644 --- a/paimon-python/pypaimon/tests/table_commit_test.py +++ b/paimon-python/pypaimon/tests/table_commit_test.py @@ -22,6 +22,7 @@ from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER from pypaimon.write.commit_message import CommitMessage +from pypaimon.write.data_increment import DataIncrement from pypaimon.write.table_commit import BatchTableCommit, StreamTableCommit @@ -51,7 +52,11 @@ def test_overwrite_forwards_filtered_messages(self, name, msg_flags): commit, mock_fsc = self._create_commit(BatchTableCommit, overwrite_partition={'f0': 1}) messages = [ - CommitMessage(partition=(1,), bucket=0, new_files=[Mock()] if has_files else []) + CommitMessage( + partition=(1,), + bucket=0, + data_increment=DataIncrement(new_files=[Mock()] if has_files else []), + ) for has_files in msg_flags ] commit.commit(messages) @@ -74,7 +79,11 @@ def test_append_forwards_non_empty_messages(self, name, msg_flags): commit, mock_fsc = self._create_commit(BatchTableCommit, overwrite_partition=None) messages = [ - CommitMessage(partition=(), bucket=0, new_files=[Mock()] if has_files else []) + CommitMessage( + partition=(), + bucket=0, + data_increment=DataIncrement(new_files=[Mock()] if has_files else []), + ) for has_files in msg_flags ] commit.commit(messages) diff --git a/paimon-python/pypaimon/write/commit_message.py b/paimon-python/pypaimon/write/commit_message.py index 7bce06d8ab13..223c3e572ca5 100644 --- 
@dataclass
class CommitMessage:
    """File committable for sink.

    One message describes what a single (partition, bucket) writer or
    compactor contributes to a snapshot. The contribution is split into a
    data_increment and a compact_increment so the same type covers plain
    writes as well as compaction results.

    - partition / bucket: the (partition, bucket) this message applies to.
    - total_buckets: number of buckets the table had at write time, used by
      the commit path to detect bucket-count changes.
    - data_increment: ADD/DELETE/changelog/index deltas from a normal write.
    - compact_increment: ADD/DELETE/changelog/index deltas from compaction.
    - check_from_snapshot: row-tracking conflict-detection anchor; -1 means
      "no check" (default). Read per-message by write/commit/conflict_detection.py.
    """

    partition: Tuple
    bucket: int
    total_buckets: Optional[int] = None
    data_increment: DataIncrement = field(default_factory=DataIncrement)
    compact_increment: CompactIncrement = field(default_factory=CompactIncrement)
    check_from_snapshot: Optional[int] = -1

    def is_empty(self) -> bool:
        """Return whether both increments report themselves empty."""
        data_is_empty = self.data_increment.is_empty()
        return data_is_empty and self.compact_increment.is_empty()

    # ---- Read-only shortcuts into data_increment ----------------------------
    # Most callers need a concrete file list, not the increment wrapping it.

    @property
    def new_files(self) -> List[DataFileMeta]:
        increment = self.data_increment
        return increment.new_files

    @property
    def deleted_files(self) -> List[DataFileMeta]:
        increment = self.data_increment
        return increment.deleted_files

    @property
    def changelog_files(self) -> List[DataFileMeta]:
        increment = self.data_increment
        return increment.changelog_files

    # ---- Read-only shortcuts into compact_increment -------------------------

    @property
    def compact_before(self) -> List[DataFileMeta]:
        increment = self.compact_increment
        return increment.compact_before

    @property
    def compact_after(self) -> List[DataFileMeta]:
        increment = self.compact_increment
        return increment.compact_after

    @property
    def compact_changelog_files(self) -> List[DataFileMeta]:
        increment = self.compact_increment
        return increment.changelog_files
class CommitMessageSerializer:
    """JSON codec used to ship CommitMessage objects between pypaimon
    processes (for example between a Ray driver and its workers).

    JSON is chosen on purpose: the payload stays human-debuggable, tolerates
    Python version drift, and avoids pickle's compatibility/security pitfalls.

    A payload carries partition, bucket, total_buckets, check_from_snapshot
    and both increments. Each increment carries its data file lists plus its
    new/deleted index file lists, whether or not they are populated.
    """

    # Wire format version; bump on incompatible payload changes.
    VERSION = 1

    # ---- Public API ----------------------------------------------------------

    @classmethod
    def serialize(cls, message: CommitMessage) -> bytes:
        """Encode one message as compact (no-whitespace) UTF-8 JSON bytes."""
        as_dict = cls.to_dict(message)
        text = json.dumps(as_dict, separators=(",", ":"))
        return text.encode("utf-8")

    @classmethod
    def deserialize(cls, payload: bytes) -> CommitMessage:
        """Decode bytes produced by ``serialize`` back into a message."""
        text = payload.decode("utf-8")
        return cls.from_dict(json.loads(text))

    @classmethod
    def to_dict(cls, message: CommitMessage) -> Dict[str, Any]:
        """Build the JSON-friendly dict for ``message``.

        A ``None`` partition is written as an empty list.
        """
        partition = message.partition
        if partition is None:
            partition = ()
        return {
            "version": cls.VERSION,
            "partition": [encode_value(part) for part in partition],
            "bucket": message.bucket,
            "total_buckets": message.total_buckets,
            "data_increment": cls._data_increment_to_dict(message.data_increment),
            "compact_increment": cls._compact_increment_to_dict(message.compact_increment),
            "check_from_snapshot": message.check_from_snapshot,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> CommitMessage:
        """Rebuild a message from a dict produced by ``to_dict``.

        A missing ``version`` key is treated as the current version.

        Raises:
            ValueError: if the payload declares a different version.
            KeyError: if ``bucket`` is missing.
        """
        version = data.get("version", cls.VERSION)
        if version != cls.VERSION:
            raise ValueError(
                f"Unsupported CommitMessage payload version: {version} (expected {cls.VERSION})"
            )
        raw_partition = data.get("partition") or []
        partition = tuple(decode_value(part) for part in raw_partition)
        bucket = data["bucket"]
        total_buckets = data.get("total_buckets")
        data_increment = cls._data_increment_from_dict(data.get("data_increment"))
        compact_increment = cls._compact_increment_from_dict(data.get("compact_increment"))
        return CommitMessage(
            partition=partition,
            bucket=bucket,
            total_buckets=total_buckets,
            data_increment=data_increment,
            compact_increment=compact_increment,
            check_from_snapshot=data.get("check_from_snapshot", -1),
        )

    @classmethod
    def serialize_list(cls, messages: List[CommitMessage]) -> List[bytes]:
        """Serialize every message, keeping the input order."""
        return list(map(cls.serialize, messages))

    @classmethod
    def deserialize_list(cls, payloads: List[bytes]) -> List[CommitMessage]:
        """Deserialize every payload, keeping the input order."""
        return list(map(cls.deserialize, payloads))

    # ---- List helpers --------------------------------------------------------

    @staticmethod
    def _dump_data_files(files) -> List[Dict[str, Any]]:
        # One dict per DataFileMeta, via its own to_dict().
        return [meta.to_dict() for meta in files]

    @staticmethod
    def _load_data_files(raw) -> List[DataFileMeta]:
        # A missing or null list decodes to an empty list.
        return [DataFileMeta.from_dict(item) for item in raw or []]

    @staticmethod
    def _dump_index_files(files) -> List[Dict[str, Any]]:
        return [_index_file_to_dict(meta) for meta in files]

    @staticmethod
    def _load_index_files(raw) -> List[IndexFileMeta]:
        return [_index_file_from_dict(item) for item in raw or []]

    # ---- Increment helpers ---------------------------------------------------

    @classmethod
    def _data_increment_to_dict(cls, inc: DataIncrement) -> Dict[str, Any]:
        """Encode the five lists of a DataIncrement."""
        return {
            "new_files": cls._dump_data_files(inc.new_files),
            "deleted_files": cls._dump_data_files(inc.deleted_files),
            "changelog_files": cls._dump_data_files(inc.changelog_files),
            "new_index_files": cls._dump_index_files(inc.new_index_files),
            "deleted_index_files": cls._dump_index_files(inc.deleted_index_files),
        }

    @classmethod
    def _data_increment_from_dict(cls, data) -> DataIncrement:
        """Decode a DataIncrement; a falsy payload yields an empty increment."""
        if not data:
            return DataIncrement()
        return DataIncrement(
            new_files=cls._load_data_files(data.get("new_files")),
            deleted_files=cls._load_data_files(data.get("deleted_files")),
            changelog_files=cls._load_data_files(data.get("changelog_files")),
            new_index_files=cls._load_index_files(data.get("new_index_files")),
            deleted_index_files=cls._load_index_files(data.get("deleted_index_files")),
        )

    @classmethod
    def _compact_increment_to_dict(cls, inc: CompactIncrement) -> Dict[str, Any]:
        """Encode the five lists of a CompactIncrement."""
        return {
            "compact_before": cls._dump_data_files(inc.compact_before),
            "compact_after": cls._dump_data_files(inc.compact_after),
            "changelog_files": cls._dump_data_files(inc.changelog_files),
            "new_index_files": cls._dump_index_files(inc.new_index_files),
            "deleted_index_files": cls._dump_index_files(inc.deleted_index_files),
        }

    @classmethod
    def _compact_increment_from_dict(cls, data) -> CompactIncrement:
        """Decode a CompactIncrement; a falsy payload yields an empty increment."""
        if not data:
            return CompactIncrement()
        return CompactIncrement(
            compact_before=cls._load_data_files(data.get("compact_before")),
            compact_after=cls._load_data_files(data.get("compact_after")),
            changelog_files=cls._load_data_files(data.get("changelog_files")),
            new_index_files=cls._load_index_files(data.get("new_index_files")),
            deleted_index_files=cls._load_index_files(data.get("deleted_index_files")),
        )
# Only the scalar identity fields of IndexFileMeta travel on the wire today.
# The richer payloads (deletion vector ranges, global index meta) are not
# encoded yet, so the encoder refuses metas that carry them instead of
# silently returning a truncated copy after a round-trip.
def _index_file_to_dict(idx: IndexFileMeta) -> Dict[str, Any]:
    """Encode the scalar identity fields of ``idx`` as a JSON-friendly dict.

    Raises:
        NotImplementedError: if ``idx`` has a populated ``dv_ranges`` or
            ``global_index_meta``; those payloads have no wire representation
            yet and would otherwise be dropped without notice.
    """
    # getattr with a default keeps duck-typed metas (no such attribute) working.
    unsupported = [
        name
        for name in ("dv_ranges", "global_index_meta")
        if getattr(idx, name, None)
    ]
    if unsupported:
        raise NotImplementedError(
            "CommitMessageSerializer cannot serialize IndexFileMeta fields yet: "
            + ", ".join(unsupported)
        )
    return {
        "index_type": idx.index_type,
        "file_name": idx.file_name,
        "file_size": idx.file_size,
        "row_count": idx.row_count,
        "external_path": idx.external_path,
    }


def _index_file_from_dict(data: Dict[str, Any]) -> IndexFileMeta:
    """Rebuild an IndexFileMeta from a dict made by ``_index_file_to_dict``.

    ``external_path`` is optional in the payload; the other four keys are
    required and raise ``KeyError`` when missing.
    """
    return IndexFileMeta(
        index_type=data["index_type"],
        file_name=data["file_name"],
        file_size=data["file_size"],
        row_count=data["row_count"],
        external_path=data.get("external_path"),
    )
@dataclass
class CompactIncrement:
    """File deltas produced by one compaction.

    Fields (every list defaults to a fresh empty list):
        compact_before: input files consumed by the compaction (DELETE entries).
        compact_after: rewritten output files (ADD entries).
        changelog_files: changelog files emitted while compacting.
        new_index_files: index files added by this compaction.
        deleted_index_files: index files removed by this compaction.
    """

    compact_before: List[DataFileMeta] = field(default_factory=list)
    compact_after: List[DataFileMeta] = field(default_factory=list)
    changelog_files: List[DataFileMeta] = field(default_factory=list)
    new_index_files: List[IndexFileMeta] = field(default_factory=list)
    deleted_index_files: List[IndexFileMeta] = field(default_factory=list)

    def is_empty(self) -> bool:
        """Return True when none of the five lists holds an element."""
        slots = (
            self.compact_before,
            self.compact_after,
            self.changelog_files,
            self.new_index_files,
            self.deleted_index_files,
        )
        return not any(slots)

    @classmethod
    def empty_increment(cls) -> "CompactIncrement":
        """Return a new increment with all lists empty."""
        return cls()
@dataclass
class DataIncrement:
    """Data, changelog and index file deltas produced by one write.

    Paired with a CompactIncrement to build a CommitMessage.

    Fields (every list defaults to a fresh empty list):
        new_files: data files created by the write (ADD entries).
        deleted_files: data files removed by the write without compaction;
            kept in its own list, separate from ``new_files``.
        changelog_files: changelog data files associated with the write.
        new_index_files: index files added by the write.
        deleted_index_files: index files removed by the write.
    """

    new_files: List[DataFileMeta] = field(default_factory=list)
    deleted_files: List[DataFileMeta] = field(default_factory=list)
    changelog_files: List[DataFileMeta] = field(default_factory=list)
    new_index_files: List[IndexFileMeta] = field(default_factory=list)
    deleted_index_files: List[IndexFileMeta] = field(default_factory=list)

    def is_empty(self) -> bool:
        """Return True when none of the five lists holds an element."""
        for slot in (
            self.new_files,
            self.deleted_files,
            self.changelog_files,
            self.new_index_files,
            self.deleted_index_files,
        ):
            if slot:
                return False
        return True

    @classmethod
    def empty_increment(cls) -> "DataIncrement":
        """Return a new increment with all lists empty."""
        return cls()
def commit_compact(self, commit_messages: List[CommitMessage], commit_identifier: int):
    """Commit compaction-only messages as a COMPACT snapshot.

    No message may carry ``new_files``. ``compact_before`` files become
    DELETE entries and ``compact_after`` files become ADD entries (see
    ``_build_commit_entries``). The commit is attempted with conflict
    detection and rollback both disabled.

    Returns without committing when ``commit_messages`` is empty or when the
    messages yield no manifest entries.

    Raises:
        ValueError: if any message has a non-empty ``new_files``.
    """
    if not commit_messages:
        return

    if any(msg.new_files for msg in commit_messages):
        raise ValueError(
            "commit_compact rejects messages with new_files; use commit() instead."
        )

    message_count = len(commit_messages)
    logger.info(
        "Ready to commit compact to table %s, number of commit messages: %d",
        self.table.identifier,
        message_count,
    )

    commit_entries = self._build_commit_entries(commit_messages)
    if len(commit_entries) == 0:
        return

    logger.info("Finished collecting compact changes: %d entries", len(commit_entries))

    self._try_commit(
        commit_kind="COMPACT",
        commit_identifier=commit_identifier,
        commit_entries_plan=lambda snapshot: commit_entries,
        detect_conflicts=False,
        allow_rollback=False,
    )


def _build_commit_entries(self, commit_messages: List[CommitMessage]) -> List[ManifestEntry]:
    """Turn commit messages into manifest entries.

    Only ``new_files`` (kind 0), ``compact_before`` (kind 1) and
    ``compact_after`` (kind 0) are translated. Every message is validated
    before any entry is built.

    Raises:
        NotImplementedError: if a message fills one of the increment lists
            that the commit path does not handle yet, so that data cannot be
            silently lost at commit time.
    """
    # Validation pass: refuse the seven increment lists that are not wired up.
    for msg in commit_messages:
        write_side = msg.data_increment
        unhandled_write = (
            write_side.deleted_files
            or write_side.changelog_files
            or write_side.new_index_files
            or write_side.deleted_index_files
        )
        if unhandled_write:
            raise NotImplementedError(
                "FileStoreCommit does not yet handle DataIncrement.deleted_files / "
                "changelog_files / new_index_files / deleted_index_files; these slots "
                "will be wired in by the feature that first needs each one."
            )
        compact_side = msg.compact_increment
        unhandled_compact = (
            compact_side.changelog_files
            or compact_side.new_index_files
            or compact_side.deleted_index_files
        )
        if unhandled_compact:
            raise NotImplementedError(
                "FileStoreCommit does not yet handle CompactIncrement.changelog_files / "
                "new_index_files / deleted_index_files; these slots will be wired in by "
                "the feature that first needs each one."
            )

    entries: List[ManifestEntry] = []
    for msg in commit_messages:
        partition = GenericRow(list(msg.partition), self.table.partition_keys_fields)
        # The bucket count recorded on the message wins over the table's
        # current value, so a plan built before a bucket rescale keeps the
        # count it was built with.
        if msg.total_buckets is not None:
            total_buckets = msg.total_buckets
        else:
            total_buckets = self.table.total_buckets
        # (entry kind, files) in emission order: ADD new files, DELETE the
        # compaction inputs, ADD the compaction outputs.
        groups = (
            (0, msg.new_files),
            (1, msg.compact_before),
            (0, msg.compact_after),
        )
        for kind, files in groups:
            entries.extend(
                ManifestEntry(
                    kind=kind,
                    partition=partition,
                    bucket=msg.bucket,
                    total_buckets=total_buckets,
                    file=file,
                )
                for file in files
            )
    return entries
def prepare_commit(self) -> List[CommitMessage]:
    """Build one CommitMessage per (partition, files) pair in ``self.dict``.

    Every message uses bucket 0, the table's current ``total_buckets``,
    the files as ``new_files`` of a DataIncrement, and ``self.snapshot_id``
    as ``check_from_snapshot``.
    """
    return [
        CommitMessage(
            partition=partition,
            bucket=0,
            total_buckets=self.table.total_buckets,
            data_increment=DataIncrement(new_files=files),
            check_from_snapshot=self.snapshot_id,
        )
        for partition, files in self.dict.items()
    ]