Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 186 additions & 2 deletions paimon-python/pypaimon/manifest/schema/data_file_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@
# under the License.

from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional
from datetime import date, datetime, time as dt_time
from decimal import Decimal
from base64 import b64decode, b64encode
from typing import Any, Dict, List, Optional
import time

from pypaimon.utils.range import Range
from pypaimon.data.timestamp import Timestamp
from pypaimon.manifest.schema.simple_stats import (KEY_STATS_SCHEMA, VALUE_STATS_SCHEMA,
SimpleStats)
from pypaimon.schema.data_types import DataField
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.table.row.internal_row import RowKind
from pypaimon.utils.file_store_path_factory import _is_null_or_whitespace_only


Expand Down Expand Up @@ -223,6 +227,186 @@ def assign_sequence_number(self, min_sequence_number: int, max_sequence_number:
file_path=self.file_path
)

def to_dict(self) -> Dict[str, Any]:
    """Return this file's metadata as a plain, JSON-friendly dict.

    Intended for cross-process transport (e.g. Ray task payloads).
    Key rows and statistics are converted by ``_generic_row_to_dict`` and
    ``_simple_stats_to_dict``; the creation time by ``_timestamp_to_dict``;
    the embedded index by ``_bytes_to_str``. A missing ``extra_files`` is
    emitted as an empty list, while the other optional lists stay ``None``.
    """

    def _copy_or(items, fallback):
        # Shallow-copy list-like attributes so the payload does not alias them.
        return fallback if items is None else list(items)

    payload: Dict[str, Any] = {}

    # Identity and size.
    payload["file_name"] = self.file_name
    payload["file_size"] = self.file_size
    payload["row_count"] = self.row_count

    # Key range and column statistics.
    payload["min_key"] = _generic_row_to_dict(self.min_key)
    payload["max_key"] = _generic_row_to_dict(self.max_key)
    payload["key_stats"] = _simple_stats_to_dict(self.key_stats)
    payload["value_stats"] = _simple_stats_to_dict(self.value_stats)

    # Sequence, schema and level bookkeeping.
    payload["min_sequence_number"] = self.min_sequence_number
    payload["max_sequence_number"] = self.max_sequence_number
    payload["schema_id"] = self.schema_id
    payload["level"] = self.level

    # Remaining attributes, several of which may be None.
    payload["extra_files"] = _copy_or(self.extra_files, [])
    payload["creation_time"] = _timestamp_to_dict(self.creation_time)
    payload["delete_row_count"] = self.delete_row_count
    payload["embedded_index"] = _bytes_to_str(self.embedded_index)
    payload["file_source"] = self.file_source
    payload["value_stats_cols"] = _copy_or(self.value_stats_cols, None)
    payload["external_path"] = self.external_path
    payload["first_row_id"] = self.first_row_id
    payload["write_cols"] = _copy_or(self.write_cols, None)
    payload["file_path"] = self.file_path
    return payload

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "DataFileMeta":
    """Build an instance from the dict layout produced by ``to_dict``.

    ``file_name``, ``file_size``, ``row_count``, ``min_sequence_number``,
    ``max_sequence_number``, ``schema_id`` and ``level`` are required and
    raise ``KeyError`` when absent. Every other key is optional: a missing
    ``extra_files`` becomes an empty list and the rest fall back to ``None``.
    """

    def _list_or_none(key):
        raw = data.get(key)
        return None if raw is None else list(raw)

    kwargs = {
        "file_name": data["file_name"],
        "file_size": data["file_size"],
        "row_count": data["row_count"],
        "min_key": _generic_row_from_dict(data.get("min_key")),
        "max_key": _generic_row_from_dict(data.get("max_key")),
        "key_stats": _simple_stats_from_dict(data.get("key_stats")),
        "value_stats": _simple_stats_from_dict(data.get("value_stats")),
        "min_sequence_number": data["min_sequence_number"],
        "max_sequence_number": data["max_sequence_number"],
        "schema_id": data["schema_id"],
        "level": data["level"],
        "extra_files": list(data.get("extra_files") or []),
        "creation_time": _timestamp_from_dict(data.get("creation_time")),
        "delete_row_count": data.get("delete_row_count"),
        "embedded_index": _bytes_from_str(data.get("embedded_index")),
        "file_source": data.get("file_source"),
        "value_stats_cols": _list_or_none("value_stats_cols"),
        "external_path": data.get("external_path"),
        "first_row_id": data.get("first_row_id"),
        "write_cols": _list_or_none("write_cols"),
        "file_path": data.get("file_path"),
    }
    return cls(**kwargs)


def _bytes_to_str(value: Optional[bytes]) -> Optional[str]:
if value is None:
return None
return b64encode(value).decode("ascii")


def _bytes_from_str(value: Optional[str]) -> Optional[bytes]:
if value is None:
return None
return b64decode(value.encode("ascii"))


def _timestamp_to_dict(ts: Optional[Timestamp]) -> Optional[Dict[str, int]]:
    """Split a Timestamp into its ``ms`` / ``ns`` parts; ``None`` passes through.

    ``ms`` comes from ``get_millisecond()`` and ``ns`` from
    ``get_nano_of_millisecond()``.
    """
    if ts is not None:
        millis = ts.get_millisecond()
        nanos = ts.get_nano_of_millisecond()
        return {"ms": millis, "ns": nanos}
    return None


def _timestamp_from_dict(data: Optional[Dict[str, int]]) -> Optional[Timestamp]:
    """Rebuild a Timestamp from its ``ms`` / ``ns`` dict; ``None`` passes through.

    ``ms`` is required (``KeyError`` when absent); ``ns`` defaults to 0.
    """
    if data is not None:
        millis = data["ms"]
        nanos = data.get("ns", 0)
        return Timestamp(millis, nanos)
    return None


def encode_value(value: Any) -> Any:
    """Convert a single row / stats / partition field value to a JSON-friendly form.

    ``None``, ``bool``, ``int``, ``float`` and ``str`` are returned as-is.
    Other supported types become a dict carrying a ``"__t__"`` type tag so
    that ``decode_value`` can restore them. The function is public so other
    field-bearing structures (e.g. partitions in CommitMessage) can share the
    same tagged encoding.

    Raises:
        TypeError: if the value's type has no encoding.
    """
    # JSON-native scalars need no wrapping.
    if value is None:
        return value
    if isinstance(value, (bool, int, float, str)):
        return value

    def _tagged(kind, **payload):
        # The tag goes first, followed by the type-specific payload keys.
        return {"__t__": kind, **payload}

    if isinstance(value, bytes):
        return _tagged("bytes", v=b64encode(value).decode("ascii"))
    if isinstance(value, Decimal):
        return _tagged("decimal", v=str(value))
    if isinstance(value, Timestamp):
        return _tagged(
            "ts",
            ms=value.get_millisecond(),
            ns=value.get_nano_of_millisecond(),
        )

    # datetime must be probed before date: every datetime is also a date.
    iso_kinds = ((datetime, "datetime"), (date, "date"), (dt_time, "time"))
    for py_type, kind in iso_kinds:
        if isinstance(value, py_type):
            return _tagged(kind, v=value.isoformat())

    raise TypeError(
        f"Unsupported value type for DataFileMeta serialization: {type(value).__name__}"
    )


def decode_value(value: Any) -> Any:
    """Restore a value from the tagged form produced by ``encode_value``.

    Anything that is not a dict containing a ``"__t__"`` key is returned
    unchanged.

    Raises:
        ValueError: if the ``"__t__"`` tag is not one of the known kinds.
    """
    is_tagged = isinstance(value, dict) and "__t__" in value
    if not is_tagged:
        return value

    kind = value["__t__"]
    if kind == "bytes":
        ascii_payload = value["v"].encode("ascii")
        return b64decode(ascii_payload)
    elif kind == "decimal":
        return Decimal(value["v"])
    elif kind == "ts":
        millis = value["ms"]
        nanos = value.get("ns", 0)
        return Timestamp(millis, nanos)
    elif kind == "datetime":
        return datetime.fromisoformat(value["v"])
    elif kind == "date":
        return date.fromisoformat(value["v"])
    elif kind == "time":
        return dt_time.fromisoformat(value["v"])
    else:
        raise ValueError(f"Unknown tagged value type: {kind}")


def _generic_row_to_dict(row) -> Optional[Dict[str, Any]]:
if row is None:
return None
# GenericRow exposes .values directly; BinaryRow lazily decodes per field
# via get_field(i). Normalize both into a list of decoded Python values
# so the dict format stays uniform.
if hasattr(row, "values"):
values = row.values
else:
values = [row.get_field(i) for i in range(len(row))]
fields = getattr(row, "fields", None)
return {
"values": [encode_value(v) for v in values],
"fields": [f.to_dict() for f in fields] if fields else [],
"row_kind": row.get_row_kind().value if hasattr(row, "get_row_kind") else 0,
}


def _generic_row_from_dict(data: Optional[Dict[str, Any]]) -> Optional[GenericRow]:
    """Rebuild a GenericRow from the dict layout written by ``_generic_row_to_dict``.

    Args:
        data: Dict with optional ``"values"``, ``"fields"`` and ``"row_kind"``
            entries, or ``None``.

    Returns:
        ``None`` when ``data`` is ``None``; otherwise a GenericRow built from
        the decoded values, the parsed fields and the row kind.

    Raises:
        ValueError: if ``"row_kind"`` is present, non-null and not a valid
            RowKind value.
    """
    if data is None:
        return None
    # ``dict.get(key, default)`` only applies the default when the key is
    # absent, so an explicit null in the payload would otherwise reach the
    # comprehension as None. ``or []`` covers both cases, matching how the
    # other decoders in this module read optional lists.
    fields = [DataField.from_dict(f) for f in data.get("fields") or []]
    values = [decode_value(v) for v in data.get("values") or []]
    # A missing or null row kind falls back to INSERT.
    raw_kind = data.get("row_kind")
    row_kind = RowKind.INSERT if raw_kind is None else RowKind(raw_kind)
    return GenericRow(values, fields, row_kind)


def _simple_stats_to_dict(stats: Optional[SimpleStats]) -> Optional[Dict[str, Any]]:
    """Convert SimpleStats into a dict of min/max rows and null counts.

    Returns ``None`` when ``stats`` is ``None``. The min/max rows are
    converted by ``_generic_row_to_dict``.
    """
    if stats is None:
        return None

    def _plain_counts(raw):
        # null_counts may be a Python list (writer path) or a pyarrow
        # Array-like (manifest reader path); either way return a plain list.
        if raw is None:
            return []
        if hasattr(raw, "to_pylist"):
            return raw.to_pylist()
        return list(raw)

    counts = _plain_counts(stats.null_counts)
    lower = _generic_row_to_dict(stats.min_values)
    upper = _generic_row_to_dict(stats.max_values)
    return {"min_values": lower, "max_values": upper, "null_counts": counts}


def _simple_stats_from_dict(data: Optional[Dict[str, Any]]) -> Optional[SimpleStats]:
    """Rebuild SimpleStats from its dict layout; ``None`` passes through.

    The min/max rows are restored by ``_generic_row_from_dict``; a missing
    or empty ``"null_counts"`` becomes an empty list.
    """
    if data is not None:
        lower = _generic_row_from_dict(data.get("min_values"))
        upper = _generic_row_from_dict(data.get("max_values"))
        counts = list(data.get("null_counts") or [])
        return SimpleStats(min_values=lower, max_values=upper, null_counts=counts)
    return None


DATA_FILE_META_SCHEMA = {
"type": "record",
Expand Down
Loading
Loading