From c105416db2444dac67a392aaa60288b3752578c2 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Sat, 16 May 2026 17:43:06 +0800 Subject: [PATCH 1/2] [python] Extend commit protocol for compaction (DataIncrement/CompactIncrement) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lay the protocol-level groundwork for upcoming compaction work in pypaimon by aligning CommitMessage with Java's CommitMessageImpl shape and adding a JSON-safe wire format for cross-process transport. Structural changes: - New DataIncrement (write side) and CompactIncrement (compaction side) value objects, direct ports of org.apache.paimon.io.DataIncrement and CompactIncrement. Each holds (new_files, deleted_files, changelog_files, new_index_files, deleted_index_files) so future deletion-vector / changelog work has an unambiguous slot. - CommitMessage refactored to (partition, bucket, total_buckets, data_increment, compact_increment, check_from_snapshot). Convenience properties (new_files, compact_before, compact_after, ...) preserve read-site ergonomics. - FileStoreCommit emits ADD entries for compact_after, DELETE entries for compact_before, and auto-selects commit_kind=COMPACT when a message carries only compact increments. A dedicated commit_compact() helper enforces COMPACT-only semantics with no row-id assignment. - FileStoreWrite / TableUpdate construct CommitMessage via DataIncrement on the existing write path — no behavior change for current callers. DataFileMeta serde: - to_dict / from_dict round-trip with tagged-value encoding for bytes, Decimal, datetime, date, time, and Timestamp so file metas can ship through JSON-only transports (e.g. Ray task payloads later). - Public encode_value / decode_value helpers reused by CommitMessage's partition tuples (DATE / DECIMAL / bytes / Timestamp partitions). - Tolerates manifest-side BinaryRow (lazy-decoded) and pyarrow Array-like null_counts so round-tripping a freshly-produced file meta doesn't fail. CommitMessageSerializer: - VERSION=1 wire format covering full DataIncrement + CompactIncrement shape (including IndexFileMeta identity fields). dv_ranges / global_index_meta will be wired up alongside deletion-vector phases. No observable behavior change for read / write / commit today; this is foundation for the compaction module, append-only compaction job, PK LSM compaction, and Ray distributed executor that land in follow-up PRs (#7771 originally bundled all of them). Test plan: - New commit_message_serializer_test: round-trip CommitMessage with DataIncrement / CompactIncrement / index files / non-JSON-native partition tuples (DATE, Decimal, bytes, Timestamp); IndexFileMeta round-trip; unknown-version rejection. - New file_store_commit_compact_test: protocol-level coverage of compact_before -> DELETE entry, compact_after -> ADD entry, and auto-COMPACT kind selection (full e2e covered when the compactor lands). - Existing file_store_commit_test / partition_predicate_test / table_commit_test updated to construct CommitMessage via DataIncrement instead of the legacy new_files= kwarg. Refs: split from #7771 to ease incremental community review. --- .../manifest/schema/data_file_meta.py | 188 ++++++++++++++- .../tests/commit_message_serializer_test.py | 228 ++++++++++++++++++ .../tests/file_store_commit_compact_test.py | 205 ++++++++++++++++ .../pypaimon/tests/file_store_commit_test.py | 15 +- .../tests/partition_predicate_test.py | 7 +- .../pypaimon/tests/table_commit_test.py | 13 +- .../pypaimon/write/commit_message.py | 59 ++++- .../write/commit_message_serializer.py | 163 +++++++++++++ .../pypaimon/write/compact_increment.py | 58 +++++ .../pypaimon/write/data_increment.py | 61 +++++ .../pypaimon/write/file_store_commit.py | 86 ++++++- .../pypaimon/write/file_store_write.py | 4 +- paimon-python/pypaimon/write/table_update.py | 9 +- 13 files changed, 1064 insertions(+), 32 deletions(-) create mode 100644 paimon-python/pypaimon/tests/commit_message_serializer_test.py create mode 100644 paimon-python/pypaimon/tests/file_store_commit_compact_test.py create mode 100644 paimon-python/pypaimon/write/commit_message_serializer.py create mode 100644 paimon-python/pypaimon/write/compact_increment.py create mode 100644 paimon-python/pypaimon/write/data_increment.py diff --git a/paimon-python/pypaimon/manifest/schema/data_file_meta.py b/paimon-python/pypaimon/manifest/schema/data_file_meta.py index 98f29b6e0afe..2aecfc21c2e7 100644 --- a/paimon-python/pypaimon/manifest/schema/data_file_meta.py +++ b/paimon-python/pypaimon/manifest/schema/data_file_meta.py @@ -16,15 +16,19 @@ # under the License. from dataclasses import dataclass -from datetime import datetime -from typing import List, Optional +from datetime import date, datetime, time as dt_time +from decimal import Decimal +from base64 import b64decode, b64encode +from typing import Any, Dict, List, Optional import time from pypaimon.utils.range import Range from pypaimon.data.timestamp import Timestamp from pypaimon.manifest.schema.simple_stats import (KEY_STATS_SCHEMA, VALUE_STATS_SCHEMA, SimpleStats) +from pypaimon.schema.data_types import DataField from pypaimon.table.row.generic_row import GenericRow +from pypaimon.table.row.internal_row import RowKind from pypaimon.utils.file_store_path_factory import _is_null_or_whitespace_only @@ -223,6 +227,186 @@ def assign_sequence_number(self, min_sequence_number: int, max_sequence_number: file_path=self.file_path ) + def to_dict(self) -> Dict[str, Any]: + """Serialize to a JSON-friendly dict for cross-process transport (e.g. Ray task payloads). + + Field types preserved via tagged objects (see encode_value/decode_value). + """ + return { + "file_name": self.file_name, + "file_size": self.file_size, + "row_count": self.row_count, + "min_key": _generic_row_to_dict(self.min_key), + "max_key": _generic_row_to_dict(self.max_key), + "key_stats": _simple_stats_to_dict(self.key_stats), + "value_stats": _simple_stats_to_dict(self.value_stats), + "min_sequence_number": self.min_sequence_number, + "max_sequence_number": self.max_sequence_number, + "schema_id": self.schema_id, + "level": self.level, + "extra_files": list(self.extra_files) if self.extra_files is not None else [], + "creation_time": _timestamp_to_dict(self.creation_time), + "delete_row_count": self.delete_row_count, + "embedded_index": _bytes_to_str(self.embedded_index), + "file_source": self.file_source, + "value_stats_cols": list(self.value_stats_cols) if self.value_stats_cols is not None else None, + "external_path": self.external_path, + "first_row_id": self.first_row_id, + "write_cols": list(self.write_cols) if self.write_cols is not None else None, + "file_path": self.file_path, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "DataFileMeta": + return cls( + file_name=data["file_name"], + file_size=data["file_size"], + row_count=data["row_count"], + min_key=_generic_row_from_dict(data.get("min_key")), + max_key=_generic_row_from_dict(data.get("max_key")), + key_stats=_simple_stats_from_dict(data.get("key_stats")), + value_stats=_simple_stats_from_dict(data.get("value_stats")), + min_sequence_number=data["min_sequence_number"], + max_sequence_number=data["max_sequence_number"], + schema_id=data["schema_id"], + level=data["level"], + extra_files=list(data.get("extra_files") or []), + creation_time=_timestamp_from_dict(data.get("creation_time")), + delete_row_count=data.get("delete_row_count"), + embedded_index=_bytes_from_str(data.get("embedded_index")), + file_source=data.get("file_source"), + value_stats_cols=list(data["value_stats_cols"]) if data.get("value_stats_cols") is not None else None, + external_path=data.get("external_path"), + first_row_id=data.get("first_row_id"), + write_cols=list(data["write_cols"]) if data.get("write_cols") is not None else None, + file_path=data.get("file_path"), + ) + + +def _bytes_to_str(value: Optional[bytes]) -> Optional[str]: + if value is None: + return None + return b64encode(value).decode("ascii") + + +def _bytes_from_str(value: Optional[str]) -> Optional[bytes]: + if value is None: + return None + return b64decode(value.encode("ascii")) + + +def _timestamp_to_dict(ts: Optional[Timestamp]) -> Optional[Dict[str, int]]: + if ts is None: + return None + return {"ms": ts.get_millisecond(), "ns": ts.get_nano_of_millisecond()} + + +def _timestamp_from_dict(data: Optional[Dict[str, int]]) -> Optional[Timestamp]: + if data is None: + return None + return Timestamp(data["ms"], data.get("ns", 0)) + + +def encode_value(value: Any) -> Any: + """Encode a GenericRow / SimpleStats / partition field value into a JSON-friendly form. + + Tagged dicts mark non-JSON-native types so decode_value can round-trip them. + Public so that callers serializing other field-bearing structures (e.g. partitions + in CommitMessage) can reuse the same tagged encoding. + """ + if value is None or isinstance(value, (bool, int, float, str)): + return value + if isinstance(value, bytes): + return {"__t__": "bytes", "v": b64encode(value).decode("ascii")} + if isinstance(value, Decimal): + return {"__t__": "decimal", "v": str(value)} + if isinstance(value, Timestamp): + return {"__t__": "ts", "ms": value.get_millisecond(), "ns": value.get_nano_of_millisecond()} + if isinstance(value, datetime): + return {"__t__": "datetime", "v": value.isoformat()} + if isinstance(value, date): + return {"__t__": "date", "v": value.isoformat()} + if isinstance(value, dt_time): + return {"__t__": "time", "v": value.isoformat()} + raise TypeError( + f"Unsupported value type for DataFileMeta serialization: {type(value).__name__}" + ) + + +def decode_value(value: Any) -> Any: + if not isinstance(value, dict) or "__t__" not in value: + return value + tag = value["__t__"] + if tag == "bytes": + return b64decode(value["v"].encode("ascii")) + if tag == "decimal": + return Decimal(value["v"]) + if tag == "ts": + return Timestamp(value["ms"], value.get("ns", 0)) + if tag == "datetime": + return datetime.fromisoformat(value["v"]) + if tag == "date": + return date.fromisoformat(value["v"]) + if tag == "time": + return dt_time.fromisoformat(value["v"]) + raise ValueError(f"Unknown tagged value type: {tag}") + + +def _generic_row_to_dict(row) -> Optional[Dict[str, Any]]: + if row is None: + return None + # GenericRow exposes .values directly; BinaryRow lazily decodes per field + # via get_field(i). Normalize both into a list of decoded Python values + # so the dict format stays uniform. + if hasattr(row, "values"): + values = row.values + else: + values = [row.get_field(i) for i in range(len(row))] + fields = getattr(row, "fields", None) + return { + "values": [encode_value(v) for v in values], + "fields": [f.to_dict() for f in fields] if fields else [], + "row_kind": row.get_row_kind().value if hasattr(row, "get_row_kind") else 0, + } + + +def _generic_row_from_dict(data: Optional[Dict[str, Any]]) -> Optional[GenericRow]: + if data is None: + return None + fields = [DataField.from_dict(f) for f in data.get("fields", [])] + values = [decode_value(v) for v in data.get("values", [])] + row_kind = RowKind(data.get("row_kind", RowKind.INSERT.value)) + return GenericRow(values, fields, row_kind) + + +def _simple_stats_to_dict(stats: Optional[SimpleStats]) -> Optional[Dict[str, Any]]: + if stats is None: + return None + # null_counts may be a Python list (writer path) or a pyarrow Array-like + # (manifest reader path). Normalize to a plain list of ints. + nc = stats.null_counts + if nc is None: + null_counts = [] + elif hasattr(nc, "to_pylist"): + null_counts = nc.to_pylist() + else: + null_counts = list(nc) + return { + "min_values": _generic_row_to_dict(stats.min_values), + "max_values": _generic_row_to_dict(stats.max_values), + "null_counts": null_counts, + } + + +def _simple_stats_from_dict(data: Optional[Dict[str, Any]]) -> Optional[SimpleStats]: + if data is None: + return None + return SimpleStats( + min_values=_generic_row_from_dict(data.get("min_values")), + max_values=_generic_row_from_dict(data.get("max_values")), + null_counts=list(data.get("null_counts") or []), + ) + DATA_FILE_META_SCHEMA = { "type": "record", diff --git a/paimon-python/pypaimon/tests/commit_message_serializer_test.py b/paimon-python/pypaimon/tests/commit_message_serializer_test.py new file mode 100644 index 000000000000..d41dcbe0520c --- /dev/null +++ b/paimon-python/pypaimon/tests/commit_message_serializer_test.py @@ -0,0 +1,228 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import unittest +from datetime import date, datetime, time as dt_time +from decimal import Decimal + +from pypaimon.data.timestamp import Timestamp +from pypaimon.manifest.schema.data_file_meta import DataFileMeta +from pypaimon.manifest.schema.simple_stats import SimpleStats +from pypaimon.schema.data_types import AtomicType, DataField +from pypaimon.table.row.generic_row import GenericRow +from pypaimon.table.row.internal_row import RowKind +from pypaimon.write.commit_message import CommitMessage +from pypaimon.write.commit_message_serializer import CommitMessageSerializer +from pypaimon.write.compact_increment import CompactIncrement +from pypaimon.write.data_increment import DataIncrement + + +def _key_field(idx: int, name: str, type_str: str) -> DataField: + return DataField(idx, name, AtomicType(type_str)) + + +def _build_data_file_meta(file_name: str = "data-1.parquet") -> DataFileMeta: + pk_fields = [_key_field(0, "id", "BIGINT"), _key_field(1, "name", "STRING")] + min_key = GenericRow([1, "alice"], pk_fields) + max_key = GenericRow([99, "zoe"], pk_fields) + key_stats = SimpleStats( + min_values=GenericRow([1, "alice"], pk_fields), + max_values=GenericRow([99, "zoe"], pk_fields), + null_counts=[0, 0], + ) + value_stats = SimpleStats( + min_values=GenericRow([], []), + max_values=GenericRow([], []), + null_counts=[], + ) + return DataFileMeta.create( + file_name=file_name, + file_size=4096, + row_count=99, + min_key=min_key, + max_key=max_key, + key_stats=key_stats, + value_stats=value_stats, + min_sequence_number=10, + max_sequence_number=200, + schema_id=0, + level=0, + extra_files=["index-1.idx"], + creation_time=Timestamp.from_epoch_millis(1_700_000_000_000, 123_456), + delete_row_count=2, + embedded_index=b"\x00\x01\x02\x03embedded", + file_source=1, + value_stats_cols=["c1"], + external_path="oss://bucket/path/to/file", + first_row_id=1000, + write_cols=["id", "name"], + file_path="/abs/path/data-1.parquet", + ) + + +class DataFileMetaSerdeTest(unittest.TestCase): + + def test_to_from_dict_roundtrip(self): + original = _build_data_file_meta() + rebuilt = DataFileMeta.from_dict(original.to_dict()) + + self.assertEqual(original, rebuilt) + # spot check of complex sub-fields that use tagged encoding + self.assertEqual(original.embedded_index, rebuilt.embedded_index) + self.assertEqual(original.creation_time, rebuilt.creation_time) + self.assertEqual(original.min_key.values, rebuilt.min_key.values) + self.assertEqual(original.min_key.row_kind, rebuilt.min_key.row_kind) + self.assertEqual( + [f.to_dict() for f in original.min_key.fields], + [f.to_dict() for f in rebuilt.min_key.fields], + ) + + def test_value_encoding_supports_decimal_and_temporal_types(self): + fields = [ + _key_field(0, "amount", "DECIMAL(10, 2)"), + _key_field(1, "ts", "TIMESTAMP(6)"), + _key_field(2, "d", "DATE"), + _key_field(3, "t", "TIME"), + _key_field(4, "blob", "BYTES"), + ] + row = GenericRow( + values=[ + Decimal("12.34"), + datetime(2024, 1, 2, 3, 4, 5, 678901), + date(2024, 1, 2), + dt_time(13, 45, 30, 250000), + b"binary-payload", + ], + fields=fields, + row_kind=RowKind.UPDATE_AFTER, + ) + # Reuse the GenericRow encode path through SimpleStats + stats = SimpleStats(min_values=row, max_values=row, null_counts=[0, 0, 0, 0, 0]) + meta = _build_data_file_meta() + meta.key_stats = stats + + rebuilt = DataFileMeta.from_dict(meta.to_dict()) + + self.assertEqual(rebuilt.key_stats.min_values.values[0], Decimal("12.34")) + self.assertEqual(rebuilt.key_stats.min_values.values[1], datetime(2024, 1, 2, 3, 4, 5, 678901)) + self.assertEqual(rebuilt.key_stats.min_values.values[2], date(2024, 1, 2)) + self.assertEqual(rebuilt.key_stats.min_values.values[3], dt_time(13, 45, 30, 250000)) + self.assertEqual(rebuilt.key_stats.min_values.values[4], b"binary-payload") + self.assertEqual(rebuilt.key_stats.min_values.row_kind, RowKind.UPDATE_AFTER) + + +class CommitMessageSerializerTest(unittest.TestCase): + + def test_serialize_deserialize_roundtrip_for_compact_message(self): + before_files = [_build_data_file_meta(f"old-{i}.parquet") for i in range(3)] + after_files = [_build_data_file_meta("new-merged.parquet")] + message = CommitMessage( + partition=("2024-01-01", "us"), + bucket=2, + total_buckets=8, + compact_increment=CompactIncrement( + compact_before=before_files, + compact_after=after_files, + ), + check_from_snapshot=42, + ) + + payload = CommitMessageSerializer.serialize(message) + rebuilt = CommitMessageSerializer.deserialize(payload) + + self.assertIsInstance(payload, bytes) + self.assertEqual(message.partition, rebuilt.partition) + self.assertEqual(message.bucket, rebuilt.bucket) + self.assertEqual(message.total_buckets, rebuilt.total_buckets) + self.assertEqual(message.new_files, rebuilt.new_files) + self.assertEqual(message.compact_before, rebuilt.compact_before) + self.assertEqual(message.compact_after, rebuilt.compact_after) + self.assertEqual(message.check_from_snapshot, rebuilt.check_from_snapshot) + + def test_serialize_deserialize_roundtrip_for_append_message(self): + message = CommitMessage( + partition=(), + bucket=0, + data_increment=DataIncrement(new_files=[_build_data_file_meta("append-1.parquet")]), + ) + + rebuilt = CommitMessageSerializer.deserialize(CommitMessageSerializer.serialize(message)) + + self.assertEqual(message.partition, rebuilt.partition) + self.assertEqual(message.bucket, rebuilt.bucket) + self.assertEqual(message.new_files, rebuilt.new_files) + self.assertEqual([], rebuilt.compact_before) + self.assertEqual([], rebuilt.compact_after) + + def test_unsupported_version_is_rejected(self): + message = CommitMessage( + partition=(), + bucket=0, + data_increment=DataIncrement(new_files=[_build_data_file_meta()]), + ) + payload_dict = CommitMessageSerializer.to_dict(message) + payload_dict["version"] = CommitMessageSerializer.VERSION + 1 + + with self.assertRaises(ValueError): + CommitMessageSerializer.from_dict(payload_dict) + + def test_serialize_supports_partition_with_non_json_native_types(self): + # Partitions can carry DATE/DECIMAL/bytes columns; serializer must round-trip them. + message = CommitMessage( + partition=(date(2024, 1, 2), Decimal("99.50"), b"raw"), + bucket=0, + compact_increment=CompactIncrement(compact_after=[_build_data_file_meta()]), + ) + + rebuilt = CommitMessageSerializer.deserialize(CommitMessageSerializer.serialize(message)) + + self.assertEqual((date(2024, 1, 2), Decimal("99.50"), b"raw"), rebuilt.partition) + + def test_serialize_supports_timestamp_partition(self): + ts = Timestamp.from_epoch_millis(1_700_000_000_000, 500_000) + message = CommitMessage( + partition=(ts,), + bucket=0, + compact_increment=CompactIncrement(compact_after=[_build_data_file_meta()]), + ) + + rebuilt = CommitMessageSerializer.deserialize(CommitMessageSerializer.serialize(message)) + + self.assertEqual((ts,), rebuilt.partition) + + def test_serialize_list_round_trip(self): + messages = [ + CommitMessage( + partition=(f"p{i}",), + bucket=i, + data_increment=DataIncrement(new_files=[_build_data_file_meta(f"f{i}.parquet")]), + ) + for i in range(3) + ] + payloads = CommitMessageSerializer.serialize_list(messages) + rebuilt = CommitMessageSerializer.deserialize_list(payloads) + + self.assertEqual(len(messages), len(rebuilt)) + for original, copy in zip(messages, rebuilt): + self.assertEqual(original.partition, copy.partition) + self.assertEqual(original.bucket, copy.bucket) + self.assertEqual(original.new_files, copy.new_files) + + +if __name__ == "__main__": + unittest.main() diff --git a/paimon-python/pypaimon/tests/file_store_commit_compact_test.py b/paimon-python/pypaimon/tests/file_store_commit_compact_test.py new file mode 100644 index 000000000000..9e786dcd8075 --- /dev/null +++ b/paimon-python/pypaimon/tests/file_store_commit_compact_test.py @@ -0,0 +1,205 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import unittest +from datetime import datetime +from unittest.mock import Mock, patch + +from pypaimon.data.timestamp import Timestamp +from pypaimon.manifest.schema.data_file_meta import DataFileMeta +from pypaimon.manifest.schema.simple_stats import SimpleStats +from pypaimon.schema.data_types import AtomicType, DataField +from pypaimon.table.row.generic_row import GenericRow +from pypaimon.write.commit_message import CommitMessage +from pypaimon.write.compact_increment import CompactIncrement +from pypaimon.write.data_increment import DataIncrement +from pypaimon.write.file_store_commit import FileStoreCommit + + +def _make_file(name: str, *, first_row_id=None) -> DataFileMeta: + return DataFileMeta.create( + file_name=name, + file_size=4096, + row_count=10, + min_key=GenericRow([], []), + max_key=GenericRow([], []), + key_stats=SimpleStats.empty_stats(), + value_stats=SimpleStats.empty_stats(), + min_sequence_number=1, + max_sequence_number=10, + schema_id=0, + level=0, + extra_files=[], + creation_time=Timestamp.from_local_date_time(datetime(2024, 1, 15, 10, 30, 0)), + first_row_id=first_row_id, + ) + + +@patch('pypaimon.write.file_store_commit.ManifestFileManager') +@patch('pypaimon.write.file_store_commit.ManifestListManager') +class TestFileStoreCommitCompact(unittest.TestCase): + """Phase 1 protocol-level tests: verify compact_before/after entries flow correctly through commit(). + + These tests stub _try_commit so we only verify the entry-construction and commit_kind selection. + Full e2e (with real manifest writes / scans) is covered in Phase 2 once the rewriter exists. + """ + + def setUp(self): + self.mock_table = Mock() + self.mock_table.partition_keys = ['dt'] + self.mock_table.partition_keys_fields = [DataField(0, 'dt', AtomicType('STRING'))] + self.mock_table.total_buckets = 4 + self.mock_table.current_branch.return_value = 'main' + self.mock_table.identifier = 'default.t' + self.mock_snapshot_commit = Mock() + + def _create_commit(self): + return FileStoreCommit( + snapshot_commit=self.mock_snapshot_commit, + table=self.mock_table, + commit_user='test_user', + ) + + def test_build_entries_emits_add_for_new_files(self, *_): + commit = self._create_commit() + msg = CommitMessage( + partition=('2024-01-15',), + bucket=2, + data_increment=DataIncrement(new_files=[_make_file('a.parquet')]), + ) + + entries = commit._build_commit_entries([msg]) + + self.assertEqual(1, len(entries)) + self.assertEqual(0, entries[0].kind) + self.assertEqual(2, entries[0].bucket) + self.assertEqual('a.parquet', entries[0].file.file_name) + self.assertEqual(['2024-01-15'], list(entries[0].partition.values)) + + def test_build_entries_emits_delete_for_compact_before_and_add_for_compact_after(self, *_): + commit = self._create_commit() + msg = CommitMessage( + partition=('2024-01-15',), + bucket=1, + compact_increment=CompactIncrement( + compact_before=[_make_file('old-1.parquet'), _make_file('old-2.parquet')], + compact_after=[_make_file('merged.parquet')], + ), + ) + + entries = commit._build_commit_entries([msg]) + + kinds = [e.kind for e in entries] + names = [e.file.file_name for e in entries] + self.assertEqual([1, 1, 0], kinds) + self.assertEqual(['old-1.parquet', 'old-2.parquet', 'merged.parquet'], names) + self.assertTrue(all(e.bucket == 1 for e in entries)) + + def test_commit_with_only_compact_messages_uses_compact_kind(self, *_): + commit = self._create_commit() + commit._try_commit = Mock() + msg = CommitMessage( + partition=('p1',), + bucket=0, + compact_increment=CompactIncrement( + compact_before=[_make_file('old.parquet')], + compact_after=[_make_file('new.parquet')], + ), + ) + + commit.commit([msg], commit_identifier=100) + + commit._try_commit.assert_called_once() + call_kwargs = commit._try_commit.call_args.kwargs + self.assertEqual('COMPACT', call_kwargs['commit_kind']) + self.assertEqual(100, call_kwargs['commit_identifier']) + + def test_commit_with_new_files_keeps_append_kind_even_when_compact_fields_present(self, *_): + commit = self._create_commit() + commit._try_commit = Mock() + msg = CommitMessage( + partition=('p1',), + bucket=0, + data_increment=DataIncrement(new_files=[_make_file('new.parquet')]), + compact_increment=CompactIncrement( + compact_before=[_make_file('old.parquet')], + compact_after=[_make_file('merged.parquet')], + ), + ) + + commit.commit([msg], commit_identifier=200) + + call_kwargs = commit._try_commit.call_args.kwargs + self.assertEqual('APPEND', call_kwargs['commit_kind']) + + def test_commit_compact_uses_compact_kind_and_no_conflict_detection(self, *_): + commit = self._create_commit() + commit._try_commit = Mock() + msg = CommitMessage( + partition=('p1',), + bucket=3, + compact_increment=CompactIncrement( + compact_before=[_make_file('old.parquet')], + compact_after=[_make_file('new.parquet')], + ), + ) + + commit.commit_compact([msg], commit_identifier=300) + + commit._try_commit.assert_called_once() + kwargs = commit._try_commit.call_args.kwargs + self.assertEqual('COMPACT', kwargs['commit_kind']) + self.assertEqual(300, kwargs['commit_identifier']) + self.assertFalse(kwargs['detect_conflicts']) + self.assertFalse(kwargs['allow_rollback']) + + def test_commit_compact_rejects_messages_with_new_files(self, *_): + commit = self._create_commit() + msg = CommitMessage( + partition=('p1',), + bucket=0, + data_increment=DataIncrement(new_files=[_make_file('append.parquet')]), + compact_increment=CompactIncrement( + compact_before=[_make_file('old.parquet')], + compact_after=[_make_file('new.parquet')], + ), + ) + + with self.assertRaises(ValueError): + commit.commit_compact([msg], commit_identifier=400) + + def test_commit_compact_skips_when_no_messages(self, *_): + commit = self._create_commit() + commit._try_commit = Mock() + + commit.commit_compact([], commit_identifier=500) + + commit._try_commit.assert_not_called() + + def test_commit_compact_skips_when_messages_have_no_files(self, *_): + commit = self._create_commit() + commit._try_commit = Mock() + empty_msg = CommitMessage(partition=('p1',), bucket=0) + + commit.commit_compact([empty_msg], commit_identifier=600) + + commit._try_commit.assert_not_called() + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/tests/file_store_commit_test.py b/paimon-python/pypaimon/tests/file_store_commit_test.py index ec13d4041add..68ff4c37f9f2 100644 --- a/paimon-python/pypaimon/tests/file_store_commit_test.py +++ b/paimon-python/pypaimon/tests/file_store_commit_test.py @@ -24,6 +24,7 @@ from pypaimon.snapshot.snapshot_commit import PartitionStatistics from pypaimon.table.row.generic_row import GenericRow from pypaimon.write.commit_message import CommitMessage +from pypaimon.write.data_increment import DataIncrement from pypaimon.write.file_store_commit import FileStoreCommit @@ -86,7 +87,7 @@ def test_generate_partition_statistics_single_partition_single_file( commit_message = CommitMessage( partition=('2024-01-15', 'us-east-1'), bucket=0, - new_files=[file_meta] + data_increment=DataIncrement(new_files=[file_meta]) ) # Test method @@ -151,7 +152,7 @@ def test_generate_partition_statistics_multiple_files_same_partition( commit_message = CommitMessage( partition=('2024-01-15', 'us-east-1'), bucket=0, - new_files=[file_meta_1, file_meta_2] + data_increment=DataIncrement(new_files=[file_meta_1, file_meta_2]) ) # Test method @@ -223,13 +224,13 @@ def test_generate_partition_statistics_multiple_partitions( commit_message_1 = CommitMessage( partition=('2024-01-15', 'us-east-1'), bucket=0, - new_files=[file_meta_1] + data_increment=DataIncrement(new_files=[file_meta_1]) ) commit_message_2 = CommitMessage( partition=('2024-01-15', 'us-west-2'), bucket=0, - new_files=[file_meta_2] + data_increment=DataIncrement(new_files=[file_meta_2]) ) # Test method @@ -292,7 +293,7 @@ def test_generate_partition_statistics_unpartitioned_table( commit_message = CommitMessage( partition=(), # Empty partition for unpartitioned table bucket=0, - new_files=[file_meta] + data_increment=DataIncrement(new_files=[file_meta]) ) # Test method @@ -331,7 +332,7 @@ def test_generate_partition_statistics_no_creation_time( commit_message = CommitMessage( partition=('2024-01-15', 'us-east-1'), bucket=0, - new_files=[file_meta] + data_increment=DataIncrement(new_files=[file_meta]) ) # Test method @@ -373,7 +374,7 @@ def test_generate_partition_statistics_mismatched_partition_keys( commit_message = CommitMessage( partition=('2024-01-15', 'us-east-1', 'extra-value'), # 3 values but table has 2 keys bucket=0, - new_files=[file_meta] + data_increment=DataIncrement(new_files=[file_meta]) ) # Test method diff --git a/paimon-python/pypaimon/tests/partition_predicate_test.py b/paimon-python/pypaimon/tests/partition_predicate_test.py index abf5212d0e75..005303217724 100644 --- a/paimon-python/pypaimon/tests/partition_predicate_test.py +++ b/paimon-python/pypaimon/tests/partition_predicate_test.py @@ -28,6 +28,7 @@ from pypaimon.table.row.offset_row import OffsetRow from pypaimon.write.commit.commit_scanner import CommitScanner from pypaimon.write.commit_message import CommitMessage +from pypaimon.write.data_increment import DataIncrement from pypaimon.write.file_store_commit import FileStoreCommit PARTITION_FIELDS = [ @@ -170,7 +171,11 @@ def _create_commit(self, stub_commit=True): @staticmethod def _msg(partition): - return CommitMessage(partition=partition, bucket=0, new_files=[Mock(row_count=10)]) + return CommitMessage( + partition=partition, + bucket=0, + data_increment=DataIncrement(new_files=[Mock(row_count=10)]), + ) def _extract_partition_predicate(self, commit): entries_plan = commit._try_commit.call_args[1]['commit_entries_plan'] diff --git a/paimon-python/pypaimon/tests/table_commit_test.py b/paimon-python/pypaimon/tests/table_commit_test.py index d0b9d62762df..d84a7c417d4a 100644 --- a/paimon-python/pypaimon/tests/table_commit_test.py +++ b/paimon-python/pypaimon/tests/table_commit_test.py @@ -22,6 +22,7 @@ from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER from pypaimon.write.commit_message import CommitMessage +from pypaimon.write.data_increment import DataIncrement from pypaimon.write.table_commit import BatchTableCommit, StreamTableCommit @@ -51,7 +52,11 @@ def test_overwrite_forwards_filtered_messages(self, name, msg_flags): commit, mock_fsc = self._create_commit(BatchTableCommit, overwrite_partition={'f0': 1}) messages = [ - CommitMessage(partition=(1,), bucket=0, new_files=[Mock()] if has_files else []) + CommitMessage( + partition=(1,), + bucket=0, + data_increment=DataIncrement(new_files=[Mock()] if has_files else []), + ) for has_files in msg_flags ] commit.commit(messages) @@ -74,7 +79,11 @@ def test_append_forwards_non_empty_messages(self, name, msg_flags): commit, mock_fsc = self._create_commit(BatchTableCommit, overwrite_partition=None) messages = [ - CommitMessage(partition=(), bucket=0, new_files=[Mock()] if has_files else []) + CommitMessage( + partition=(), + bucket=0, + data_increment=DataIncrement(new_files=[Mock()] if has_files else []), + ) for has_files in msg_flags ] commit.commit(messages) diff --git a/paimon-python/pypaimon/write/commit_message.py b/paimon-python/pypaimon/write/commit_message.py index 7bce06d8ab13..00d8c49130cd 100644 --- a/paimon-python/pypaimon/write/commit_message.py +++ b/paimon-python/pypaimon/write/commit_message.py @@ -15,18 +15,67 @@ # specific language governing permissions and limitations # under the License. -from dataclasses import dataclass -from typing import List, Tuple, Optional +from dataclasses import dataclass, field +from typing import List, Optional, Tuple from pypaimon.manifest.schema.data_file_meta import DataFileMeta +from pypaimon.write.compact_increment import CompactIncrement +from pypaimon.write.data_increment import DataIncrement @dataclass class CommitMessage: + """File committable for sink. + + Direct port of org.apache.paimon.table.sink.CommitMessageImpl. Carries + everything one (partition, bucket) writer or compactor contributes to a + snapshot, packaged as a (data_increment, compact_increment) pair so the + same message type can describe both pure writes and compaction results. + + - partition / bucket: identify the (partition, bucket) the message + applies to. + - total_buckets: number of buckets the table had at write time, used by + the commit path to detect bucket-count changes. + - data_increment: ADD/DELETE/changelog/index deltas from a normal write. + - compact_increment: ADD/DELETE/changelog/index deltas from compaction. + - check_from_snapshot: row-tracking conflict-detection anchor; -1 means + "no check" (default). + """ + partition: Tuple bucket: int - new_files: List[DataFileMeta] + total_buckets: Optional[int] = None + data_increment: DataIncrement = field(default_factory=DataIncrement) + compact_increment: CompactIncrement = field(default_factory=CompactIncrement) check_from_snapshot: Optional[int] = -1 - def is_empty(self): - return not self.new_files + # ---- Convenience accessors --------------------------------------------- + # Mirror Java's CommitMessageImpl shape: callers usually want the + # individual file lists rather than reaching through the increment. + + @property + def new_files(self) -> List[DataFileMeta]: + return self.data_increment.new_files + + @property + def deleted_files(self) -> List[DataFileMeta]: + return self.data_increment.deleted_files + + @property + def changelog_files(self) -> List[DataFileMeta]: + return self.data_increment.changelog_files + + @property + def compact_before(self) -> List[DataFileMeta]: + return self.compact_increment.compact_before + + @property + def compact_after(self) -> List[DataFileMeta]: + return self.compact_increment.compact_after + + @property + def compact_changelog_files(self) -> List[DataFileMeta]: + return self.compact_increment.changelog_files + + def is_empty(self) -> bool: + return self.data_increment.is_empty() and self.compact_increment.is_empty() diff --git a/paimon-python/pypaimon/write/commit_message_serializer.py b/paimon-python/pypaimon/write/commit_message_serializer.py new file mode 100644 index 000000000000..57f81c9ce820 --- /dev/null +++ b/paimon-python/pypaimon/write/commit_message_serializer.py @@ -0,0 +1,163 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import json +from typing import Any, Dict, List + +from pypaimon.index.index_file_meta import IndexFileMeta +from pypaimon.manifest.schema.data_file_meta import (DataFileMeta, decode_value, + encode_value) +from pypaimon.write.commit_message import CommitMessage +from pypaimon.write.compact_increment import CompactIncrement +from pypaimon.write.data_increment import DataIncrement + + +class CommitMessageSerializer: + """Cross-process serializer for CommitMessage payloads. + + JSON-based on purpose: human-debuggable, version-tolerant across worker + Python versions, and avoids the security/compat pitfalls of pickle when + shipping CompactTask outputs from Ray workers back to the driver. + + The wire shape mirrors org.apache.paimon.table.sink.CommitMessageImpl: + every message is (partition, bucket, total_buckets, data_increment, + compact_increment), with each increment carrying its own new/deleted/ + changelog file lists plus index file deltas. Today the index slots are + populated only by tables that opt into them; the serializer round-trips + them either way so adding deletion vectors / global index later does + not need a new payload version. + """ + + VERSION = 1 + + @classmethod + def serialize(cls, message: CommitMessage) -> bytes: + return json.dumps(cls.to_dict(message), separators=(",", ":")).encode("utf-8") + + @classmethod + def deserialize(cls, payload: bytes) -> CommitMessage: + return cls.from_dict(json.loads(payload.decode("utf-8"))) + + @classmethod + def to_dict(cls, message: CommitMessage) -> Dict[str, Any]: + partition = message.partition if message.partition is not None else () + return { + "version": cls.VERSION, + "partition": [encode_value(v) for v in partition], + "bucket": message.bucket, + "total_buckets": message.total_buckets, + "data_increment": cls._data_increment_to_dict(message.data_increment), + "compact_increment": cls._compact_increment_to_dict(message.compact_increment), + "check_from_snapshot": message.check_from_snapshot, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> CommitMessage: + version = data.get("version", cls.VERSION) + if version != cls.VERSION: + raise ValueError( + f"Unsupported CommitMessage payload version: {version} (expected {cls.VERSION})" + ) + partition_values = data.get("partition") or [] + return CommitMessage( + partition=tuple(decode_value(v) for v in partition_values), + bucket=data["bucket"], + total_buckets=data.get("total_buckets"), + data_increment=cls._data_increment_from_dict(data.get("data_increment")), + compact_increment=cls._compact_increment_from_dict(data.get("compact_increment")), + check_from_snapshot=data.get("check_from_snapshot", -1), + ) + + @classmethod + def serialize_list(cls, messages: List[CommitMessage]) -> List[bytes]: + return [cls.serialize(m) for m in messages] + + @classmethod + def deserialize_list(cls, payloads: List[bytes]) -> List[CommitMessage]: + return [cls.deserialize(p) for p in payloads] + + # ---- Increment helpers ------------------------------------------------- + + @classmethod + def _data_increment_to_dict(cls, inc: DataIncrement) -> Dict[str, Any]: + return { + "new_files": [f.to_dict() for f in inc.new_files], + "deleted_files": [f.to_dict() for f in inc.deleted_files], + "changelog_files": [f.to_dict() for f in inc.changelog_files], + "new_index_files": [_index_file_to_dict(i) for i in inc.new_index_files], + "deleted_index_files": [_index_file_to_dict(i) for i in inc.deleted_index_files], + } + + @classmethod + def _data_increment_from_dict(cls, data) -> DataIncrement: + if not data: + return DataIncrement() + return DataIncrement( + new_files=[DataFileMeta.from_dict(f) for f in data.get("new_files") or []], + deleted_files=[DataFileMeta.from_dict(f) for f in data.get("deleted_files") or []], + changelog_files=[DataFileMeta.from_dict(f) for f in data.get("changelog_files") or []], + new_index_files=[_index_file_from_dict(i) for i in data.get("new_index_files") or []], + deleted_index_files=[_index_file_from_dict(i) for i in data.get("deleted_index_files") or []], + ) + + @classmethod + def _compact_increment_to_dict(cls, inc: CompactIncrement) -> Dict[str, Any]: + return { + "compact_before": [f.to_dict() for f in inc.compact_before], + "compact_after": [f.to_dict() for f in inc.compact_after], + "changelog_files": [f.to_dict() for f in inc.changelog_files], + "new_index_files": [_index_file_to_dict(i) for i in inc.new_index_files], + "deleted_index_files": [_index_file_to_dict(i) for i in inc.deleted_index_files], + } + + @classmethod + def _compact_increment_from_dict(cls, data) -> CompactIncrement: + if not data: + return CompactIncrement() + return CompactIncrement( + compact_before=[DataFileMeta.from_dict(f) for f in data.get("compact_before") or []], + compact_after=[DataFileMeta.from_dict(f) for f in data.get("compact_after") or []], + changelog_files=[DataFileMeta.from_dict(f) for f in data.get("changelog_files") or []], + new_index_files=[_index_file_from_dict(i) for i in data.get("new_index_files") or []], + deleted_index_files=[_index_file_from_dict(i) for i in data.get("deleted_index_files") or []], + ) + + +# IndexFileMeta has richer payloads (deletion vector ranges, global index +# meta) that aren't relevant to the basic compaction path yet — round-trip +# only the scalar identity fields here. Phase 6/7 (deletion vectors, +# changelog producer) will extend this to cover dv_ranges and +# global_index_meta as the rewriter starts producing them. +def _index_file_to_dict(idx: IndexFileMeta) -> Dict[str, Any]: + return { + "index_type": idx.index_type, + "file_name": idx.file_name, + "file_size": idx.file_size, + "row_count": idx.row_count, + "external_path": idx.external_path, + } + + +def _index_file_from_dict(data: Dict[str, Any]) -> IndexFileMeta: + return IndexFileMeta( + index_type=data["index_type"], + file_name=data["file_name"], + file_size=data["file_size"], + row_count=data["row_count"], + external_path=data.get("external_path"), + ) diff --git a/paimon-python/pypaimon/write/compact_increment.py b/paimon-python/pypaimon/write/compact_increment.py new file mode 100644 index 000000000000..ddf923ceb9c8 --- /dev/null +++ b/paimon-python/pypaimon/write/compact_increment.py @@ -0,0 +1,58 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from dataclasses import dataclass, field +from typing import List + +from pypaimon.index.index_file_meta import IndexFileMeta +from pypaimon.manifest.schema.data_file_meta import DataFileMeta + + +@dataclass +class CompactIncrement: + """Files changed before and after compaction, with changelog produced during compaction. + + Direct port of org.apache.paimon.io.CompactIncrement. + + - compact_before: input files consumed by compaction (DELETE entries). + - compact_after: rewritten output files (ADD entries). + - changelog_files: changelog files emitted while compacting (used by the + full-compaction changelog producer; empty in the basic dedup path). + - new_index_files / deleted_index_files: index file deltas attributable + to this compaction (deletion vectors / global index updates). Empty + lists by default. + """ + + compact_before: List[DataFileMeta] = field(default_factory=list) + compact_after: List[DataFileMeta] = field(default_factory=list) + changelog_files: List[DataFileMeta] = field(default_factory=list) + new_index_files: List[IndexFileMeta] = field(default_factory=list) + deleted_index_files: List[IndexFileMeta] = field(default_factory=list) + + def is_empty(self) -> bool: + return ( + not self.compact_before + and not self.compact_after + and not self.changelog_files + and not self.new_index_files + and not self.deleted_index_files + ) + + @classmethod + def empty(cls) -> "CompactIncrement": + return cls() diff --git a/paimon-python/pypaimon/write/data_increment.py b/paimon-python/pypaimon/write/data_increment.py new file mode 100644 index 000000000000..e0b1d16f98d1 --- /dev/null +++ b/paimon-python/pypaimon/write/data_increment.py @@ -0,0 +1,61 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from dataclasses import dataclass, field +from typing import List + +from pypaimon.index.index_file_meta import IndexFileMeta +from pypaimon.manifest.schema.data_file_meta import DataFileMeta + + +@dataclass +class DataIncrement: + """Increment of data files, changelog files and index files produced by a write. + + Direct port of org.apache.paimon.io.DataIncrement. Carries everything one + write attempt contributes to a snapshot, so a CommitMessage can be + constructed from a (DataIncrement, CompactIncrement) pair just like the + Java side. + + - new_files: data files this write created (ADD entries). + - deleted_files: data files this write removed without compaction + (e.g. row-level delete in data-evolution tables); ADD/DELETE asymmetry + is preserved by giving each list its own slot. + - changelog_files: changelog data files associated with this write. + - new_index_files / deleted_index_files: index file deltas (deletion + vectors, global index, ...). Empty lists by default. + """ + + new_files: List[DataFileMeta] = field(default_factory=list) + deleted_files: List[DataFileMeta] = field(default_factory=list) + changelog_files: List[DataFileMeta] = field(default_factory=list) + new_index_files: List[IndexFileMeta] = field(default_factory=list) + deleted_index_files: List[IndexFileMeta] = field(default_factory=list) + + def is_empty(self) -> bool: + return ( + not self.new_files + and not self.deleted_files + and not self.changelog_files + and not self.new_index_files + and not self.deleted_index_files + ) + + @classmethod + def empty(cls) -> "DataIncrement": + return cls() diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index d88beb164e03..611dffb1fcd7 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -110,7 +110,12 @@ def __init__(self, snapshot_commit: SnapshotCommit, table, commit_user: str): self.rollback = CommitRollback(table_rollback) if table_rollback is not None else None def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): - """Commit the given commit messages in normal append mode.""" + """Commit the given commit messages in normal append mode. + + new_files in each message generate ADD entries; compact_before/compact_after + generate DELETE/ADD entries respectively. If only compact_* fields are present + across all messages (no new_files), commit_kind becomes COMPACT. + """ if not commit_messages: return @@ -125,21 +130,12 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): self.table.identifier, len(commit_messages), ) - commit_entries = [] - for msg in commit_messages: - partition = GenericRow(list(msg.partition), self.table.partition_keys_fields) - for file in msg.new_files: - commit_entries.append(ManifestEntry( - kind=0, - partition=partition, - bucket=msg.bucket, - total_buckets=self.table.total_buckets, - file=file - )) + commit_entries = self._build_commit_entries(commit_messages) + has_new_files = any(msg.new_files for msg in commit_messages) logger.info("Finished collecting changes, including: %d entries", len(commit_entries)) - commit_kind = "APPEND" + commit_kind = "APPEND" if has_new_files else "COMPACT" detect_conflicts = False allow_rollback = False if self.conflict_detection.should_be_overwrite_commit(): @@ -156,6 +152,70 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): detect_conflicts=detect_conflicts, allow_rollback=allow_rollback) + def commit_compact(self, commit_messages: List[CommitMessage], commit_identifier: int): + """Commit compaction results (compact_before/compact_after only). + + Each message must carry no new_files. compact_before generate DELETE entries, + compact_after generate ADD entries. Snapshot kind is COMPACT. + """ + if not commit_messages: + return + + for msg in commit_messages: + if msg.new_files: + raise ValueError( + "commit_compact rejects messages with new_files; use commit() instead." + ) + + logger.info( + "Ready to commit compact to table %s, number of commit messages: %d", + self.table.identifier, + len(commit_messages), + ) + commit_entries = self._build_commit_entries(commit_messages) + if not commit_entries: + return + + logger.info("Finished collecting compact changes: %d entries", len(commit_entries)) + + self._try_commit( + commit_kind="COMPACT", + commit_identifier=commit_identifier, + commit_entries_plan=lambda snapshot: commit_entries, + detect_conflicts=False, + allow_rollback=False, + ) + + def _build_commit_entries(self, commit_messages: List[CommitMessage]) -> List[ManifestEntry]: + entries: List[ManifestEntry] = [] + for msg in commit_messages: + partition = GenericRow(list(msg.partition), self.table.partition_keys_fields) + for file in msg.new_files: + entries.append(ManifestEntry( + kind=0, + partition=partition, + bucket=msg.bucket, + total_buckets=self.table.total_buckets, + file=file, + )) + for file in msg.compact_before: + entries.append(ManifestEntry( + kind=1, + partition=partition, + bucket=msg.bucket, + total_buckets=self.table.total_buckets, + file=file, + )) + for file in msg.compact_after: + entries.append(ManifestEntry( + kind=0, + partition=partition, + bucket=msg.bucket, + total_buckets=self.table.total_buckets, + file=file, + )) + return entries + def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], commit_identifier: int): """Commit the given commit messages in overwrite mode.""" logger.info( diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py index c33fca3792eb..92c61823d565 100644 --- a/paimon-python/pypaimon/write/file_store_write.py +++ b/paimon-python/pypaimon/write/file_store_write.py @@ -22,6 +22,7 @@ from pypaimon.common.options.core_options import CoreOptions from pypaimon.write.commit_message import CommitMessage +from pypaimon.write.data_increment import DataIncrement from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter from pypaimon.write.writer.data_blob_writer import DataBlobWriter from pypaimon.write.writer.data_writer import DataWriter @@ -110,7 +111,8 @@ def prepare_commit(self, commit_identifier) -> List[CommitMessage]: commit_message = CommitMessage( partition=partition, bucket=bucket, - new_files=committed_files + total_buckets=self.table.total_buckets, + data_increment=DataIncrement(new_files=committed_files), ) commit_messages.append(commit_message) return commit_messages diff --git a/paimon-python/pypaimon/write/table_update.py b/paimon-python/pypaimon/write/table_update.py index fe2fb9a64b79..12af0391397d 100644 --- a/paimon-python/pypaimon/write/table_update.py +++ b/paimon-python/pypaimon/write/table_update.py @@ -27,6 +27,7 @@ from pypaimon.read.split import DataSplit from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER from pypaimon.write.commit_message import CommitMessage +from pypaimon.write.data_increment import DataIncrement from pypaimon.write.table_update_by_row_id import TableUpdateByRowId from pypaimon.write.table_upsert_by_key import TableUpsertByKey from pypaimon.write.writer.data_writer import DataWriter @@ -269,7 +270,13 @@ def arrow_reader(self) -> pyarrow.ipc.RecordBatchReader: def prepare_commit(self) -> List[CommitMessage]: commit_messages = [] for (partition, files) in self.dict.items(): - commit_messages.append(CommitMessage(partition, 0, files, self.snapshot_id)) + commit_messages.append(CommitMessage( + partition=partition, + bucket=0, + total_buckets=self.table.total_buckets, + data_increment=DataIncrement(new_files=files), + check_from_snapshot=self.snapshot_id, + )) return commit_messages def update_by_arrow_batch(self, data: pa.RecordBatch): From 4ee065481aa8f020af37b2c1fd3145d132fa0cf3 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Sat, 16 May 2026 21:02:48 +0800 Subject: [PATCH 2/2] [python][compact] address review findings on CommitMessage protocol MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Correctness: - FileStoreCommit._build_commit_entries now rejects DataIncrement.{deleted_files, changelog_files, new_index_files, deleted_index_files} and CompactIncrement.{changelog_files, new_index_files, deleted_index_files}. These slots used to be silently dropped at commit; raising loudly turns a future correctness foot-gun into a NotImplementedError so later changelog-producer / deletion-vector / row-level-delete work has to wire the new path through commit explicitly. - ManifestEntry now uses msg.total_buckets when set, falling back to self.table.total_buckets otherwise. A stale plan whose bucket count has since been rescaled would otherwise be silently overwritten with the new value. - FileStoreCommit.commit() now rejects messages carrying compact_increment. commit() is the write-side entry (always APPEND, OVERWRITE if conflict detection demands it); compact_increment must go through commit_compact(). The previous 'auto-pick COMPACT when no new_files' branch was unreachable (FileStoreWrite.prepare_commit() only fills new_files; CompactJob calls commit_compact() directly) and would have produced the wrong snapshot shape for a mixed message anyway. Docs / naming: - CommitMessageSerializer: docstring trimmed to its job (JSON wire format for pypaimon ↔ pypaimon transport, e.g. Ray driver ↔ workers). - commit_compact: docstring trimmed to its behavior. - DataIncrement.empty() / CompactIncrement.empty() renamed to empty_increment() for a more specific name; no callers in-tree yet. - Trim cross-language commentary from class docstrings on CommitMessage, DataIncrement, CompactIncrement. Tests (file_store_commit_compact_test): - New: NotImplementedError raised for unsupported DataIncrement / CompactIncrement fields. - New: msg.total_buckets wins over table.total_buckets when set; fallback otherwise. - New: commit() rejects compact_increment messages. - Removed: two cases that exercised the old auto-COMPACT branch (now unreachable). --- .../tests/file_store_commit_compact_test.py | 121 ++++++++++++------ .../pypaimon/write/commit_message.py | 13 +- .../write/commit_message_serializer.py | 15 ++- .../pypaimon/write/compact_increment.py | 4 +- .../pypaimon/write/data_increment.py | 9 +- .../pypaimon/write/file_store_commit.py | 56 ++++++-- 6 files changed, 147 insertions(+), 71 deletions(-) diff --git a/paimon-python/pypaimon/tests/file_store_commit_compact_test.py b/paimon-python/pypaimon/tests/file_store_commit_compact_test.py index 9e786dcd8075..f8ff34b06bf0 100644 --- a/paimon-python/pypaimon/tests/file_store_commit_compact_test.py +++ b/paimon-python/pypaimon/tests/file_store_commit_compact_test.py @@ -110,43 +110,6 @@ def test_build_entries_emits_delete_for_compact_before_and_add_for_compact_after self.assertEqual(['old-1.parquet', 'old-2.parquet', 'merged.parquet'], names) self.assertTrue(all(e.bucket == 1 for e in entries)) - def test_commit_with_only_compact_messages_uses_compact_kind(self, *_): - commit = self._create_commit() - commit._try_commit = Mock() - msg = CommitMessage( - partition=('p1',), - bucket=0, - compact_increment=CompactIncrement( - compact_before=[_make_file('old.parquet')], - compact_after=[_make_file('new.parquet')], - ), - ) - - commit.commit([msg], commit_identifier=100) - - commit._try_commit.assert_called_once() - call_kwargs = commit._try_commit.call_args.kwargs - self.assertEqual('COMPACT', call_kwargs['commit_kind']) - self.assertEqual(100, call_kwargs['commit_identifier']) - - def test_commit_with_new_files_keeps_append_kind_even_when_compact_fields_present(self, *_): - commit = self._create_commit() - commit._try_commit = Mock() - msg = CommitMessage( - partition=('p1',), - bucket=0, - data_increment=DataIncrement(new_files=[_make_file('new.parquet')]), - compact_increment=CompactIncrement( - compact_before=[_make_file('old.parquet')], - compact_after=[_make_file('merged.parquet')], - ), - ) - - commit.commit([msg], commit_identifier=200) - - call_kwargs = commit._try_commit.call_args.kwargs - self.assertEqual('APPEND', call_kwargs['commit_kind']) - def test_commit_compact_uses_compact_kind_and_no_conflict_detection(self, *_): commit = self._create_commit() commit._try_commit = Mock() @@ -200,6 +163,90 @@ def test_commit_compact_skips_when_messages_have_no_files(self, *_): commit._try_commit.assert_not_called() + # _build_commit_entries must reject increment fields it does not yet + # wire up so a future writer cannot silently drop them at commit time. + + def test_build_entries_rejects_data_increment_deleted_files(self, *_): + commit = self._create_commit() + msg = CommitMessage( + partition=('2024-01-15',), + bucket=0, + data_increment=DataIncrement(deleted_files=[_make_file('d.parquet')]), + ) + with self.assertRaises(NotImplementedError): + commit._build_commit_entries([msg]) + + def test_build_entries_rejects_data_increment_changelog_files(self, *_): + commit = self._create_commit() + msg = CommitMessage( + partition=('2024-01-15',), + bucket=0, + data_increment=DataIncrement(changelog_files=[_make_file('c.parquet')]), + ) + with self.assertRaises(NotImplementedError): + commit._build_commit_entries([msg]) + + def test_build_entries_rejects_compact_increment_changelog_files(self, *_): + commit = self._create_commit() + msg = CommitMessage( + partition=('2024-01-15',), + bucket=0, + compact_increment=CompactIncrement(changelog_files=[_make_file('c.parquet')]), + ) + with self.assertRaises(NotImplementedError): + commit._build_commit_entries([msg]) + + def test_build_entries_uses_message_total_buckets_when_set(self, *_): + """A stale plan whose bucket count has been rescaled should keep the + message's own total_buckets, not be silently overwritten with the + table's current value. + """ + commit = self._create_commit() + # Table claims 4 buckets; message was planned when there were 8. + self.mock_table.total_buckets = 4 + msg = CommitMessage( + partition=('2024-01-15',), + bucket=0, + total_buckets=8, + data_increment=DataIncrement(new_files=[_make_file('a.parquet')]), + ) + + entries = commit._build_commit_entries([msg]) + + self.assertEqual(1, len(entries)) + self.assertEqual(8, entries[0].total_buckets) + + def test_commit_rejects_messages_with_compact_increment(self, *_): + """commit() is the write-side entry; compaction results must go through + commit_compact(). A caller that mixes them is almost certainly wrong + (would otherwise silently flatten two snapshot kinds into one).""" + commit = self._create_commit() + commit._try_commit = Mock() + msg = CommitMessage( + partition=('p1',), + bucket=0, + compact_increment=CompactIncrement(compact_after=[_make_file('a.parquet')]), + ) + + with self.assertRaises(ValueError): + commit.commit([msg], commit_identifier=700) + + commit._try_commit.assert_not_called() + + def test_build_entries_falls_back_to_table_total_buckets_when_message_has_none(self, *_): + commit = self._create_commit() + self.mock_table.total_buckets = 4 + msg = CommitMessage( + partition=('2024-01-15',), + bucket=0, + total_buckets=None, + data_increment=DataIncrement(new_files=[_make_file('a.parquet')]), + ) + + entries = commit._build_commit_entries([msg]) + + self.assertEqual(4, entries[0].total_buckets) + if __name__ == '__main__': unittest.main() diff --git a/paimon-python/pypaimon/write/commit_message.py b/paimon-python/pypaimon/write/commit_message.py index 00d8c49130cd..223c3e572ca5 100644 --- a/paimon-python/pypaimon/write/commit_message.py +++ b/paimon-python/pypaimon/write/commit_message.py @@ -27,10 +27,9 @@ class CommitMessage: """File committable for sink. - Direct port of org.apache.paimon.table.sink.CommitMessageImpl. Carries - everything one (partition, bucket) writer or compactor contributes to a - snapshot, packaged as a (data_increment, compact_increment) pair so the - same message type can describe both pure writes and compaction results. + Carries everything one (partition, bucket) writer or compactor contributes + to a snapshot, packaged as a (data_increment, compact_increment) pair so + the same message type can describe both pure writes and compaction results. - partition / bucket: identify the (partition, bucket) the message applies to. @@ -39,7 +38,7 @@ class CommitMessage: - data_increment: ADD/DELETE/changelog/index deltas from a normal write. - compact_increment: ADD/DELETE/changelog/index deltas from compaction. - check_from_snapshot: row-tracking conflict-detection anchor; -1 means - "no check" (default). + "no check" (default). Read per-message by write/commit/conflict_detection.py. """ partition: Tuple @@ -50,8 +49,8 @@ class CommitMessage: check_from_snapshot: Optional[int] = -1 # ---- Convenience accessors --------------------------------------------- - # Mirror Java's CommitMessageImpl shape: callers usually want the - # individual file lists rather than reaching through the increment. + # Callers usually want the individual file lists rather than reaching + # through the increment. @property def new_files(self) -> List[DataFileMeta]: diff --git a/paimon-python/pypaimon/write/commit_message_serializer.py b/paimon-python/pypaimon/write/commit_message_serializer.py index 57f81c9ce820..d218d4e91f65 100644 --- a/paimon-python/pypaimon/write/commit_message_serializer.py +++ b/paimon-python/pypaimon/write/commit_message_serializer.py @@ -28,21 +28,22 @@ class CommitMessageSerializer: - """Cross-process serializer for CommitMessage payloads. + """JSON wire format for shipping CommitMessage between pypaimon processes + (e.g. Ray driver ↔ workers). - JSON-based on purpose: human-debuggable, version-tolerant across worker - Python versions, and avoids the security/compat pitfalls of pickle when - shipping CompactTask outputs from Ray workers back to the driver. + JSON on purpose: human-debuggable, tolerant of Python version drift, and + avoids pickle's compat/security pitfalls when shipping CompactTask outputs + from Ray workers back to the driver. - The wire shape mirrors org.apache.paimon.table.sink.CommitMessageImpl: - every message is (partition, bucket, total_buckets, data_increment, - compact_increment), with each increment carrying its own new/deleted/ + Every message round-trips (partition, bucket, total_buckets, data_increment, + compact_increment), with each increment carrying its own new / deleted / changelog file lists plus index file deltas. Today the index slots are populated only by tables that opt into them; the serializer round-trips them either way so adding deletion vectors / global index later does not need a new payload version. """ + # Wire format version; bump on incompatible payload changes. VERSION = 1 @classmethod diff --git a/paimon-python/pypaimon/write/compact_increment.py b/paimon-python/pypaimon/write/compact_increment.py index ddf923ceb9c8..6e6d6e0a3722 100644 --- a/paimon-python/pypaimon/write/compact_increment.py +++ b/paimon-python/pypaimon/write/compact_increment.py @@ -27,8 +27,6 @@ class CompactIncrement: """Files changed before and after compaction, with changelog produced during compaction. - Direct port of org.apache.paimon.io.CompactIncrement. - - compact_before: input files consumed by compaction (DELETE entries). - compact_after: rewritten output files (ADD entries). - changelog_files: changelog files emitted while compacting (used by the @@ -54,5 +52,5 @@ def is_empty(self) -> bool: ) @classmethod - def empty(cls) -> "CompactIncrement": + def empty_increment(cls) -> "CompactIncrement": return cls() diff --git a/paimon-python/pypaimon/write/data_increment.py b/paimon-python/pypaimon/write/data_increment.py index e0b1d16f98d1..fa052a333794 100644 --- a/paimon-python/pypaimon/write/data_increment.py +++ b/paimon-python/pypaimon/write/data_increment.py @@ -27,10 +27,9 @@ class DataIncrement: """Increment of data files, changelog files and index files produced by a write. - Direct port of org.apache.paimon.io.DataIncrement. Carries everything one - write attempt contributes to a snapshot, so a CommitMessage can be - constructed from a (DataIncrement, CompactIncrement) pair just like the - Java side. + Carries everything one write attempt contributes to a snapshot, so a + CommitMessage can be constructed from a (DataIncrement, CompactIncrement) + pair. - new_files: data files this write created (ADD entries). - deleted_files: data files this write removed without compaction @@ -57,5 +56,5 @@ def is_empty(self) -> bool: ) @classmethod - def empty(cls) -> "DataIncrement": + def empty_increment(cls) -> "DataIncrement": return cls() diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 611dffb1fcd7..08d3d231a6c6 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -110,15 +110,22 @@ def __init__(self, snapshot_commit: SnapshotCommit, table, commit_user: str): self.rollback = CommitRollback(table_rollback) if table_rollback is not None else None def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): - """Commit the given commit messages in normal append mode. + """Commit write-side messages (data_increment only). - new_files in each message generate ADD entries; compact_before/compact_after - generate DELETE/ADD entries respectively. If only compact_* fields are present - across all messages (no new_files), commit_kind becomes COMPACT. + Compaction results (compact_before / compact_after) must be committed + through commit_compact() instead — they produce a separate snapshot kind + and skip row-id assignment. """ if not commit_messages: return + for msg in commit_messages: + if not msg.compact_increment.is_empty(): + raise ValueError( + "commit() rejects messages carrying compact_increment; " + "use commit_compact() for compaction results." + ) + # Extract the minimum check_from_snapshot from commit messages valid_snapshots = [msg.check_from_snapshot for msg in commit_messages if msg.check_from_snapshot != -1] @@ -131,11 +138,10 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): len(commit_messages), ) commit_entries = self._build_commit_entries(commit_messages) - has_new_files = any(msg.new_files for msg in commit_messages) logger.info("Finished collecting changes, including: %d entries", len(commit_entries)) - commit_kind = "APPEND" if has_new_files else "COMPACT" + commit_kind = "APPEND" detect_conflicts = False allow_rollback = False if self.conflict_detection.should_be_overwrite_commit(): @@ -153,10 +159,11 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): allow_rollback=allow_rollback) def commit_compact(self, commit_messages: List[CommitMessage], commit_identifier: int): - """Commit compaction results (compact_before/compact_after only). + """Commit compaction-only messages. - Each message must carry no new_files. compact_before generate DELETE entries, - compact_after generate ADD entries. Snapshot kind is COMPACT. + Each message must carry no new_files; compact_before → DELETE entries, + compact_after → ADD entries; snapshot kind = COMPACT. Skips row-id + assignment and conflict detection (a compaction never produces new rows). """ if not commit_messages: return @@ -187,15 +194,40 @@ def commit_compact(self, commit_messages: List[CommitMessage], commit_identifier ) def _build_commit_entries(self, commit_messages: List[CommitMessage]) -> List[ManifestEntry]: + # Only new_files / compact_before / compact_after are wired up today; + # reject the other 7 increment slots loudly so a future writer that + # starts filling them cannot silently lose data at commit time. + for msg in commit_messages: + di = msg.data_increment + if (di.deleted_files or di.changelog_files + or di.new_index_files or di.deleted_index_files): + raise NotImplementedError( + "FileStoreCommit does not yet handle DataIncrement.deleted_files / " + "changelog_files / new_index_files / deleted_index_files; these slots " + "will be wired in by the feature that first needs each one." + ) + ci = msg.compact_increment + if (ci.changelog_files or ci.new_index_files or ci.deleted_index_files): + raise NotImplementedError( + "FileStoreCommit does not yet handle CompactIncrement.changelog_files / " + "new_index_files / deleted_index_files; these slots will be wired in by " + "the feature that first needs each one." + ) + entries: List[ManifestEntry] = [] for msg in commit_messages: partition = GenericRow(list(msg.partition), self.table.partition_keys_fields) + # Prefer the message's total_buckets (captured when the plan was + # built) over the current table value, so a plan that survived a + # bucket rescale is not silently rewritten with the new count. + total_buckets = msg.total_buckets if msg.total_buckets is not None \ + else self.table.total_buckets for file in msg.new_files: entries.append(ManifestEntry( kind=0, partition=partition, bucket=msg.bucket, - total_buckets=self.table.total_buckets, + total_buckets=total_buckets, file=file, )) for file in msg.compact_before: @@ -203,7 +235,7 @@ def _build_commit_entries(self, commit_messages: List[CommitMessage]) -> List[Ma kind=1, partition=partition, bucket=msg.bucket, - total_buckets=self.table.total_buckets, + total_buckets=total_buckets, file=file, )) for file in msg.compact_after: @@ -211,7 +243,7 @@ def _build_commit_entries(self, commit_messages: List[CommitMessage]) -> List[Ma kind=0, partition=partition, bucket=msg.bucket, - total_buckets=self.table.total_buckets, + total_buckets=total_buckets, file=file, )) return entries