Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions paimon-python/pypaimon/common/options/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,36 @@ class CoreOptions:
)
)

SCAN_TIMESTAMP_MILLIS: ConfigOption[int] = (
ConfigOptions.key("scan.timestamp-millis")
.long_type()
.no_default_value()
.with_description(
"Optional timestamp in milliseconds used for time travel to the "
"latest snapshot equal to or earlier than the given timestamp."
)
)

SCAN_TIMESTAMP: ConfigOption[str] = (
ConfigOptions.key("scan.timestamp")
.string_type()
.no_default_value()
.with_description(
"Optional timestamp string (e.g. '2023-12-01 12:00:00') used for "
"time travel. Will be converted to milliseconds internally."
)
)

SCAN_WATERMARK: ConfigOption[int] = (
ConfigOptions.key("scan.watermark")
.long_type()
.no_default_value()
.with_description(
"Optional watermark used for time travel to the first snapshot "
"with watermark greater than or equal to the given value."
)
)

SOURCE_SPLIT_TARGET_SIZE: ConfigOption[MemorySize] = (
ConfigOptions.key("source.split.target-size")
.memory_type()
Expand Down Expand Up @@ -625,6 +655,15 @@ def scan_tag_name(self, default=None):
def scan_snapshot_id(self, default=None):
return self.options.get(CoreOptions.SCAN_SNAPSHOT_ID, default)

def scan_timestamp_millis(self, default=None):
return self.options.get(CoreOptions.SCAN_TIMESTAMP_MILLIS, default)

def scan_timestamp(self, default=None):
return self.options.get(CoreOptions.SCAN_TIMESTAMP, default)

def scan_watermark(self, default=None):
return self.options.get(CoreOptions.SCAN_WATERMARK, default)

def source_split_target_size(self, default=None):
return self.options.get(CoreOptions.SOURCE_SPLIT_TARGET_SIZE, default).get_bytes()

Expand Down
41 changes: 19 additions & 22 deletions paimon-python/pypaimon/read/table_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,18 @@ def _create_file_scanner(self) -> FileScanner:
options = self.table.options.options
snapshot_manager = self.table.snapshot_manager()
manifest_list_manager = ManifestListManager(self.table)
if options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP):

from pypaimon.snapshot.time_travel_util import TimeTravelUtil, SCAN_KEYS
has_time_travel = any(options.contains_key(key) for key in SCAN_KEYS)
has_incremental = options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)

if has_incremental and has_time_travel:
raise ValueError(
"incremental-between-timestamp cannot be used together with "
"point-in-time scan options: %s" % SCAN_KEYS
)

if has_incremental:
ts = options.get(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP).split(",")
if len(ts) != 2:
raise ValueError(
Expand Down Expand Up @@ -106,35 +117,21 @@ def incremental_manifest():
return manifests, end_snapshot

return FileScanner(self.table, incremental_manifest, self.predicate, self.limit)
elif options.contains(CoreOptions.SCAN_TAG_NAME): # Handle tag-based reading
tag_name = options.get(CoreOptions.SCAN_TAG_NAME)

def tag_manifest_scanner():
tag_manager = self.table.tag_manager()
tag = tag_manager.get_or_throw(tag_name)
snapshot = tag.trim_to_snapshot()
return manifest_list_manager.read_all(snapshot), snapshot

return FileScanner(
self.table,
tag_manifest_scanner,
self.predicate,
self.limit
)
elif options.contains(CoreOptions.SCAN_SNAPSHOT_ID): # Handle snapshot-id-based reading
snapshot_id = int(options.get(CoreOptions.SCAN_SNAPSHOT_ID))

def snapshot_id_manifest_scanner():
snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
if has_time_travel:
def time_travel_manifest_scanner():
snapshot = TimeTravelUtil.try_travel_to_snapshot(
options, self.table.tag_manager(), snapshot_manager
)
if snapshot is None:
raise ValueError(
"Snapshot id %d does not exist" % snapshot_id
"Could not resolve time travel snapshot from scan options."
)
return manifest_list_manager.read_all(snapshot), snapshot

return FileScanner(
self.table,
snapshot_id_manifest_scanner,
time_travel_manifest_scanner,
self.predicate,
self.limit
)
Expand Down
50 changes: 50 additions & 0 deletions paimon-python/pypaimon/snapshot/snapshot_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,56 @@ def list_snapshots(self) -> List[Snapshot]:
snapshots.append(snapshot)
return snapshots

def later_or_equal_watermark(self, watermark: int) -> Optional[Snapshot]:
"""
Find the first snapshot with watermark >= the given value.

Args:
watermark: The watermark value to compare against

Returns:
The first snapshot with watermark >= the given value, or None if
no such snapshot exists
"""
earliest_snap = self.try_get_earliest_snapshot()
latest_snap = self.get_latest_snapshot()

if earliest_snap is None or latest_snap is None:
return None

earliest = earliest_snap.id
latest = latest_snap.id
result = None

while earliest <= latest:
mid = earliest + (latest - earliest) // 2
snapshot = self.get_snapshot_by_id(mid)

if snapshot is None:
found = False
for i in range(mid + 1, latest + 1):
snapshot = self.get_snapshot_by_id(i)
if snapshot is not None:
mid = i
found = True
break
if not found:
latest = mid - 1
continue

snap_watermark = snapshot.watermark
if snap_watermark is None:
earliest = mid + 1
continue

if snap_watermark >= watermark:
result = snapshot
latest = mid - 1
else:
earliest = mid + 1

return result

def get_snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
"""
Get a snapshot by its ID.
Expand Down
79 changes: 76 additions & 3 deletions paimon-python/pypaimon/snapshot/time_travel_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

"""The util class of resolve snapshot from scan params for time travel."""

from datetime import datetime
from typing import Optional

from pypaimon.common.options.core_options import CoreOptions
Expand All @@ -25,11 +26,42 @@
from pypaimon.tag.tag_manager import TagManager

SCAN_KEYS = [
CoreOptions.SCAN_TAG_NAME.key(),
CoreOptions.SCAN_SNAPSHOT_ID.key(),
CoreOptions.SCAN_TAG_NAME.key(),
CoreOptions.SCAN_WATERMARK.key(),
CoreOptions.SCAN_TIMESTAMP.key(),
CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
]


def _parse_timestamp_to_millis(timestamp_str: str) -> int:
"""Parse a timestamp string to milliseconds since epoch using local timezone.

Consistent with Java's TimeZone.getDefault() behavior in TimeTravelUtil.

Supports formats:
- '2023-12-01 12:00:00'
- '2023-12-01T12:00:00'
- '2023-12-01 12:00:00.123'
"""
formats = [
"%Y-%m-%d %H:%M:%S.%f",
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%dT%H:%M:%S.%f",
"%Y-%m-%dT%H:%M:%S",
]
for fmt in formats:
try:
dt = datetime.strptime(timestamp_str, fmt)
return int(dt.timestamp() * 1000)
except ValueError:
continue
raise ValueError(
f"Cannot parse timestamp '{timestamp_str}'. "
f"Expected format: 'yyyy-MM-dd HH:mm:ss' or 'yyyy-MM-dd HH:mm:ss.SSS'"
)


class TimeTravelUtil:
"""The util class of resolve snapshot from scan params for time travel."""

Expand All @@ -45,18 +77,22 @@ def try_travel_to_snapshot(
Supports the following time travel options:
- scan.tag-name: Travel to a specific tag
- scan.snapshot-id: Travel to a specific snapshot id
- scan.timestamp-millis: Travel to the latest snapshot <= the given timestamp (ms)
- scan.timestamp: Travel by timestamp string (e.g. '2023-12-01 12:00:00')
- scan.watermark: Travel to the first snapshot with watermark >= the given value

Args:
options: The options containing time travel parameters
tag_manager: The tag manager
snapshot_manager: The snapshot manager, required when
``scan.snapshot-id`` is set
using snapshot-id, timestamp, or watermark based time travel

Returns:
The Snapshot to travel to, or None if no time travel option is set.

Raises:
ValueError: If more than one time travel option is set
ValueError: If more than one time travel option is set, or if the
required manager is not provided
"""

scan_handle_keys = [key for key in SCAN_KEYS if options.contains_key(key)]
Expand Down Expand Up @@ -86,5 +122,42 @@ def try_travel_to_snapshot(
if snapshot is None:
raise ValueError(f"Snapshot id '{snapshot_id}' doesn't exist.")
return snapshot
elif key == CoreOptions.SCAN_TIMESTAMP_MILLIS.key():
if snapshot_manager is None:
raise ValueError(
"snapshot_manager is required to resolve scan.timestamp-millis"
)
timestamp_millis = int(core_options.scan_timestamp_millis())
snapshot = snapshot_manager.earlier_or_equal_time_mills(timestamp_millis)
if snapshot is None:
raise ValueError(
f"No snapshot found with timestamp earlier than or equal to {timestamp_millis}ms."
)
return snapshot
elif key == CoreOptions.SCAN_TIMESTAMP.key():
if snapshot_manager is None:
raise ValueError(
"snapshot_manager is required to resolve scan.timestamp"
)
timestamp_str = core_options.scan_timestamp()
timestamp_millis = _parse_timestamp_to_millis(timestamp_str)
snapshot = snapshot_manager.earlier_or_equal_time_mills(timestamp_millis)
if snapshot is None:
raise ValueError(
f"No snapshot found with timestamp earlier than or equal to '{timestamp_str}'."
)
return snapshot
elif key == CoreOptions.SCAN_WATERMARK.key():
if snapshot_manager is None:
raise ValueError(
"snapshot_manager is required to resolve scan.watermark"
)
watermark = int(core_options.scan_watermark())
snapshot = snapshot_manager.later_or_equal_watermark(watermark)
if snapshot is None:
raise ValueError(
f"No snapshot found with watermark greater than or equal to {watermark}."
)
return snapshot
else:
raise ValueError(f"Unsupported time travel mode: {key}")
Loading
Loading