diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py index 7d9a227e4a9d..21d751c0c748 100644 --- a/paimon-python/pypaimon/common/options/core_options.py +++ b/paimon-python/pypaimon/common/options/core_options.py @@ -271,6 +271,36 @@ class CoreOptions: ) ) + SCAN_TIMESTAMP_MILLIS: ConfigOption[int] = ( + ConfigOptions.key("scan.timestamp-millis") + .long_type() + .no_default_value() + .with_description( + "Optional timestamp in milliseconds used for time travel to the " + "latest snapshot equal to or earlier than the given timestamp." + ) + ) + + SCAN_TIMESTAMP: ConfigOption[str] = ( + ConfigOptions.key("scan.timestamp") + .string_type() + .no_default_value() + .with_description( + "Optional timestamp string (e.g. '2023-12-01 12:00:00') used for " + "time travel. Will be converted to milliseconds internally." + ) + ) + + SCAN_WATERMARK: ConfigOption[int] = ( + ConfigOptions.key("scan.watermark") + .long_type() + .no_default_value() + .with_description( + "Optional watermark used for time travel to the first snapshot " + "with watermark greater than or equal to the given value." + ) + ) + SOURCE_SPLIT_TARGET_SIZE: ConfigOption[MemorySize] = ( ConfigOptions.key("source.split.target-size") .memory_type() @@ -625,6 +655,15 @@ def scan_tag_name(self, default=None): def scan_snapshot_id(self, default=None): return self.options.get(CoreOptions.SCAN_SNAPSHOT_ID, default) + def scan_timestamp_millis(self, default=None): + return self.options.get(CoreOptions.SCAN_TIMESTAMP_MILLIS, default) + + def scan_timestamp(self, default=None): + return self.options.get(CoreOptions.SCAN_TIMESTAMP, default) + + def scan_watermark(self, default=None): + return self.options.get(CoreOptions.SCAN_WATERMARK, default) + def source_split_target_size(self, default=None): return self.options.get(CoreOptions.SOURCE_SPLIT_TARGET_SIZE, default).get_bytes() diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py index bc610134e0d0..623261803503 100755 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/table_scan.py @@ -57,7 +57,18 @@ def _create_file_scanner(self) -> FileScanner: options = self.table.options.options snapshot_manager = self.table.snapshot_manager() manifest_list_manager = ManifestListManager(self.table) - if options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP): + + from pypaimon.snapshot.time_travel_util import TimeTravelUtil, SCAN_KEYS + has_time_travel = any(options.contains_key(key) for key in SCAN_KEYS) + has_incremental = options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP) + + if has_incremental and has_time_travel: + raise ValueError( + "incremental-between-timestamp cannot be used together with " + "point-in-time scan options: %s" % SCAN_KEYS + ) + + if has_incremental: ts = options.get(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP).split(",") if len(ts) != 2: raise ValueError( @@ -106,35 +117,21 @@ def incremental_manifest(): return manifests, end_snapshot return FileScanner(self.table, incremental_manifest, self.predicate, self.limit) - elif options.contains(CoreOptions.SCAN_TAG_NAME): # Handle tag-based reading - tag_name = options.get(CoreOptions.SCAN_TAG_NAME) - - def tag_manifest_scanner(): - tag_manager = self.table.tag_manager() - tag = tag_manager.get_or_throw(tag_name) - snapshot = tag.trim_to_snapshot() - return manifest_list_manager.read_all(snapshot), snapshot - - return FileScanner( - self.table, - tag_manifest_scanner, - self.predicate, - self.limit - ) - elif options.contains(CoreOptions.SCAN_SNAPSHOT_ID): # Handle snapshot-id-based reading - snapshot_id = int(options.get(CoreOptions.SCAN_SNAPSHOT_ID)) - def snapshot_id_manifest_scanner(): - snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id) + if has_time_travel: + def time_travel_manifest_scanner(): + snapshot = TimeTravelUtil.try_travel_to_snapshot( + options, self.table.tag_manager(), snapshot_manager + ) if snapshot is None: raise ValueError( - "Snapshot id %d does not exist" % snapshot_id + "Could not resolve time travel snapshot from scan options." ) return manifest_list_manager.read_all(snapshot), snapshot return FileScanner( self.table, - snapshot_id_manifest_scanner, + time_travel_manifest_scanner, self.predicate, self.limit ) diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py b/paimon-python/pypaimon/snapshot/snapshot_manager.py index c8f1f4818eb5..41a1ac81ba90 100644 --- a/paimon-python/pypaimon/snapshot/snapshot_manager.py +++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py @@ -257,6 +257,56 @@ def list_snapshots(self) -> List[Snapshot]: snapshots.append(snapshot) return snapshots + def later_or_equal_watermark(self, watermark: int) -> Optional[Snapshot]: + """ + Find the first snapshot with watermark >= the given value. + + Args: + watermark: The watermark value to compare against + + Returns: + The first snapshot with watermark >= the given value, or None if + no such snapshot exists + """ + earliest_snap = self.try_get_earliest_snapshot() + latest_snap = self.get_latest_snapshot() + + if earliest_snap is None or latest_snap is None: + return None + + earliest = earliest_snap.id + latest = latest_snap.id + result = None + + while earliest <= latest: + mid = earliest + (latest - earliest) // 2 + snapshot = self.get_snapshot_by_id(mid) + + if snapshot is None: + found = False + for i in range(mid + 1, latest + 1): + snapshot = self.get_snapshot_by_id(i) + if snapshot is not None: + mid = i + found = True + break + if not found: + latest = mid - 1 + continue + + snap_watermark = snapshot.watermark + if snap_watermark is None: + earliest = mid + 1 + continue + + if snap_watermark >= watermark: + result = snapshot + latest = mid - 1 + else: + earliest = mid + 1 + + return result + def get_snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]: """ Get a snapshot by its ID. diff --git a/paimon-python/pypaimon/snapshot/time_travel_util.py b/paimon-python/pypaimon/snapshot/time_travel_util.py index de927f5f9910..8a8609b147ae 100644 --- a/paimon-python/pypaimon/snapshot/time_travel_util.py +++ b/paimon-python/pypaimon/snapshot/time_travel_util.py @@ -17,6 +17,7 @@ """The util class of resolve snapshot from scan params for time travel.""" +from datetime import datetime from typing import Optional from pypaimon.common.options.core_options import CoreOptions @@ -25,11 +26,42 @@ from pypaimon.tag.tag_manager import TagManager SCAN_KEYS = [ - CoreOptions.SCAN_TAG_NAME.key(), CoreOptions.SCAN_SNAPSHOT_ID.key(), + CoreOptions.SCAN_TAG_NAME.key(), + CoreOptions.SCAN_WATERMARK.key(), + CoreOptions.SCAN_TIMESTAMP.key(), + CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), ] +def _parse_timestamp_to_millis(timestamp_str: str) -> int: + """Parse a timestamp string to milliseconds since epoch using local timezone. + + Consistent with Java's TimeZone.getDefault() behavior in TimeTravelUtil. + + Supports formats: + - '2023-12-01 12:00:00' + - '2023-12-01T12:00:00' + - '2023-12-01 12:00:00.123' + """ + formats = [ + "%Y-%m-%d %H:%M:%S.%f", + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%dT%H:%M:%S.%f", + "%Y-%m-%dT%H:%M:%S", + ] + for fmt in formats: + try: + dt = datetime.strptime(timestamp_str, fmt) + return int(dt.timestamp() * 1000) + except ValueError: + continue + raise ValueError( + f"Cannot parse timestamp '{timestamp_str}'. " + f"Expected format: 'yyyy-MM-dd HH:mm:ss' or 'yyyy-MM-dd HH:mm:ss.SSS'" + ) + + class TimeTravelUtil: """The util class of resolve snapshot from scan params for time travel.""" @@ -45,18 +77,22 @@ def try_travel_to_snapshot( Supports the following time travel options: - scan.tag-name: Travel to a specific tag - scan.snapshot-id: Travel to a specific snapshot id + - scan.timestamp-millis: Travel to the latest snapshot <= the given timestamp (ms) + - scan.timestamp: Travel by timestamp string (e.g. '2023-12-01 12:00:00') + - scan.watermark: Travel to the first snapshot with watermark >= the given value Args: options: The options containing time travel parameters tag_manager: The tag manager snapshot_manager: The snapshot manager, required when - ``scan.snapshot-id`` is set + using snapshot-id, timestamp, or watermark based time travel Returns: The Snapshot to travel to, or None if no time travel option is set. Raises: - ValueError: If more than one time travel option is set + ValueError: If more than one time travel option is set, or if the + required manager is not provided """ scan_handle_keys = [key for key in SCAN_KEYS if options.contains_key(key)] @@ -86,5 +122,42 @@ def try_travel_to_snapshot( if snapshot is None: raise ValueError(f"Snapshot id '{snapshot_id}' doesn't exist.") return snapshot + elif key == CoreOptions.SCAN_TIMESTAMP_MILLIS.key(): + if snapshot_manager is None: + raise ValueError( + "snapshot_manager is required to resolve scan.timestamp-millis" + ) + timestamp_millis = int(core_options.scan_timestamp_millis()) + snapshot = snapshot_manager.earlier_or_equal_time_mills(timestamp_millis) + if snapshot is None: + raise ValueError( + f"No snapshot found with timestamp earlier than or equal to {timestamp_millis}ms." + ) + return snapshot + elif key == CoreOptions.SCAN_TIMESTAMP.key(): + if snapshot_manager is None: + raise ValueError( + "snapshot_manager is required to resolve scan.timestamp" + ) + timestamp_str = core_options.scan_timestamp() + timestamp_millis = _parse_timestamp_to_millis(timestamp_str) + snapshot = snapshot_manager.earlier_or_equal_time_mills(timestamp_millis) + if snapshot is None: + raise ValueError( + f"No snapshot found with timestamp earlier than or equal to '{timestamp_str}'." + ) + return snapshot + elif key == CoreOptions.SCAN_WATERMARK.key(): + if snapshot_manager is None: + raise ValueError( + "snapshot_manager is required to resolve scan.watermark" + ) + watermark = int(core_options.scan_watermark()) + snapshot = snapshot_manager.later_or_equal_watermark(watermark) + if snapshot is None: + raise ValueError( + f"No snapshot found with watermark greater than or equal to {watermark}." + ) + return snapshot else: raise ValueError(f"Unsupported time travel mode: {key}") diff --git a/paimon-python/pypaimon/tests/time_travel_util_test.py b/paimon-python/pypaimon/tests/time_travel_util_test.py index 10372164c789..7b8b139f1200 100644 --- a/paimon-python/pypaimon/tests/time_travel_util_test.py +++ b/paimon-python/pypaimon/tests/time_travel_util_test.py @@ -22,9 +22,11 @@ class _StubSnapshot: - def __init__(self, snapshot_id, schema_id=0): + def __init__(self, snapshot_id, schema_id=0, time_millis=0, watermark=None): self.id = snapshot_id self.schema_id = schema_id + self.time_millis = time_millis + self.watermark = watermark class _StubSnapshotManager: @@ -34,6 +36,31 @@ def __init__(self, snapshots): def get_snapshot_by_id(self, snapshot_id): return self._snapshots.get(snapshot_id) + def try_get_earliest_snapshot(self): + if not self._snapshots: + return None + return self._snapshots[min(self._snapshots.keys())] + + def get_latest_snapshot(self): + if not self._snapshots: + return None + return self._snapshots[max(self._snapshots.keys())] + + def earlier_or_equal_time_mills(self, timestamp): + result = None + for snap in sorted(self._snapshots.values(), key=lambda s: s.id): + if snap.time_millis <= timestamp: + result = snap + else: + break + return result + + def later_or_equal_watermark(self, watermark): + for snap in sorted(self._snapshots.values(), key=lambda s: s.id): + if snap.watermark is not None and snap.watermark >= watermark: + return snap + return None + class _StubTagManager: def __init__(self, tags): @@ -87,10 +114,117 @@ def test_rejects_setting_snapshot_id_and_tag_name_together(self): ) def test_scan_keys_contains_both_options(self): - # Sanity check: SCAN_KEYS must enumerate both time-travel modes, + # Sanity check: SCAN_KEYS must enumerate all time-travel modes, # otherwise the mutual-exclusion guard above would not trigger. self.assertIn('scan.snapshot-id', SCAN_KEYS) self.assertIn('scan.tag-name', SCAN_KEYS) + self.assertIn('scan.timestamp-millis', SCAN_KEYS) + self.assertIn('scan.timestamp', SCAN_KEYS) + self.assertIn('scan.watermark', SCAN_KEYS) + + def test_resolves_timestamp_millis(self): + snap1 = _StubSnapshot(1, time_millis=1000) + snap2 = _StubSnapshot(2, time_millis=2000) + snap3 = _StubSnapshot(3, time_millis=3000) + mgr = _StubSnapshotManager([snap1, snap2, snap3]) + result = TimeTravelUtil.try_travel_to_snapshot( + Options({'scan.timestamp-millis': '2500'}), + _StubTagManager({}), + mgr, + ) + self.assertIs(result, snap2) + + def test_resolves_timestamp_millis_exact_match(self): + snap1 = _StubSnapshot(1, time_millis=1000) + snap2 = _StubSnapshot(2, time_millis=2000) + mgr = _StubSnapshotManager([snap1, snap2]) + result = TimeTravelUtil.try_travel_to_snapshot( + Options({'scan.timestamp-millis': '2000'}), + _StubTagManager({}), + mgr, + ) + self.assertIs(result, snap2) + + def test_timestamp_millis_no_match_raises(self): + snap1 = _StubSnapshot(1, time_millis=5000) + mgr = _StubSnapshotManager([snap1]) + with self.assertRaises(ValueError): + TimeTravelUtil.try_travel_to_snapshot( + Options({'scan.timestamp-millis': '1000'}), + _StubTagManager({}), + mgr, + ) + + def test_timestamp_millis_without_snapshot_manager_raises(self): + with self.assertRaises(ValueError): + TimeTravelUtil.try_travel_to_snapshot( + Options({'scan.timestamp-millis': '1000'}), + _StubTagManager({}), + None, + ) + + def test_resolves_timestamp_string(self): + # Compute expected millis in local timezone, consistent with Java behavior + from datetime import datetime + expected_millis = int(datetime(2023, 12, 1, 0, 0, 0).timestamp() * 1000) + snap1 = _StubSnapshot(1, time_millis=expected_millis) + snap2 = _StubSnapshot(2, time_millis=expected_millis + 100000) + mgr = _StubSnapshotManager([snap1, snap2]) + result = TimeTravelUtil.try_travel_to_snapshot( + Options({'scan.timestamp': '2023-12-01 00:00:00'}), + _StubTagManager({}), + mgr, + ) + self.assertIs(result, snap1) + + def test_timestamp_string_invalid_format_raises(self): + snap1 = _StubSnapshot(1, time_millis=1000) + mgr = _StubSnapshotManager([snap1]) + with self.assertRaises(ValueError): + TimeTravelUtil.try_travel_to_snapshot( + Options({'scan.timestamp': 'not-a-timestamp'}), + _StubTagManager({}), + mgr, + ) + + def test_resolves_watermark(self): + snap1 = _StubSnapshot(1, watermark=100) + snap2 = _StubSnapshot(2, watermark=200) + snap3 = _StubSnapshot(3, watermark=300) + mgr = _StubSnapshotManager([snap1, snap2, snap3]) + result = TimeTravelUtil.try_travel_to_snapshot( + Options({'scan.watermark': '200'}), + _StubTagManager({}), + mgr, + ) + self.assertIs(result, snap2) + + def test_watermark_no_match_raises(self): + snap1 = _StubSnapshot(1, watermark=100) + mgr = _StubSnapshotManager([snap1]) + with self.assertRaises(ValueError): + TimeTravelUtil.try_travel_to_snapshot( + Options({'scan.watermark': '500'}), + _StubTagManager({}), + mgr, + ) + + def test_watermark_without_snapshot_manager_raises(self): + with self.assertRaises(ValueError): + TimeTravelUtil.try_travel_to_snapshot( + Options({'scan.watermark': '100'}), + _StubTagManager({}), + None, + ) + + def test_rejects_multiple_time_travel_options(self): + mgr = _StubSnapshotManager([_StubSnapshot(1, time_millis=1000)]) + with self.assertRaises(ValueError): + TimeTravelUtil.try_travel_to_snapshot( + Options({'scan.timestamp-millis': '1000', 'scan.snapshot-id': '1'}), + _StubTagManager({}), + mgr, + ) if __name__ == '__main__':