Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions pyiceberg/table/maintenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,26 @@ def expire_snapshots(self) -> ExpireSnapshots:
from pyiceberg.table.update.snapshot import ExpireSnapshots

return ExpireSnapshots(transaction=Transaction(self.tbl, autocommit=True))

def compact(self) -> None:
    """Rewrite the table's data files into fewer, larger ones.

    Note: this is a full-table compaction that leverages Arrow for
    binpacking. The whole table is currently materialized in memory via
    ``.to_arrow()`` before being written back.

    The rewrite honors the target file size setting
    (``write.target-file-size-bytes``) and atomically swaps the old data
    files for the newly written, larger ones.
    """
    # Materialize the table's current state.
    current_rows = self.tbl.scan().to_arrow()

    # Nothing to compact on an empty table -- overwriting with zero rows
    # would otherwise delete every existing file.
    if current_rows.num_rows == 0:
        logger.info("Table contains no rows, skipping compaction.")
        return

    # Atomically replace the old files with the compacted data
    # (REPLACE-style operation expressed as an overwrite).
    with self.tbl.transaction() as txn:
        txn.overwrite(
            current_rows,
            snapshot_properties={"snapshot-type": "replace", "replace-operation": "compaction"},
        )
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use a replace operation instead:
https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/DataOperations.html#REPLACE

We might want to create the `.replace()` API first.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @kevinjqliu, thanks for the insight, I agree with what you're saying in terms of building a replace rather than just reusing the overwrite. I've refactored the compaction run to properly use a .replace() API, following the design of the Java Iceberg implementation.

The approach is to create a new _RewriteFiles in pyiceberg/table/update/snapshot.py, which uses the new Operation.REPLACE from the same module. The _RewriteFiles backs the replace() call and effectively mimics the _OverwriteFiles operation, except that it records Operation.REPLACE instead of Operation.OVERWRITE. This allows MaintenanceTable.compact() to do a proper txn.replace() rather than reuse txn.overwrite().

I also think it's worth noting that by adding Operation.REPLACE, we make room for the needed rewrite manifests (#270) and delete orphan files functionality (#1200).

106 changes: 106 additions & 0 deletions tests/table/test_maintenance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import random

import pyarrow as pa

from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchNamespaceError
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform


def test_maintenance_compact(catalog: Catalog) -> None:
    """Compacting a partitioned table collapses many small files into one per partition.

    Also verifies the row data is unchanged by compaction and that the new
    snapshot carries the replace-operation properties.
    """
    # Setup schema and partition spec (identity-partitioned on "category").
    from pyiceberg.types import LongType, NestedField, StringType

    schema = Schema(
        NestedField(1, "id", LongType()),
        NestedField(2, "category", StringType()),
        NestedField(3, "value", LongType()),
    )
    spec = PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="category"))

    # Create the namespace and table. create_namespace raises
    # NamespaceAlreadyExistsError (not NoSuchNamespaceError) when the
    # namespace already exists, so catch that to keep reruns idempotent.
    try:
        catalog.create_namespace("default")
    except NamespaceAlreadyExistsError:
        pass
    table = catalog.create_table(
        "default.test_compaction",
        schema=schema,
        partition_spec=spec,
    )

    # Append many small data files: 12 appends of 10 rows each,
    # round-robined across 3 categories -> 4 files per partition.
    categories = ["cat1", "cat2", "cat3"]
    for i in range(12):
        table.append(
            pa.table(
                {
                    "id": list(range(i * 10, (i + 1) * 10)),
                    "category": [categories[i % 3]] * 10,
                    "value": [random.randint(1, 100) for _ in range(10)],
                }
            )
        )

    # Verify state before compaction.
    before_files = list(table.scan().plan_files())
    assert len(before_files) == 12
    before_rows = table.scan().to_arrow()
    assert before_rows.num_rows == 120

    # Execute compaction.
    table.maintenance.compact()

    # Verify state after compaction.
    table.refresh()
    after_files = list(table.scan().plan_files())
    assert len(after_files) == 3  # Should be 1 optimized data file per partition
    after_rows = table.scan().to_arrow()
    assert after_rows.num_rows == 120

    # The data must survive compaction unchanged; only the file layout
    # differs. Sort by the unique "id" key so row order doesn't matter.
    assert after_rows.sort_by("id").to_pydict() == before_rows.sort_by("id").to_pydict()

    # Ensure snapshot properties specify the replace-operation.
    new_snapshot = table.current_snapshot()
    assert new_snapshot is not None
    assert new_snapshot.summary is not None
    assert new_snapshot.summary.get("snapshot-type") == "replace"
    assert new_snapshot.summary.get("replace-operation") == "compaction"


def test_maintenance_compact_empty_table(catalog: Catalog) -> None:
    """Compacting an empty table is a safe no-op that creates no snapshot."""
    from pyiceberg.types import LongType, NestedField, StringType

    schema = Schema(
        NestedField(1, "id", LongType()),
        NestedField(2, "category", StringType()),
    )

    # Create the namespace. create_namespace raises
    # NamespaceAlreadyExistsError (not NoSuchNamespaceError) when the
    # namespace already exists, so catch that to keep reruns idempotent.
    try:
        catalog.create_namespace("default")
    except NamespaceAlreadyExistsError:
        pass

    table = catalog.create_table("default.test_compaction_empty", schema=schema)
    before_snapshots = len(table.history())

    # Should safely return doing nothing.
    table.maintenance.compact()

    table.refresh()
    after_snapshots = len(table.history())
    assert before_snapshots == after_snapshots  # No new snapshot should be made