Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions paimon-python/pypaimon/cli/cli_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,68 @@ def cmd_table_list_partitions(args):
sys.exit(1)


def cmd_table_expire_partitions(args):
"""
Execute the 'table expire-partitions' command.

Expires (drops) partitions based on time strategies.

Args:
args: Parsed command line arguments.
"""
from pypaimon.cli.cli import load_catalog_config, create_catalog
from pypaimon.table.file_store_table import FileStoreTable

config_path = args.config
config = load_catalog_config(config_path)
catalog = create_catalog(config)

table_identifier = args.table
parts = table_identifier.split('.')
if len(parts) != 2:
print(f"Error: Invalid table identifier '{table_identifier}'. "
f"Expected format: 'database.table'", file=sys.stderr)
sys.exit(1)

database_name, table_name = parts

try:
table = catalog.get_table(f"{database_name}.{table_name}")
except Exception as e:
print(f"Error: Failed to get table '{table_identifier}': {e}", file=sys.stderr)
sys.exit(1)

if not isinstance(table, FileStoreTable):
print(f"Error: Table '{table_identifier}' is not a FileStoreTable.", file=sys.stderr)
sys.exit(1)

# Build override options from CLI args
override_options = {}
if args.expiration_time:
override_options["partition.expiration-time"] = args.expiration_time
if args.strategy:
override_options["partition.expiration-strategy"] = args.strategy
if args.timestamp_pattern:
override_options["partition.timestamp-pattern"] = args.timestamp_pattern
if args.timestamp_formatter:
override_options["partition.timestamp-formatter"] = args.timestamp_formatter
if args.max_expires:
override_options["partition.expiration-max-num"] = str(args.max_expires)

try:
expired = table.expire_partitions(options=override_options if override_options else None)
if expired:
partition_desc = "; ".join(
",".join(f"{k}={v}" for k, v in p.items()) for p in expired
)
print(f"Expired {len(expired)} partition(s): [{partition_desc}]")
else:
print("No partitions expired.")
except Exception as e:
print(f"Error: Failed to expire partitions: {e}", file=sys.stderr)
sys.exit(1)


def cmd_table_drop_partition(args):
"""
Execute the 'table drop-partition' command.
Expand Down Expand Up @@ -1033,3 +1095,44 @@ def add_table_subcommands(table_parser):
update_comment_parser = alter_subparsers.add_parser('update-comment', help='Update table comment')
update_comment_parser.add_argument('--comment', '-c', required=True, help='New table comment')
update_comment_parser.set_defaults(func=cmd_table_alter)

# table expire-partitions command
expire_parser = table_subparsers.add_parser(
'expire-partitions', help='Expire (drop) partitions based on time strategies')
expire_parser.add_argument(
'table',
help='Table identifier in format: database.table'
)
expire_parser.add_argument(
'--expiration-time', '-e',
type=str,
default=None,
help=('How old a partition must be before expiring (e.g., "7d", "24h"). '
'Overrides the table option partition.expiration-time.')
)
expire_parser.add_argument(
'--strategy', '-s',
type=str,
choices=['values-time', 'update-time'],
default=None,
help='Expiration strategy: values-time (parse partition values) or update-time (use file creation time)'
)
expire_parser.add_argument(
'--timestamp-pattern',
type=str,
default=None,
help='Pattern to extract timestamp from partition values (e.g., "$dt $hour:00:00")'
)
expire_parser.add_argument(
'--timestamp-formatter',
type=str,
default=None,
help='Java DateTimeFormatter pattern for parsing partition timestamps (e.g., "yyyyMMdd", "yyyy-MM-dd HH:mm:ss")'
)
expire_parser.add_argument(
'--max-expires',
type=int,
default=None,
help='Maximum number of partitions to expire in one call'
)
expire_parser.set_defaults(func=cmd_table_expire_partitions)
51 changes: 51 additions & 0 deletions paimon-python/pypaimon/common/options/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,57 @@ class CoreOptions:
)
)

# Partition expiration options
PARTITION_EXPIRATION_TIME: ConfigOption[timedelta] = (
ConfigOptions.key("partition.expiration-time")
.duration_type()
.no_default_value()
.with_description(
"The expiration interval of a partition. A partition will be expired if "
"it's lifetime is over this value. Partition time is extracted from the "
"partition value."
)
)

PARTITION_EXPIRATION_CHECK_INTERVAL: ConfigOption[timedelta] = (
ConfigOptions.key("partition.expiration-check-interval")
.duration_type()
.no_default_value()
.with_description(
"The check interval of partition expiration."
)
)

PARTITION_EXPIRATION_STRATEGY: ConfigOption[str] = (
ConfigOptions.key("partition.expiration-strategy")
.string_type()
.default_value("values-time")
.with_description(
"The expiration strategy of a partition. 'values-time' extracts time from "
"partition values; 'update-time' uses the last file creation time."
)
)

PARTITION_TIMESTAMP_PATTERN: ConfigOption[str] = (
ConfigOptions.key("partition.timestamp-pattern")
.string_type()
.no_default_value()
.with_description(
"Specify a pattern to get a timestamp from partitions. The pattern is "
"built from partition field placeholders (e.g., '$dt $hour:00:00')."
)
)

PARTITION_TIMESTAMP_FORMATTER: ConfigOption[str] = (
ConfigOptions.key("partition.timestamp-formatter")
.string_type()
.no_default_value()
.with_description(
"The formatter to parse the timestamp string to a datetime. "
"Uses Python strptime format (e.g., '%Y-%m-%d')."
)
)

def __init__(self, options: Options):
self.options = options

Expand Down
32 changes: 32 additions & 0 deletions paimon-python/pypaimon/partition/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from pypaimon.partition.partition_time_extractor import PartitionTimeExtractor
from pypaimon.partition.partition_expire_strategy import (
PartitionExpireStrategy,
PartitionValuesTimeExpireStrategy,
PartitionUpdateTimeExpireStrategy,
)
from pypaimon.partition.partition_expire import PartitionExpire

__all__ = [
"PartitionTimeExtractor",
"PartitionExpireStrategy",
"PartitionValuesTimeExpireStrategy",
"PartitionUpdateTimeExpireStrategy",
"PartitionExpire",
]
Loading
Loading