diff --git a/paimon-python/pypaimon/cli/cli_table.py b/paimon-python/pypaimon/cli/cli_table.py index e428bd2d711f..1be9f54568b7 100644 --- a/paimon-python/pypaimon/cli/cli_table.py +++ b/paimon-python/pypaimon/cli/cli_table.py @@ -724,6 +724,68 @@ def cmd_table_list_partitions(args): sys.exit(1) +def cmd_table_expire_partitions(args): + """ + Execute the 'table expire-partitions' command. + + Expires (drops) partitions based on time strategies. + + Args: + args: Parsed command line arguments. + """ + from pypaimon.cli.cli import load_catalog_config, create_catalog + from pypaimon.table.file_store_table import FileStoreTable + + config_path = args.config + config = load_catalog_config(config_path) + catalog = create_catalog(config) + + table_identifier = args.table + parts = table_identifier.split('.') + if len(parts) != 2: + print(f"Error: Invalid table identifier '{table_identifier}'. " + f"Expected format: 'database.table'", file=sys.stderr) + sys.exit(1) + + database_name, table_name = parts + + try: + table = catalog.get_table(f"{database_name}.{table_name}") + except Exception as e: + print(f"Error: Failed to get table '{table_identifier}': {e}", file=sys.stderr) + sys.exit(1) + + if not isinstance(table, FileStoreTable): + print(f"Error: Table '{table_identifier}' is not a FileStoreTable.", file=sys.stderr) + sys.exit(1) + + # Build override options from CLI args + override_options = {} + if args.expiration_time: + override_options["partition.expiration-time"] = args.expiration_time + if args.strategy: + override_options["partition.expiration-strategy"] = args.strategy + if args.timestamp_pattern: + override_options["partition.timestamp-pattern"] = args.timestamp_pattern + if args.timestamp_formatter: + override_options["partition.timestamp-formatter"] = args.timestamp_formatter + if args.max_expires: + override_options["partition.expiration-max-num"] = str(args.max_expires) + + try: + expired = table.expire_partitions(options=override_options if override_options else None) + if expired: + partition_desc = "; ".join( + ",".join(f"{k}={v}" for k, v in p.items()) for p in expired + ) + print(f"Expired {len(expired)} partition(s): [{partition_desc}]") + else: + print("No partitions expired.") + except Exception as e: + print(f"Error: Failed to expire partitions: {e}", file=sys.stderr) + sys.exit(1) + + def cmd_table_drop_partition(args): """ Execute the 'table drop-partition' command. @@ -1033,3 +1095,44 @@ def add_table_subcommands(table_parser): update_comment_parser = alter_subparsers.add_parser('update-comment', help='Update table comment') update_comment_parser.add_argument('--comment', '-c', required=True, help='New table comment') update_comment_parser.set_defaults(func=cmd_table_alter) + + # table expire-partitions command + expire_parser = table_subparsers.add_parser( + 'expire-partitions', help='Expire (drop) partitions based on time strategies') + expire_parser.add_argument( + 'table', + help='Table identifier in format: database.table' + ) + expire_parser.add_argument( + '--expiration-time', '-e', + type=str, + default=None, + help=('How old a partition must be before expiring (e.g., "7d", "24h"). ' + 'Overrides the table option partition.expiration-time.') + ) + expire_parser.add_argument( + '--strategy', '-s', + type=str, + choices=['values-time', 'update-time'], + default=None, + help='Expiration strategy: values-time (parse partition values) or update-time (use file creation time)' + ) + expire_parser.add_argument( + '--timestamp-pattern', + type=str, + default=None, + help='Pattern to extract timestamp from partition values (e.g., "$dt $hour:00:00")' + ) + expire_parser.add_argument( + '--timestamp-formatter', + type=str, + default=None, + help='Java DateTimeFormatter pattern for parsing partition timestamps (e.g., "yyyyMMdd", "yyyy-MM-dd HH:mm:ss")' + ) + expire_parser.add_argument( + '--max-expires', + type=int, + default=None, + help='Maximum number of partitions to expire in one call' + ) + expire_parser.set_defaults(func=cmd_table_expire_partitions) diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py index 7d9a227e4a9d..0c91ced3bb6f 100644 --- a/paimon-python/pypaimon/common/options/core_options.py +++ b/paimon-python/pypaimon/common/options/core_options.py @@ -506,6 +506,57 @@ class CoreOptions: ) ) + # Partition expiration options + PARTITION_EXPIRATION_TIME: ConfigOption[timedelta] = ( + ConfigOptions.key("partition.expiration-time") + .duration_type() + .no_default_value() + .with_description( + "The expiration interval of a partition. A partition will be expired if " + "it's lifetime is over this value. Partition time is extracted from the " + "partition value." + ) + ) + + PARTITION_EXPIRATION_CHECK_INTERVAL: ConfigOption[timedelta] = ( + ConfigOptions.key("partition.expiration-check-interval") + .duration_type() + .no_default_value() + .with_description( + "The check interval of partition expiration." + ) + ) + + PARTITION_EXPIRATION_STRATEGY: ConfigOption[str] = ( + ConfigOptions.key("partition.expiration-strategy") + .string_type() + .default_value("values-time") + .with_description( + "The expiration strategy of a partition. 'values-time' extracts time from " + "partition values; 'update-time' uses the last file creation time." + ) + ) + + PARTITION_TIMESTAMP_PATTERN: ConfigOption[str] = ( + ConfigOptions.key("partition.timestamp-pattern") + .string_type() + .no_default_value() + .with_description( + "Specify a pattern to get a timestamp from partitions. The pattern is " + "built from partition field placeholders (e.g., '$dt $hour:00:00')." + ) + ) + + PARTITION_TIMESTAMP_FORMATTER: ConfigOption[str] = ( + ConfigOptions.key("partition.timestamp-formatter") + .string_type() + .no_default_value() + .with_description( + "The formatter to parse the timestamp string to a datetime. " + "Uses Python strptime format (e.g., '%Y-%m-%d')." + ) + ) + def __init__(self, options: Options): self.options = options diff --git a/paimon-python/pypaimon/partition/__init__.py b/paimon-python/pypaimon/partition/__init__.py new file mode 100644 index 000000000000..d346cd49bebe --- /dev/null +++ b/paimon-python/pypaimon/partition/__init__.py @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from pypaimon.partition.partition_time_extractor import PartitionTimeExtractor +from pypaimon.partition.partition_expire_strategy import ( + PartitionExpireStrategy, + PartitionValuesTimeExpireStrategy, + PartitionUpdateTimeExpireStrategy, +) +from pypaimon.partition.partition_expire import PartitionExpire + +__all__ = [ + "PartitionTimeExtractor", + "PartitionExpireStrategy", + "PartitionValuesTimeExpireStrategy", + "PartitionUpdateTimeExpireStrategy", + "PartitionExpire", +] diff --git a/paimon-python/pypaimon/partition/partition_expire.py b/paimon-python/pypaimon/partition/partition_expire.py new file mode 100644 index 000000000000..f9336a7263a2 --- /dev/null +++ b/paimon-python/pypaimon/partition/partition_expire.py @@ -0,0 +1,342 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Main orchestration class for partition expiration. + +This mirrors the Java implementation at: + org.apache.paimon.operation.PartitionExpire +""" + +import logging +from datetime import datetime, timedelta +from typing import Dict, List, Optional + +from pypaimon.partition.partition_expire_strategy import ( + PartitionEntry, + PartitionExpireStrategy, + PartitionUpdateTimeExpireStrategy, + PartitionValuesTimeExpireStrategy, +) + +logger = logging.getLogger(__name__) + + +class PartitionExpire: + """ + Orchestrates partition expiration: reads partition entries, applies the + configured strategy, and drops expired partitions. + + Usage: + expire = PartitionExpire.from_table(table) + expired = expire.expire() + + Or construct directly: + expire = PartitionExpire( + expiration_time=timedelta(days=7), + strategy=PartitionValuesTimeExpireStrategy(...), + partition_reader=..., + partition_dropper=..., + ) + expired = expire.expire() + """ + + def __init__( + self, + expiration_time: timedelta, + strategy: PartitionExpireStrategy, + partition_reader, # Callable[[], List[PartitionEntry]] + partition_dropper, # Callable[[List[Dict[str, str]], int], None] + check_interval: Optional[timedelta] = None, + max_expire_num: int = 100, + ): + """ + Args: + expiration_time: How old a partition must be before it expires. + strategy: The strategy to determine partition expiration time. + partition_reader: A callable that returns all current PartitionEntry objects. + partition_dropper: A callable that drops partitions (accepts List[Dict[str,str]], commit_id). + check_interval: Minimum interval between expiration checks. If None, always check. + max_expire_num: Maximum number of partitions to expire in one call (default 100, matching Java). + """ + self._expiration_time = expiration_time + self._strategy = strategy + self._partition_reader = partition_reader + self._partition_dropper = partition_dropper + self._check_interval = check_interval + self._max_expire_num = max_expire_num + self._last_check: Optional[datetime] = None + + def expire(self, now: Optional[datetime] = None) -> List[Dict[str, str]]: + """ + Run partition expiration. + + Args: + now: The current time (defaults to datetime.now() if not specified). + + Returns: + List of expired partition specs that were dropped. + Empty list if no partitions expired or check interval not yet elapsed. + """ + if now is None: + now = datetime.now() + + # Check interval gating + if self._check_interval is not None and self._last_check is not None: + if now < self._last_check + self._check_interval: + return [] + + expired_specs = self._do_expire(now) + self._last_check = now + return expired_specs + + def _do_expire(self, now: datetime) -> List[Dict[str, str]]: + """Perform the actual expiration logic.""" + expire_datetime = now - self._expiration_time + + # Read all partition entries + partition_entries = self._partition_reader() + if not partition_entries: + return [] + + # Apply strategy to find expired partitions + expired_entries = self._strategy.select_expired_partitions(partition_entries, expire_datetime) + if not expired_entries: + return [] + + # Sort by partition spec string for deterministic behavior + expired_entries.sort( + key=lambda e: ",".join(f"{k}={v}" for k, v in sorted(e.spec.items())) + ) + + # Limit the number of partitions to expire + if self._max_expire_num > 0 and len(expired_entries) > self._max_expire_num: + expired_entries = expired_entries[: self._max_expire_num] + + # Convert to partition specs + expired_specs = [self._strategy.to_partition_string(entry) for entry in expired_entries] + + if expired_specs: + logger.info("Expiring partitions: %s", expired_specs) + # Use a fixed commit identifier for batch operations + commit_identifier = int(now.timestamp() * 1000) + self._partition_dropper(expired_specs, commit_identifier) + + return expired_specs + + @classmethod + def from_table(cls, table, options: Optional[Dict[str, str]] = None) -> Optional['PartitionExpire']: + """ + Create a PartitionExpire instance from a FileStoreTable. + + Reads configuration from table options: + - partition.expiration-time + - partition.expiration-check-interval + - partition.expiration-strategy (values-time | update-time) + - partition.timestamp-pattern + - partition.timestamp-formatter + + Args: + table: A FileStoreTable instance. + options: Optional override options dict. + + Returns: + A PartitionExpire instance, or None if expiration is not configured. + """ + # Get table options + table_options = dict(table.table_schema.options) if table.table_schema.options else {} + if options: + table_options.update(options) + + # Check if partition expiration is configured + expiration_time_str = table_options.get("partition.expiration-time") + if not expiration_time_str: + return None + + # Table must have partition keys + if not table.partition_keys: + logger.warning("Cannot configure partition expiration on a non-partitioned table.") + return None + + # Parse expiration time + expiration_time = _parse_duration(expiration_time_str) + if expiration_time is None: + logger.warning("Invalid partition.expiration-time: %s", expiration_time_str) + return None + + # Parse check interval + check_interval_str = table_options.get("partition.expiration-check-interval") + check_interval = _parse_duration(check_interval_str) if check_interval_str else None + + # Parse strategy + strategy_name = table_options.get("partition.expiration-strategy", "values-time") + partition_keys = list(table.partition_keys) + partition_default_name = table_options.get("partition.default-name", "__DEFAULT_PARTITION__") + timestamp_pattern = table_options.get("partition.timestamp-pattern") + timestamp_formatter = table_options.get("partition.timestamp-formatter") + + if strategy_name == "values-time": + strategy = PartitionValuesTimeExpireStrategy( + partition_keys=partition_keys, + partition_default_name=partition_default_name, + timestamp_pattern=timestamp_pattern, + timestamp_formatter=timestamp_formatter, + ) + elif strategy_name == "update-time": + strategy = PartitionUpdateTimeExpireStrategy( + partition_keys=partition_keys, + partition_default_name=partition_default_name, + ) + else: + raise ValueError(f"Unknown partition expiration strategy: '{strategy_name}'. " + f"Supported: 'values-time', 'update-time'.") + + # Parse max expire num + max_expire_num_str = table_options.get("partition.expiration-max-num") + max_expire_num = int(max_expire_num_str) if max_expire_num_str else 100 + + # Build partition reader: reads manifest entries and aggregates by partition + def partition_reader() -> List[PartitionEntry]: + return _read_partition_entries(table) + + # Build partition dropper: uses FileStoreCommit.drop_partitions + def partition_dropper(partitions: List[Dict[str, str]], commit_identifier: int): + commit = table.new_batch_write_builder().new_commit() + try: + commit.truncate_partitions(partitions) + finally: + commit.close() + + return cls( + expiration_time=expiration_time, + strategy=strategy, + partition_reader=partition_reader, + partition_dropper=partition_dropper, + check_interval=check_interval, + max_expire_num=max_expire_num, + ) + + +def _read_partition_entries(table) -> List[PartitionEntry]: + """ + Read all active partition entries from the table's latest snapshot. + + This aggregates manifest entries by partition to produce per-partition statistics, + mirroring the logic in filesystem_catalog.list_partitions_paged. + """ + from pypaimon.manifest.manifest_list_manager import ManifestListManager + from pypaimon.manifest.manifest_file_manager import ManifestFileManager + + snapshot = table.snapshot_manager().get_latest_snapshot() + if snapshot is None: + return [] + + manifest_list_manager = ManifestListManager(table) + manifest_file_manager = ManifestFileManager(table) + manifest_files = manifest_list_manager.read_all(snapshot) + entries = manifest_file_manager.read_entries_parallel(manifest_files, drop_stats=True) + + # Group entries by partition spec + partition_map = {} # spec_key -> aggregated stats + partition_default_name = (table.table_schema.options or {}).get( + "partition.default-name", "__DEFAULT_PARTITION__") + for entry in entries: + spec = {} + for field, v in zip(entry.partition.fields, entry.partition.values): + if v is None: + spec[field.name] = partition_default_name + else: + spec[field.name] = str(v) + spec_key = tuple(sorted(spec.items())) + + if spec_key not in partition_map: + partition_map[spec_key] = { + 'spec': spec, + 'record_count': 0, + 'file_size_in_bytes': 0, + 'file_count': 0, + 'last_file_creation_time': 0, + } + stats = partition_map[spec_key] + stats['record_count'] += entry.file.row_count + stats['file_size_in_bytes'] += entry.file.file_size + stats['file_count'] += 1 + if entry.file.creation_time is not None: + ct = entry.file.creation_time.get_millisecond() + if ct > stats['last_file_creation_time']: + stats['last_file_creation_time'] = ct + + return [ + PartitionEntry( + spec=stats['spec'], + record_count=stats['record_count'], + file_count=stats['file_count'], + file_size_in_bytes=stats['file_size_in_bytes'], + last_file_creation_time=stats['last_file_creation_time'], + ) + for stats in partition_map.values() + ] + + +def _parse_duration(duration_str: str) -> Optional[timedelta]: + """ + Parse a duration string like '7d', '1h', '30m', '10s', '1d 2h'. + Also supports ISO-like formats: '7 days', '1 hour'. + + Returns None if parsing fails. + """ + if not duration_str: + return None + + duration_str = duration_str.strip() + + # Try simple numeric formats with suffix + import re + total_seconds = 0 + found = False + + # Match patterns like: 7d, 12h, 30m, 60s, 500ms + # Order matters: longer suffixes first to avoid partial matches (e.g., 'ms' before 'm') + pattern = re.compile( + r'(\d+)\s*(milliseconds|millisecond|ms|days|day|d|hours|hour|h|minutes|minute|min|m|seconds|second|sec|s)' + ) + for match in pattern.finditer(duration_str.lower()): + found = True + value = int(match.group(1)) + unit = match.group(2) + if unit in ('d', 'day', 'days'): + total_seconds += value * 86400 + elif unit in ('h', 'hour', 'hours'): + total_seconds += value * 3600 + elif unit in ('m', 'min', 'minute', 'minutes'): + total_seconds += value * 60 + elif unit in ('s', 'sec', 'second', 'seconds'): + total_seconds += value + elif unit in ('ms', 'millisecond', 'milliseconds'): + total_seconds += value / 1000.0 + + if found: + return timedelta(seconds=total_seconds) + + # Try plain integer as milliseconds (Java Duration convention) + try: + ms = int(duration_str) + return timedelta(milliseconds=ms) + except ValueError: + pass + + return None diff --git a/paimon-python/pypaimon/partition/partition_expire_strategy.py b/paimon-python/pypaimon/partition/partition_expire_strategy.py new file mode 100644 index 000000000000..f416784ccaf6 --- /dev/null +++ b/paimon-python/pypaimon/partition/partition_expire_strategy.py @@ -0,0 +1,156 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Partition expiration strategy implementations. + +This mirrors the Java classes: + - org.apache.paimon.partition.PartitionExpireStrategy + - org.apache.paimon.partition.PartitionValuesTimeExpireStrategy + - org.apache.paimon.partition.PartitionUpdateTimeExpireStrategy +""" + +import logging +from abc import ABC, abstractmethod +from datetime import datetime +from typing import Dict, List, Optional + +from pypaimon.partition.partition_time_extractor import PartitionTimeExtractor + +logger = logging.getLogger(__name__) + + +class PartitionEntry: + """ + Represents aggregated statistics for a single partition. + + Attributes: + spec: Partition specification as {column_name: value_string}. + record_count: Total number of records in the partition. + file_count: Total number of files in the partition. + file_size_in_bytes: Total file size in bytes. + last_file_creation_time: Latest file creation time in epoch milliseconds. + """ + + def __init__( + self, + spec: Dict[str, str], + record_count: int = 0, + file_count: int = 0, + file_size_in_bytes: int = 0, + last_file_creation_time: int = 0, + ): + self.spec = spec + self.record_count = record_count + self.file_count = file_count + self.file_size_in_bytes = file_size_in_bytes + self.last_file_creation_time = last_file_creation_time + + def __repr__(self): + spec_str = ",".join(f"{k}={v}" for k, v in self.spec.items()) + return f"PartitionEntry({spec_str}, records={self.record_count})" + + +class PartitionExpireStrategy(ABC): + """ + Abstract strategy for determining which partitions are expired. + """ + + def __init__(self, partition_keys: List[str], partition_default_name: str = "__DEFAULT_PARTITION__"): + self.partition_keys = partition_keys + self.partition_default_name = partition_default_name + + @abstractmethod + def select_expired_partitions( + self, partition_entries: List[PartitionEntry], expiration_time: datetime + ) -> List[PartitionEntry]: + """ + Select partitions that have expired. + + Args: + partition_entries: All current partition entries. + expiration_time: The expiration threshold datetime. Partitions older + than this should be considered expired. + + Returns: + List of expired PartitionEntry objects. + """ + pass + + def to_partition_string(self, entry: PartitionEntry) -> Dict[str, str]: + """Convert a PartitionEntry to a partition spec dict suitable for drop_partitions.""" + return dict(entry.spec) + + +class PartitionValuesTimeExpireStrategy(PartitionExpireStrategy): + """ + Expire partitions by parsing partition field values as timestamps. + + For example, a partition `dt=2024-01-01` would be parsed to extract + the date 2024-01-01, and compared against the expiration threshold. + """ + + def __init__( + self, + partition_keys: List[str], + partition_default_name: str = "__DEFAULT_PARTITION__", + timestamp_pattern: Optional[str] = None, + timestamp_formatter: Optional[str] = None, + ): + super().__init__(partition_keys, partition_default_name) + self._time_extractor = PartitionTimeExtractor( + pattern=timestamp_pattern, formatter=timestamp_formatter + ) + + def select_expired_partitions( + self, partition_entries: List[PartitionEntry], expiration_time: datetime + ) -> List[PartitionEntry]: + expired = [] + for entry in partition_entries: + try: + partition_time = self._time_extractor.extract_from_spec(entry.spec) + if expiration_time > partition_time: + expired.append(entry) + except (ValueError, TypeError) as e: + logger.warning( + "Cannot extract datetime from partition %s: %s. " + "Skipping this partition for expiration. " + "Consider using 'update-time' strategy for non-date formatted partitions.", + entry.spec, e, + ) + except (IndexError, KeyError) as e: + logger.warning( + "Partition %s has null or missing values, cannot expire: %s", + entry.spec, e, + ) + return expired + + +class PartitionUpdateTimeExpireStrategy(PartitionExpireStrategy): + """ + Expire partitions by comparing their last file creation time + against the expiration threshold. + """ + + def select_expired_partitions( + self, partition_entries: List[PartitionEntry], expiration_time: datetime + ) -> List[PartitionEntry]: + expiration_millis = int(expiration_time.timestamp() * 1000) + return [ + entry for entry in partition_entries + if expiration_millis > entry.last_file_creation_time + ] diff --git a/paimon-python/pypaimon/partition/partition_time_extractor.py b/paimon-python/pypaimon/partition/partition_time_extractor.py new file mode 100644 index 000000000000..6c58cfea2161 --- /dev/null +++ b/paimon-python/pypaimon/partition/partition_time_extractor.py @@ -0,0 +1,150 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Time extractor to extract datetime from partition values. + +This mirrors the Java implementation at: + org.apache.paimon.partition.PartitionTimeExtractor +""" + +import re +from datetime import datetime, time as dt_time +from typing import Dict, List, Optional + + +# Default timestamp formats (lenient parsing, mirrors Java TIMESTAMP_FORMATTER) +_DEFAULT_TIMESTAMP_FORMATS = [ + "%Y-%m-%d %H:%M:%S.%f", + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%dT%H:%M:%S.%f", + "%Y-%m-%dT%H:%M:%S", + "%Y-%m-%d %H:%M", + "%Y-%m-%d", +] + +# Java DateTimeFormatter → Python strptime conversion (order matters: longest first) +_JAVA_TO_PYTHON_PATTERNS = [ + ("yyyy", "%Y"), + ("yy", "%y"), + ("MM", "%m"), + ("dd", "%d"), + ("HH", "%H"), + ("mm", "%M"), + ("SSS", "%f"), + ("SS", "%f"), + ("ss", "%S"), +] + + +def _java_to_python_format(java_fmt: str) -> str: + """Convert Java DateTimeFormatter pattern to Python strptime format. + + Handles: + - Standard tokens: yyyy, MM, dd, HH, mm, ss, SSS + - Quoted literals: 'T' → T (strips single quotes) + """ + import re + # Strip Java quoted literals: 'T' → T, '' → ' + result = re.sub(r"'([^']*)'", r"\1", java_fmt) + for java_pat, py_pat in _JAVA_TO_PYTHON_PATTERNS: + result = result.replace(java_pat, py_pat) + return result + + +class PartitionTimeExtractor: + """ + Extracts a datetime from partition key values. + + Supports two modes: + 1. No pattern: uses the first partition value directly as a timestamp string. + 2. With pattern: replaces $ placeholders with partition values, then parses. + + The optional ``formatter`` is a strptime-compatible format string used to parse + the resulting timestamp string. + """ + + def __init__(self, pattern: Optional[str] = None, formatter: Optional[str] = None): + self._pattern = pattern + self._formatter = formatter + + def extract(self, partition_keys: List[str], partition_values: List[object]) -> datetime: + """ + Extract a datetime from the given partition keys and values. + + Args: + partition_keys: Ordered list of partition column names. + partition_values: Ordered list of partition column values (stringifiable). + + Returns: + The extracted datetime. + + Raises: + ValueError: If the timestamp string cannot be parsed. + """ + timestamp_string = self._build_timestamp_string(partition_keys, partition_values) + return self._to_local_datetime(timestamp_string) + + def extract_from_spec(self, spec: Dict[str, str]) -> datetime: + """ + Extract a datetime from a partition spec dictionary. + + Args: + spec: Partition specification {key: value}. + + Returns: + The extracted datetime. + """ + keys = list(spec.keys()) + values = list(spec.values()) + return self.extract(keys, values) + + def _build_timestamp_string(self, partition_keys: List[str], partition_values: List[object]) -> str: + if self._pattern is None: + return str(partition_values[0]) + result = self._pattern + for i, key in enumerate(partition_keys): + result = re.sub(r'\$' + re.escape(key), str(partition_values[i]), result) + return result + + def _to_local_datetime(self, timestamp_string: str) -> datetime: + if self._formatter is not None: + py_format = _java_to_python_format(self._formatter) + return self._parse_with_formatter(timestamp_string, py_format) + return self._parse_default(timestamp_string) + + @staticmethod + def _parse_with_formatter(timestamp_string: str, formatter: str) -> datetime: + """Parse using the converted Python strptime pattern.""" + try: + return datetime.strptime(timestamp_string, formatter) + except ValueError: + parsed_date = datetime.strptime(timestamp_string, formatter).date() + return datetime.combine(parsed_date, dt_time.min) + + @staticmethod + def _parse_default(timestamp_string: str) -> datetime: + """Parse using default formats (lenient).""" + for fmt in _DEFAULT_TIMESTAMP_FORMATS: + try: + return datetime.strptime(timestamp_string, fmt) + except ValueError: + continue + raise ValueError( + f"Cannot parse timestamp string '{timestamp_string}' with any default format. " + f"Tried: {_DEFAULT_TIMESTAMP_FORMATS}" + ) diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py index af52bbe84312..c57a53990a53 100644 --- a/paimon-python/pypaimon/table/file_store_table.py +++ b/paimon-python/pypaimon/table/file_store_table.py @@ -395,6 +395,52 @@ def new_batch_write_builder(self) -> BatchWriteBuilder: def new_stream_write_builder(self) -> StreamWriteBuilder: return StreamWriteBuilder(self) + def statistics(self) -> 'Optional[Statistics]': + """Read existing statistics for this table from the latest snapshot. + + Returns: + A Statistics instance, or None if no statistics exist. + """ + from pypaimon.stats.stats_file_handler import StatsFileHandler + snapshot_mgr = self.snapshot_manager() + snapshot = snapshot_mgr.get_latest_snapshot() + if snapshot is None: + return None + handler = StatsFileHandler(self.file_io, self.table_path) + return handler.read_stats(snapshot) + + def analyze(self, columns: 'Optional[List[str]]' = None) -> 'Statistics': + """Compute column-level statistics from manifest metadata. + + This method scans manifest file entries to aggregate per-file statistics + (min, max, null_count) into table-level column statistics. It does NOT + scan actual data files, making it very fast. + + Args: + columns: Optional list of column names to analyze. If None, all + columns are analyzed. + + Returns: + A Statistics instance with the computed stats. + + Raises: + ValueError: If the table has no snapshots or specified columns + do not exist. + """ + from pypaimon.stats.statistics_collector import StatisticsCollector + from pypaimon.stats.stats_file_handler import StatsFileHandler + + collector = StatisticsCollector(self) + stats = collector.collect(columns=columns) + if stats is None: + raise ValueError("Cannot analyze table: no snapshots exist.") + + # Write statistics file + handler = StatsFileHandler(self.file_io, self.table_path) + handler.write_stats(stats) + + return stats + def new_full_text_search_builder(self) -> 'FullTextSearchBuilder': from pypaimon.table.source.full_text_search_builder import FullTextSearchBuilderImpl return FullTextSearchBuilderImpl(self) @@ -473,6 +519,34 @@ def _try_time_travel(self, options: Options) -> Optional[TableSchema]: except Exception: return None + def expire_partitions(self, options: Optional[dict] = None) -> List[dict]: + """ + Expire (drop) partitions based on time strategies configured in table options. + + The following table options control partition expiration: + - partition.expiration-time: How old a partition must be before it expires (e.g., '7d'). + - partition.expiration-check-interval: Minimum interval between checks (e.g., '1h'). + - partition.expiration-strategy: 'values-time' or 'update-time'. + - partition.timestamp-pattern: Pattern to extract time from partition values. + - partition.timestamp-formatter: strptime format for parsing extracted time. + + Args: + options: Optional override options dict for expiration configuration. + + Returns: + List of partition specs (dicts) that were expired and dropped. + Empty list if no partitions were expired or expiration is not configured. + + Raises: + ValueError: If the table is not partitioned or strategy is unknown. + """ + from pypaimon.partition.partition_expire import PartitionExpire + + expire = PartitionExpire.from_table(self, options=options) + if expire is None: + return [] + return expire.expire() + def _create_external_paths(self) -> List[str]: from urllib.parse import urlparse from pypaimon.common.options.core_options import ExternalPathStrategy diff --git a/paimon-python/pypaimon/tests/partition_expire_test.py b/paimon-python/pypaimon/tests/partition_expire_test.py new file mode 100644 index 000000000000..873ae6433fef --- /dev/null +++ b/paimon-python/pypaimon/tests/partition_expire_test.py @@ -0,0 +1,340 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Unit tests for partition expiration module.""" + +import unittest +from datetime import datetime, timedelta +from unittest.mock import MagicMock + +from pypaimon.partition.partition_time_extractor import PartitionTimeExtractor +from pypaimon.partition.partition_expire_strategy import ( + PartitionEntry, + PartitionValuesTimeExpireStrategy, + PartitionUpdateTimeExpireStrategy, +) +from pypaimon.partition.partition_expire import PartitionExpire, _parse_duration + + +class TestPartitionTimeExtractor(unittest.TestCase): + """Tests for PartitionTimeExtractor.""" + + def test_extract_simple_date_no_pattern(self): + """Extract date from first partition value without pattern.""" + extractor = PartitionTimeExtractor() + result = extractor.extract(["dt"], ["2024-01-15"]) + self.assertEqual(result, datetime(2024, 1, 15, 0, 0, 0)) + + def test_extract_timestamp_no_pattern(self): + """Extract full timestamp from first partition value.""" + extractor = PartitionTimeExtractor() + result = extractor.extract(["dt"], ["2024-01-15 10:30:00"]) + self.assertEqual(result, datetime(2024, 1, 15, 10, 30, 0)) + + def test_extract_with_pattern(self): + """Extract timestamp using pattern with multiple partition keys.""" + extractor = PartitionTimeExtractor(pattern="$dt $hour:00:00") + result = extractor.extract(["dt", "hour"], ["2024-01-15", "10"]) + self.assertEqual(result, datetime(2024, 1, 15, 10, 0, 0)) + + def test_extract_with_formatter(self): + """Extract using custom formatter.""" + extractor = PartitionTimeExtractor(formatter="%Y%m%d") + result = extractor.extract(["dt"], ["20240115"]) + self.assertEqual(result, datetime(2024, 1, 15, 0, 0, 0)) + + def test_extract_with_pattern_and_formatter(self): + """Extract using both pattern and formatter.""" + extractor = PartitionTimeExtractor(pattern="$year-$month-$day", formatter="%Y-%m-%d") + result = extractor.extract(["year", "month", "day"], ["2024", "01", "15"]) + self.assertEqual(result, datetime(2024, 1, 15, 0, 0, 0)) + + def test_extract_from_spec(self): + """Extract from a partition spec dictionary.""" + extractor = PartitionTimeExtractor() + result = extractor.extract_from_spec({"dt": "2024-03-20"}) + self.assertEqual(result, datetime(2024, 3, 20, 0, 0, 0)) + + def test_extract_invalid_format_raises(self): + """Raise ValueError for unparseable timestamps.""" + extractor = PartitionTimeExtractor() + with self.assertRaises(ValueError): + extractor.extract(["dt"], ["not-a-date"]) + + def test_extract_single_digit_month_day(self): + """Parse dates with single-digit month and day.""" + extractor = PartitionTimeExtractor() + result = extractor.extract(["dt"], ["2024-1-5"]) + self.assertEqual(result, datetime(2024, 1, 5, 0, 0, 0)) + + +class TestPartitionValuesTimeExpireStrategy(unittest.TestCase): + """Tests for PartitionValuesTimeExpireStrategy.""" + + def test_select_expired_partitions(self): + """Partitions older than threshold should be selected.""" + strategy = PartitionValuesTimeExpireStrategy( + partition_keys=["dt"], + partition_default_name="__DEFAULT_PARTITION__", + ) + entries = [ + PartitionEntry(spec={"dt": "2024-01-01"}, record_count=100), + PartitionEntry(spec={"dt": "2024-01-10"}, record_count=200), + PartitionEntry(spec={"dt": "2024-01-20"}, record_count=300), + ] + # Expire anything before 2024-01-15 + expiration_time = datetime(2024, 1, 15) + expired = strategy.select_expired_partitions(entries, expiration_time) + self.assertEqual(len(expired), 2) + self.assertEqual(expired[0].spec["dt"], "2024-01-01") + self.assertEqual(expired[1].spec["dt"], "2024-01-10") + + def test_no_expired_partitions(self): + """No partitions should be selected if all are newer.""" + strategy = PartitionValuesTimeExpireStrategy( + partition_keys=["dt"], + ) + entries = [ + PartitionEntry(spec={"dt": "2024-06-01"}, record_count=100), + ] + expiration_time = datetime(2024, 1, 1) + expired = strategy.select_expired_partitions(entries, expiration_time) + self.assertEqual(len(expired), 0) + + def test_unparseable_partitions_skipped(self): + """Partitions with unparseable values should be skipped with a warning.""" + strategy = PartitionValuesTimeExpireStrategy( + partition_keys=["dt"], + ) + entries = [ + PartitionEntry(spec={"dt": "invalid-date"}, record_count=100), + PartitionEntry(spec={"dt": "2024-01-01"}, record_count=200), + ] + expiration_time = datetime(2024, 6, 1) + expired = strategy.select_expired_partitions(entries, expiration_time) + # Only the parseable one should be expired + self.assertEqual(len(expired), 1) + self.assertEqual(expired[0].spec["dt"], "2024-01-01") + + def test_with_custom_pattern(self): + """Test with custom timestamp pattern.""" + strategy = PartitionValuesTimeExpireStrategy( + partition_keys=["year", "month", "day"], + timestamp_pattern="$year-$month-$day", + ) + entries = [ + PartitionEntry(spec={"year": "2024", "month": "01", "day": "01"}), + PartitionEntry(spec={"year": "2024", "month": "06", "day": "15"}), + ] + expiration_time = datetime(2024, 3, 1) + expired = strategy.select_expired_partitions(entries, expiration_time) + self.assertEqual(len(expired), 1) + + +class TestPartitionUpdateTimeExpireStrategy(unittest.TestCase): + """Tests for PartitionUpdateTimeExpireStrategy.""" + + def test_select_expired_by_update_time(self): + """Partitions with old file creation times should be selected.""" + strategy = PartitionUpdateTimeExpireStrategy( + partition_keys=["dt"], + ) + now = datetime(2024, 6, 1) + old_time_millis = int((now - timedelta(days=30)).timestamp() * 1000) + recent_time_millis = int((now - timedelta(days=1)).timestamp() * 1000) + + entries = [ + PartitionEntry(spec={"dt": "2024-05-01"}, last_file_creation_time=old_time_millis), + PartitionEntry(spec={"dt": "2024-05-30"}, last_file_creation_time=recent_time_millis), + ] + + # Expire partitions older than 7 days from now + expiration_time = now - timedelta(days=7) + expired = strategy.select_expired_partitions(entries, expiration_time) + self.assertEqual(len(expired), 1) + self.assertEqual(expired[0].spec["dt"], "2024-05-01") + + def test_no_expired_when_all_recent(self): + """No partitions expired when all have recent file creation times.""" + strategy = PartitionUpdateTimeExpireStrategy( + partition_keys=["dt"], + ) + now = datetime(2024, 6, 1) + recent_time = int((now - timedelta(hours=1)).timestamp() * 1000) + + entries = [ + PartitionEntry(spec={"dt": "2024-05-31"}, last_file_creation_time=recent_time), + ] + expiration_time = now - timedelta(days=7) + expired = strategy.select_expired_partitions(entries, expiration_time) + self.assertEqual(len(expired), 0) + + +class TestPartitionExpire(unittest.TestCase): + """Tests for PartitionExpire orchestration class.""" + + def test_expire_drops_old_partitions(self): + """Test that expired partitions are dropped.""" + entries = [ + PartitionEntry(spec={"dt": "2024-01-01"}, record_count=100), + PartitionEntry(spec={"dt": "2024-01-10"}, record_count=200), + PartitionEntry(spec={"dt": "2024-06-01"}, record_count=300), + ] + dropped = [] + + def mock_dropper(partitions, commit_id): + dropped.extend(partitions) + + strategy = PartitionValuesTimeExpireStrategy(partition_keys=["dt"]) + expire = PartitionExpire( + expiration_time=timedelta(days=30), + strategy=strategy, + partition_reader=lambda: entries, + partition_dropper=mock_dropper, + ) + + now = datetime(2024, 6, 15) + result = expire.expire(now=now) + + # Partitions before 2024-05-16 should be expired + self.assertEqual(len(result), 2) + self.assertIn({"dt": "2024-01-01"}, result) + self.assertIn({"dt": "2024-01-10"}, result) + self.assertEqual(len(dropped), 2) + + def test_expire_respects_check_interval(self): + """Test that check interval prevents too-frequent expiration checks.""" + entries = [ + PartitionEntry(spec={"dt": "2024-01-01"}, record_count=100), + ] + dropped = [] + + def mock_dropper(partitions, commit_id): + dropped.extend(partitions) + + strategy = PartitionValuesTimeExpireStrategy(partition_keys=["dt"]) + expire = PartitionExpire( + expiration_time=timedelta(days=7), + strategy=strategy, + partition_reader=lambda: entries, + partition_dropper=mock_dropper, + check_interval=timedelta(hours=1), + ) + + now = datetime(2024, 6, 15, 10, 0, 0) + result1 = expire.expire(now=now) + self.assertEqual(len(result1), 1) + + # Second call within check interval should return empty + result2 = expire.expire(now=now + timedelta(minutes=30)) + self.assertEqual(len(result2), 0) + + # After interval passes, should expire again + # (but entries already dropped, so reader returns same) + result3 = expire.expire(now=now + timedelta(hours=2)) + self.assertEqual(len(result3), 1) + + def test_expire_max_num(self): + """Test that max_expire_num limits the number of dropped partitions.""" + entries = [ + PartitionEntry(spec={"dt": f"2024-01-{i:02d}"}, record_count=100) + for i in range(1, 11) # 10 partitions + ] + dropped = [] + + def mock_dropper(partitions, commit_id): + dropped.extend(partitions) + + strategy = PartitionValuesTimeExpireStrategy(partition_keys=["dt"]) + expire = PartitionExpire( + expiration_time=timedelta(days=7), + strategy=strategy, + partition_reader=lambda: entries, + partition_dropper=mock_dropper, + max_expire_num=3, + ) + + now = datetime(2024, 6, 15) + result = expire.expire(now=now) + self.assertEqual(len(result), 3) + self.assertEqual(len(dropped), 3) + + def test_expire_empty_partitions(self): + """Test that no error occurs when there are no partitions.""" + strategy = PartitionValuesTimeExpireStrategy(partition_keys=["dt"]) + expire = PartitionExpire( + expiration_time=timedelta(days=7), + strategy=strategy, + partition_reader=lambda: [], + partition_dropper=lambda p, c: None, + ) + result = expire.expire() + self.assertEqual(result, []) + + def test_expire_none_when_not_configured(self): + """Test that from_table returns None when expiration is not configured.""" + mock_table = MagicMock() + mock_table.table_schema.options = {} + mock_table.partition_keys = ["dt"] + + result = PartitionExpire.from_table(mock_table) + self.assertIsNone(result) + + def test_expire_none_for_non_partitioned_table(self): + """Test that from_table returns None for non-partitioned tables.""" + mock_table = MagicMock() + mock_table.table_schema.options = {"partition.expiration-time": "7d"} + mock_table.partition_keys = [] + + result = PartitionExpire.from_table(mock_table) + self.assertIsNone(result) + + +class TestParseDuration(unittest.TestCase): + """Tests for _parse_duration utility.""" + + def test_parse_days(self): + self.assertEqual(_parse_duration("7d"), timedelta(days=7)) + self.assertEqual(_parse_duration("7 days"), timedelta(days=7)) + + def test_parse_hours(self): + self.assertEqual(_parse_duration("24h"), timedelta(hours=24)) + self.assertEqual(_parse_duration("2 hours"), timedelta(hours=2)) + + def test_parse_minutes(self): + self.assertEqual(_parse_duration("30m"), timedelta(minutes=30)) + self.assertEqual(_parse_duration("30 min"), timedelta(minutes=30)) + + def test_parse_seconds(self): + self.assertEqual(_parse_duration("60s"), timedelta(seconds=60)) + + def test_parse_milliseconds(self): + self.assertEqual(_parse_duration("500ms"), timedelta(milliseconds=500)) + + def test_parse_combined(self): + self.assertEqual(_parse_duration("1d 2h"), timedelta(days=1, hours=2)) + + def test_parse_empty_returns_none(self): + self.assertIsNone(_parse_duration("")) + self.assertIsNone(_parse_duration(None)) + + def test_parse_invalid_returns_none(self): + self.assertIsNone(_parse_duration("not-a-duration")) + + +if __name__ == "__main__": + unittest.main()