Skip to content

Commit 570ecbe

Browse files
committed
fix: raise on unresolvable snapshots and fix _starting_snapshot_id propagation
Address review feedback on commit retry validation: - _validate_concurrency() now raises ValidationException when parent_snapshot_id or starting_snapshot_id is non-null but cannot be resolved, instead of silently skipping validation. - Fix _starting_snapshot_id not being propagated from _DeleteFiles to _OverwriteFiles in Transaction.delete(), which caused the conflict detection window to collapse when both producers are involved. - Move ValidationException import to module level per project convention. - Add commit retry and isolation level properties to configuration docs. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
1 parent a47b47e commit 570ecbe

4 files changed

Lines changed: 113 additions & 4 deletions

File tree

mkdocs/docs/configuration.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,26 @@ Iceberg tables support table properties to configure table behavior.
8585

8686
<!-- prettier-ignore-end -->
8787

88+
### Commit retry options
89+
90+
When a concurrent commit is detected, PyIceberg automatically retries the operation with exponential backoff. If the retry detects a real data conflict (e.g. concurrent deletes on the same partition), it raises `ValidationException` instead of retrying.
91+
92+
| Key | Options | Default | Description |
93+
| -------------------------------- | ---------------- | --------- | ------------------------------------------------------------------ |
94+
| `commit.retry.num-retries` | Integer | 4 | Maximum number of retry attempts after a commit conflict |
95+
| `commit.retry.min-wait-ms` | Integer (ms) | 100 | Minimum wait time before the first retry |
96+
| `commit.retry.max-wait-ms` | Integer (ms) | 60000 | Maximum wait time between retries (caps exponential backoff) |
97+
| `commit.retry.total-timeout-ms` | Integer (ms) | 1800000 | Total time allowed for all retry attempts before giving up |
98+
99+
### Isolation level options
100+
101+
These properties control conflict detection behavior during concurrent writes.
102+
103+
| Key | Options | Default | Description |
104+
| -------------------------------- | -------------------------------- | ------------ | ----------------------------------------------------------------------------------------------- |
105+
| `write.delete.isolation-level` | `{serializable,snapshot}` | serializable | Isolation level for delete operations. Under `serializable`, concurrent appends to affected partitions cause `ValidationException`. Under `snapshot`, only conflicting deletes are rejected. |
106+
| `write.update.isolation-level` | `{serializable,snapshot}` | serializable | Isolation level for overwrite operations. Same semantics as `write.delete.isolation-level`. |
107+
88108
## FileIO
89109

90110
Iceberg works with the concept of a FileIO which is a pluggable module for reading, writing, and deleting files. By default, PyIceberg will try to initialize the FileIO that's suitable for the scheme (`s3://`, `gs://`, etc.) and will use the first one that's installed.

pyiceberg/table/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -816,6 +816,7 @@ def delete(
816816
).overwrite() as overwrite_snapshot:
817817
if _isolation_level_property is not None:
818818
overwrite_snapshot._isolation_level_property = _isolation_level_property
819+
overwrite_snapshot._starting_snapshot_id = delete_snapshot._starting_snapshot_id
819820
overwrite_snapshot.commit_uuid = commit_uuid
820821
overwrite_snapshot.delete_by_predicate(delete_filter, case_sensitive)
821822
for original_data_file, replaced_data_files in replaced_files:

pyiceberg/table/update/snapshot.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from typing import TYPE_CHECKING, Generic
2828

2929
from pyiceberg.avro.codecs import AvroCompressionCodec
30+
from pyiceberg.exceptions import ValidationException
3031
from pyiceberg.expressions import AlwaysFalse, BooleanExpression, Or
3132
from pyiceberg.expressions.visitors import (
3233
ROWS_MIGHT_NOT_MATCH,
@@ -579,12 +580,12 @@ def _validate_concurrency(self) -> None:
579580
table = self._transaction._table
580581
parent_snapshot = table.metadata.snapshot_by_id(self._parent_snapshot_id)
581582
if parent_snapshot is None:
582-
return
583+
raise ValidationException(f"Cannot find parent snapshot {self._parent_snapshot_id} in table metadata")
583584

584585
starting_snapshot_id = self._starting_snapshot_id if self._starting_snapshot_id is not None else self._parent_snapshot_id
585586
starting_snapshot = table.metadata.snapshot_by_id(starting_snapshot_id)
586587
if starting_snapshot is None:
587-
return
588+
raise ValidationException(f"Cannot find starting snapshot {starting_snapshot_id} in table metadata")
588589

589590
isolation_level_str = table.metadata.properties.get(
590591
self._isolation_level_property, TableProperties.WRITE_ISOLATION_LEVEL_DEFAULT
@@ -797,12 +798,12 @@ def _validate_concurrency(self) -> None:
797798
table = self._transaction._table
798799
parent_snapshot = table.metadata.snapshot_by_id(self._parent_snapshot_id)
799800
if parent_snapshot is None:
800-
return
801+
raise ValidationException(f"Cannot find parent snapshot {self._parent_snapshot_id} in table metadata")
801802

802803
starting_snapshot_id = self._starting_snapshot_id if self._starting_snapshot_id is not None else self._parent_snapshot_id
803804
starting_snapshot = table.metadata.snapshot_by_id(starting_snapshot_id)
804805
if starting_snapshot is None:
805-
return
806+
raise ValidationException(f"Cannot find starting snapshot {starting_snapshot_id} in table metadata")
806807

807808
isolation_level_str = table.metadata.properties.get(
808809
self._isolation_level_property, TableProperties.WRITE_ISOLATION_LEVEL_DEFAULT

tests/table/test_commit_retry.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,3 +595,90 @@ def capturing_clean_all(self_producer: Any) -> None:
595595
for producer in captured_producers:
596596
assert producer._written_manifests == []
597597
assert producer._uncommitted_manifests == []
598+
599+
600+
def test_mixed_delete_overwrite_starts_from_catalog_snapshot(catalog: Catalog) -> None:
601+
"""Mixed full-file and partial deletes should validate from the original table snapshot."""
602+
from pyiceberg.partitioning import PartitionField, PartitionSpec
603+
from pyiceberg.table.update.snapshot import _DeleteFiles, _OverwriteFiles
604+
from pyiceberg.transforms import IdentityTransform
605+
606+
catalog.create_namespace("default")
607+
schema = Schema(
608+
NestedField(1, "category", StringType(), required=False),
609+
NestedField(2, "value", LongType(), required=False),
610+
)
611+
spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="category"))
612+
table = catalog.create_table("default.mixed_delete_start_snapshot", schema=schema, partition_spec=spec)
613+
614+
import pyarrow as pa
615+
616+
# Partition "a" has one row (will be fully deleted) and partition "b" has two rows (partial delete)
617+
table.append(pa.table({"category": ["a", "b", "b"], "value": [1, 2, 3]}))
618+
619+
base_snapshot_id = table.metadata.current_snapshot_id
620+
621+
tx = Transaction(table, autocommit=False)
622+
# "value == 1" deletes the entire file in partition "a" (full-file delete)
623+
# "value == 2" partially deletes from partition "b" (CoW rewrite)
624+
tx.delete("value <= 2")
625+
626+
assert len(tx._snapshot_producers) == 2
627+
628+
delete_producer = next(p for p in tx._snapshot_producers if isinstance(p, _DeleteFiles))
629+
overwrite_producer = next(p for p in tx._snapshot_producers if isinstance(p, _OverwriteFiles))
630+
631+
assert delete_producer._starting_snapshot_id == base_snapshot_id
632+
assert overwrite_producer._starting_snapshot_id == base_snapshot_id
633+
634+
635+
def test_validate_concurrency_raises_on_missing_parent_snapshot(catalog: Catalog) -> None:
636+
"""Validation should raise when parent_snapshot_id is non-null but cannot be resolved."""
637+
catalog.create_namespace("default")
638+
schema = _test_schema()
639+
table = catalog.create_table("default.missing_parent_test", schema=schema)
640+
641+
import pyarrow as pa
642+
643+
table.append(pa.table({"x": [1, 2, 3]}))
644+
645+
from pyiceberg.table.update.snapshot import _DeleteFiles
646+
647+
tx = Transaction(table, autocommit=False)
648+
producer = _DeleteFiles(
649+
operation=Operation.DELETE,
650+
transaction=tx,
651+
io=table.io,
652+
)
653+
654+
# Artificially set a non-existent parent snapshot ID
655+
producer._parent_snapshot_id = 99999999
656+
657+
with pytest.raises(ValidationException, match="Cannot find parent snapshot"):
658+
producer._validate_concurrency()
659+
660+
661+
def test_validate_concurrency_raises_on_missing_starting_snapshot(catalog: Catalog) -> None:
662+
"""Validation should raise when starting_snapshot_id is non-null but cannot be resolved."""
663+
catalog.create_namespace("default")
664+
schema = _test_schema()
665+
table = catalog.create_table("default.missing_starting_test", schema=schema)
666+
667+
import pyarrow as pa
668+
669+
table.append(pa.table({"x": [1, 2, 3]}))
670+
671+
from pyiceberg.table.update.snapshot import _DeleteFiles
672+
673+
tx = Transaction(table, autocommit=False)
674+
producer = _DeleteFiles(
675+
operation=Operation.DELETE,
676+
transaction=tx,
677+
io=table.io,
678+
)
679+
680+
# parent is valid but starting is corrupted
681+
producer._starting_snapshot_id = 99999999
682+
683+
with pytest.raises(ValidationException, match="Cannot find starting snapshot"):
684+
producer._validate_concurrency()

0 commit comments

Comments
 (0)