From cc0c3acd34f52027aa3de86eca1a5b4602465709 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Sun, 8 Mar 2026 19:38:33 -0500 Subject: [PATCH 1/7] Add support for writing to branches(snapshot producer) --- cmd/iceberg/main.go | 72 ++++++++++++------ cmd/iceberg/output.go | 36 +++++++-- table/metadata.go | 14 ++++ table/metadata_builder_internal_test.go | 30 ++++++++ table/snapshot_producers.go | 41 ++++++---- table/snapshot_producers_test.go | 47 ++++++++++-- table/transaction.go | 99 +++++++++++++++++++------ table/transaction_test.go | 88 ++++++++++++++++++++++ 8 files changed, 358 insertions(+), 69 deletions(-) diff --git a/cmd/iceberg/main.go b/cmd/iceberg/main.go index 128385914..390fa02a4 100644 --- a/cmd/iceberg/main.go +++ b/cmd/iceberg/main.go @@ -23,6 +23,7 @@ import ( "fmt" "log" "os" + "strconv" "strings" "github.com/apache/iceberg-go" @@ -46,6 +47,7 @@ Usage: iceberg create [options] (namespace | table) IDENTIFIER iceberg drop [options] (namespace | table) IDENTIFIER iceberg files [options] TABLE_ID [--history] + iceberg add-files [options] TABLE_ID FILES... [--branch TEXT] [--ignore-duplicates] iceberg rename [options] iceberg properties [options] get (namespace | table) IDENTIFIER [PROPNAME] iceberg properties [options] set (namespace | table) IDENTIFIER PROPNAME VALUE @@ -62,6 +64,7 @@ Commands: location Return the location of the table. drop Operations to drop a namespace or table. files List all the files of the table. + add-files Add existing data files to a table. rename Rename a table. properties Properties on tables/namespaces. @@ -69,6 +72,7 @@ Arguments: PARENT Catalog parent namespace IDENTIFIER fully qualified namespace or table TABLE_ID full path to a table + FILES one or more data file paths to add (for add-files) PROPNAME name of a property VALUE value to set @@ -91,7 +95,9 @@ Options: --partition-spec TEXT specify partition spec as comma-separated field names(for create table use only) Ex:"field1,field2" --sort-order TEXT specify sort order as field:direction[:null-order] format(for create table use only) - Ex:"field1:asc,field2:desc:nulls-first,field3:asc:nulls-last"` + Ex:"field1:asc,field2:desc:nulls-first,field3:asc:nulls-last" + --branch TEXT target branch for add-files [default: main] + --ignore-duplicates allow adding files already referenced by the table` type Config struct { List bool `docopt:"list"` @@ -104,6 +110,7 @@ type Config struct { Create bool `docopt:"create"` Drop bool `docopt:"drop"` Files bool `docopt:"files"` + AddFiles bool `docopt:"add-files"` Rename bool `docopt:"rename"` Get bool `docopt:"get"` @@ -116,27 +123,30 @@ type Config struct { RenameFrom string `docopt:""` RenameTo string `docopt:""` - Parent string `docopt:"PARENT"` - Ident string `docopt:"IDENTIFIER"` - TableID string `docopt:"TABLE_ID"` - PropName string `docopt:"PROPNAME"` - Value string `docopt:"VALUE"` - - Catalog string `docopt:"--catalog"` - URI string `docopt:"--uri"` - Output string `docopt:"--output"` - History bool `docopt:"--history"` - Cred string `docopt:"--credential"` - Token string `docopt:"--token"` - Warehouse string `docopt:"--warehouse"` - Config string `docopt:"--config"` - Scope string `docopt:"--scope"` - Description string `docopt:"--description"` - LocationURI string `docopt:"--location-uri"` - SchemaStr string `docopt:"--schema"` - TableProps string `docopt:"--properties"` - PartitionSpec string `docopt:"--partition-spec"` - SortOrder string `docopt:"--sort-order"` + Parent string `docopt:"PARENT"` + Ident string `docopt:"IDENTIFIER"` + TableID string `docopt:"TABLE_ID"` + FilesToAdd []string `docopt:"FILES"` + PropName string `docopt:"PROPNAME"` + Value string `docopt:"VALUE"` + + Catalog string `docopt:"--catalog"` + URI string `docopt:"--uri"` + Output string `docopt:"--output"` + History bool `docopt:"--history"` + Cred string `docopt:"--credential"` + Token string `docopt:"--token"` + Warehouse string `docopt:"--warehouse"` + Config string `docopt:"--config"` + Scope string `docopt:"--scope"` + Description string `docopt:"--description"` + LocationURI string `docopt:"--location-uri"` + SchemaStr string `docopt:"--schema"` + TableProps string `docopt:"--properties"` + PartitionSpec string `docopt:"--partition-spec"` + SortOrder string `docopt:"--sort-order"` + Branch string `docopt:"--branch"` + IgnoreDuplicates bool `docopt:"--ignore-duplicates"` } func main() { @@ -342,6 +352,24 @@ func main() { case cfg.Files: tbl := loadTable(ctx, output, cat, cfg.TableID) output.Files(tbl, cfg.History) + case cfg.AddFiles: + tbl := loadTable(ctx, output, cat, cfg.TableID) + txn := tbl.NewTransaction() + var opts []table.WriteOpt + if cfg.Branch != "" { + opts = append(opts, table.WithBranch(cfg.Branch)) + } + err := txn.AddFiles(ctx, cfg.FilesToAdd, nil, cfg.IgnoreDuplicates, opts...) + if err != nil { + output.Error(err) + os.Exit(1) + } + _, err = txn.Commit(ctx) + if err != nil { + output.Error(err) + os.Exit(1) + } + output.Text("Added " + strconv.Itoa(len(cfg.FilesToAdd)) + " file(s) to " + cfg.TableID) } } diff --git a/cmd/iceberg/output.go b/cmd/iceberg/output.go index 695ba28d9..62d1dd55d 100644 --- a/cmd/iceberg/output.go +++ b/cmd/iceberg/output.go @@ -106,6 +106,23 @@ func (t textOutput) DescribeTable(tbl *table.Table) { WithData(pterm.TableData{ {"Current Snapshot", snap}, }).Render() + + refsData := pterm.TableData{{"Name", "Type", "Snapshot ID"}} + for name, ref := range tbl.Metadata().Refs() { + refsData = append(refsData, []string{ + name, + string(ref.SnapshotRefType), + strconv.FormatInt(ref.SnapshotID, 10), + }) + } + if len(refsData) > 1 { + pterm.Println("Refs") + pterm.DefaultTable. + WithHasHeader(true). + WithHeaderRowSeparator("-"). + WithData(refsData).Render() + } + pterm.DefaultTree.WithRoot(snapshotTreeNode).Render() pterm.Println("Properties") propTable.Render() @@ -241,12 +258,18 @@ func (j jsonOutput) Identifiers(idList []table.Identifier) { func (j jsonOutput) DescribeTable(tbl *table.Table) { type dataType struct { - Metadata table.Metadata `json:"metadata,omitempty"` - MetadataLocation string `json:"metadata-location,omitempty"` - SortOrder table.SortOrder `json:"sort-order,omitempty"` - CurrentSnapshot *table.Snapshot `json:"current-snapshot,omitempty"` - Spec iceberg.PartitionSpec `json:"spec,omitempty"` - Schema *iceberg.Schema `json:"schema,omitempty"` + Metadata table.Metadata `json:"metadata,omitempty"` + MetadataLocation string `json:"metadata-location,omitempty"` + SortOrder table.SortOrder `json:"sort-order,omitempty"` + CurrentSnapshot *table.Snapshot `json:"current-snapshot,omitempty"` + Spec iceberg.PartitionSpec `json:"spec,omitempty"` + Schema *iceberg.Schema `json:"schema,omitempty"` + Refs map[string]table.SnapshotRef `json:"refs,omitempty"` + } + + refs := make(map[string]table.SnapshotRef) + for name, ref := range tbl.Metadata().Refs() { + refs[name] = ref } data := dataType{ @@ -256,6 +279,7 @@ func (j jsonOutput) DescribeTable(tbl *table.Table) { CurrentSnapshot: tbl.CurrentSnapshot(), Spec: tbl.Spec(), Schema: tbl.Schema(), + Refs: refs, } if err := json.NewEncoder(os.Stdout).Encode(data); err != nil { j.Error(err) diff --git a/table/metadata.go b/table/metadata.go index 92b976cae..a27619251 100644 --- a/table/metadata.go +++ b/table/metadata.go @@ -317,6 +317,20 @@ func (b *MetadataBuilder) currentSnapshot() *Snapshot { return s } +// SnapshotIDForRef returns the snapshot ID for the given ref (branch or tag name). +// For MainBranch it returns currentSnapshotID; for other refs it looks up b.refs. +// Returns nil if the ref does not exist (e.g. a new branch not yet created). +func (b *MetadataBuilder) SnapshotIDForRef(refName string) *int64 { + if refName == MainBranch { + return b.currentSnapshotID + } + if ref, ok := b.refs[refName]; ok { + id := ref.SnapshotID + return &id + } + return nil +} + func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema) error { if err := checkSchemaCompatibility(schema, b.formatVersion); err != nil { return err diff --git a/table/metadata_builder_internal_test.go b/table/metadata_builder_internal_test.go index 6c9fed32e..a845eb949 100644 --- a/table/metadata_builder_internal_test.go +++ b/table/metadata_builder_internal_test.go @@ -570,6 +570,36 @@ func TestSetBranchSnapshotCreatesBranchIfNotExists(t *testing.T) { require.Equal(t, int64(2), builder.updates[1].(*setSnapshotRefUpdate).SnapshotID) } +func TestSnapshotIDForRef(t *testing.T) { + builder := builderWithoutChanges(2) + schemaID := 0 + snapshot := Snapshot{ + SnapshotID: 2, + ParentSnapshotID: nil, + SequenceNumber: 0, + TimestampMs: builder.base.LastUpdatedMillis(), + ManifestList: "/snap-1.avro", + Summary: &Summary{Operation: OpAppend}, + SchemaID: &schemaID, + } + require.NoError(t, builder.AddSnapshot(&snapshot)) + require.NoError(t, builder.SetSnapshotRef(MainBranch, 2, BranchRef)) + require.NoError(t, builder.SetSnapshotRef("feature", 2, BranchRef)) + + // MainBranch returns currentSnapshotID + mainID := builder.SnapshotIDForRef(MainBranch) + require.NotNil(t, mainID) + require.Equal(t, int64(2), *mainID) + + // Other ref returns ref's snapshot ID + featureID := builder.SnapshotIDForRef("feature") + require.NotNil(t, featureID) + require.Equal(t, int64(2), *featureID) + + // Unknown ref returns nil + require.Nil(t, builder.SnapshotIDForRef("nonexistent")) +} + func TestRemoveSnapshotRemovesBranch(t *testing.T) { builder := builderWithoutChanges(2) schemaID := 0 diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index 8022f5a48..8ca6c1f2e 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -58,8 +58,8 @@ func newManifestListFileName(snapshotID int64, attempt int, commit uuid.UUID) st return fmt.Sprintf("snap-%d-%d-%s.avro", snapshotID, attempt, commit) } -func newFastAppendFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer { - prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps) +func newFastAppendFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties, branch string) *snapshotProducer { + prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps, branch) prod.producerImpl = &fastAppendFiles{base: prod} return prod @@ -105,8 +105,8 @@ type overwriteFiles struct { base *snapshotProducer } -func newOverwriteFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer { - prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps) +func newOverwriteFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties, branch string) *snapshotProducer { + prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps, branch) prod.producerImpl = &overwriteFiles{base: prod} return prod @@ -121,8 +121,11 @@ func (of *overwriteFiles) existingManifests() ([]iceberg.ManifestFile, error) { // determine if there are any existing manifest files existingFiles := make([]iceberg.ManifestFile, 0) - snap := of.base.txn.meta.currentSnapshot() - if snap == nil { + if of.base.parentSnapshotID <= 0 { + return existingFiles, nil + } + snap, err := of.base.txn.meta.SnapshotByID(of.base.parentSnapshotID) + if err != nil || snap == nil { return existingFiles, nil } @@ -387,8 +390,8 @@ type mergeAppendFiles struct { mergeEnabled bool } -func newMergeAppendFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer { - prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps) +func newMergeAppendFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties, branch string) *snapshotProducer { + prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps, branch) prod.producerImpl = &mergeAppendFiles{ fastAppendFiles: fastAppendFiles{base: prod}, targetSizeBytes: txn.meta.props.GetInt(ManifestTargetSizeBytesKey, ManifestTargetSizeBytesDefault), @@ -437,9 +440,10 @@ type snapshotProducer struct { manifestCount atomic.Int32 deletedFiles map[string]iceberg.DataFile snapshotProps iceberg.Properties + branch string } -func createSnapshotProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer { +func createSnapshotProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties, branch string) *snapshotProducer { var ( commit uuid.UUID parentSnapshot int64 = -1 @@ -451,8 +455,11 @@ func createSnapshotProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO commit = *commitUUID } - if snap := txn.meta.currentSnapshot(); snap != nil { - parentSnapshot = snap.SnapshotID + if branch == "" { + branch = MainBranch + } + if sid := txn.meta.SnapshotIDForRef(branch); sid != nil && *sid > 0 { + parentSnapshot = *sid } return &snapshotProducer{ @@ -465,6 +472,7 @@ func createSnapshotProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO addedFiles: []iceberg.DataFile{}, deletedFiles: make(map[string]iceberg.DataFile), snapshotProps: snapshotProps, + branch: branch, } } @@ -726,10 +734,17 @@ func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) { TimestampMs: time.Now().UnixMilli(), } + var assertReq Requirement + if sp.parentSnapshotID > 0 { + parentID := sp.parentSnapshotID + assertReq = AssertRefSnapshotID(sp.branch, &parentID) + } else { + assertReq = AssertRefSnapshotID(sp.branch, nil) + } return []Update{ NewAddSnapshotUpdate(&snapshot), - NewSetSnapshotRefUpdate("main", sp.snapshotID, BranchRef, -1, -1, -1), + NewSetSnapshotRefUpdate(sp.branch, sp.snapshotID, BranchRef, -1, -1, -1), }, []Requirement{ - AssertRefSnapshotID("main", sp.txn.meta.currentSnapshotID), + assertReq, }, nil } diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go index 1c6922f14..4469cbd65 100644 --- a/table/snapshot_producers_test.go +++ b/table/snapshot_producers_test.go @@ -167,7 +167,7 @@ func TestSnapshotProducerManifestsClosesWriterOnError(t *testing.T) { mem := newMemIO(manifestHeaderSize(t, 2, spec, schema), errLimitedWrite) txn := createTestTransaction(t, mem, spec) - sp := newFastAppendFilesProducer(OpAppend, txn, mem, nil, nil) + sp := newFastAppendFilesProducer(OpAppend, txn, mem, nil, nil, MainBranch) validPartition := map[int]any{1000: int32(1)} sp.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet", validPartition)) sp.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet", nil)) @@ -182,7 +182,7 @@ func TestManifestMergeManagerClosesWriterOnError(t *testing.T) { mem := newMemIO(manifestHeaderSize(t, 2, spec, schema), errLimitedWrite) txn := createTestTransaction(t, mem, spec) - sp := newFastAppendFilesProducer(OpAppend, txn, mem, nil, nil) + sp := newFastAppendFilesProducer(OpAppend, txn, mem, nil, nil, MainBranch) df := newTestDataFile(t, spec, "file://data-1.parquet", nil) entries := []iceberg.ManifestEntry{ iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID, nil, nil, df), @@ -254,7 +254,7 @@ func TestOverwriteFilesExistingManifestsClosesWriterOnError(t *testing.T) { txn.meta.snapshotList = []Snapshot{snap} txn.meta.currentSnapshotID = &snapshotID - sp := newOverwriteFilesProducer(OpOverwrite, txn, mem, nil, nil) + sp := newOverwriteFilesProducer(OpOverwrite, txn, mem, nil, nil, MainBranch) sp.deleteDataFile(deletedFile) _, err = sp.existingManifests() @@ -369,7 +369,7 @@ func TestManifestWriterClosesUnderlyingFile(t *testing.T) { spec := iceberg.NewPartitionSpec() txn := createTestTransaction(t, trackIO, spec) - sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil) + sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil, MainBranch) df := newTestDataFile(t, spec, "file://data-1.parquet", nil) sp.appendDataFile(df) @@ -389,7 +389,7 @@ func TestCreateManifestClosesUnderlyingFile(t *testing.T) { txn := createTestTransaction(t, trackIO, spec) schema := simpleSchema() - sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil) + sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil, MainBranch) df := newTestDataFile(t, spec, "file://data-1.parquet", nil) entries := []iceberg.ManifestEntry{ iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID, nil, nil, df), @@ -451,7 +451,7 @@ func TestOverwriteExistingManifestsClosesUnderlyingFile(t *testing.T) { txn.meta.snapshotList = []Snapshot{snap} txn.meta.currentSnapshotID = &snapshotID - sp := newOverwriteFilesProducer(OpOverwrite, txn, trackIO, nil, nil) + sp := newOverwriteFilesProducer(OpOverwrite, txn, trackIO, nil, nil, MainBranch) sp.deleteDataFile(deletedFile) trackIO.writers = make(map[string]*trackingWriteCloser) @@ -525,7 +525,7 @@ func TestManifestsClosesWriterWhenDeletedEntriesFails(t *testing.T) { spec := iceberg.NewPartitionSpec() txn := createTestTransaction(t, blockingIO, spec) - sp := createSnapshotProducer(OpAppend, txn, blockingIO, nil, nil) + sp := createSnapshotProducer(OpAppend, txn, blockingIO, nil, nil, MainBranch) errDeletedEntries := errors.New("simulated deletedEntries error") sp.producerImpl = &errorOnDeletedEntries{ base: sp, @@ -556,3 +556,36 @@ func TestManifestsClosesWriterWhenDeletedEntriesFails(t *testing.T) { require.Zero(t, writerCount, "expected no writers to be created when deletedEntries is called first") } } + +// TestCreateSnapshotProducerUsesBranch verifies that createSnapshotProducer sets +// branch and resolves parentSnapshotID from SnapshotIDForRef(branch). +func TestCreateSnapshotProducerUsesBranch(t *testing.T) { + builder := builderWithoutChanges(2) + schemaID := 0 + const snapID int64 = 42 + snapshot := Snapshot{ + SnapshotID: snapID, + ParentSnapshotID: nil, + SequenceNumber: 0, + TimestampMs: builder.base.LastUpdatedMillis(), + ManifestList: "table-location/metadata/snap-1.avro", + Summary: &Summary{Operation: OpAppend}, + SchemaID: &schemaID, + } + require.NoError(t, builder.AddSnapshot(&snapshot)) + require.NoError(t, builder.SetSnapshotRef(MainBranch, snapID, BranchRef)) + require.NoError(t, builder.SetSnapshotRef("feature", snapID, BranchRef)) + + meta, err := builder.Build() + require.NoError(t, err) + + mem := newMemIO(0, nil) + tbl := New(Identifier{"db", "tbl"}, meta, "metadata.json", func(context.Context) (iceio.IO, error) { + return mem, nil + }, nil) + txn := tbl.NewTransaction() + + sp := createSnapshotProducer(OpAppend, txn, mem, nil, nil, "feature") + require.Equal(t, "feature", sp.branch) + require.Equal(t, snapID, sp.parentSnapshotID) +} diff --git a/table/transaction.go b/table/transaction.go index 06b016312..f48779854 100644 --- a/table/transaction.go +++ b/table/transaction.go @@ -36,28 +36,60 @@ import ( "golang.org/x/sync/errgroup" ) +// WriteOpt is an option for write operations (Append, AppendTable, AddFiles, etc.). +type WriteOpt interface { + applyWriteOpt(*writeOpts) +} + +type writeOpts struct { + branch string +} + +func (o *writeOpts) applyWriteOpt(target *writeOpts) { + if o.branch != "" { + target.branch = o.branch + } +} + +// WithBranch sets the target branch for the write. Default is main. +func WithBranch(branch string) WriteOpt { + return &writeOpts{branch: branch} +} + +func resolveBranch(opts []WriteOpt) string { + var o writeOpts + for _, opt := range opts { + opt.applyWriteOpt(&o) + } + if o.branch == "" { + return MainBranch + } + return o.branch +} + type snapshotUpdate struct { txn *Transaction io io.WriteFileIO snapshotProps iceberg.Properties operation Operation + branch string } func (s snapshotUpdate) fastAppend() *snapshotProducer { - return newFastAppendFilesProducer(OpAppend, s.txn, s.io, nil, s.snapshotProps) + return newFastAppendFilesProducer(OpAppend, s.txn, s.io, nil, s.snapshotProps, s.branch) } func (s snapshotUpdate) mergeOverwrite(commitUUID *uuid.UUID) *snapshotProducer { op := s.operation - if s.operation == OpOverwrite && s.txn.meta.currentSnapshot() == nil { + if s.operation == OpOverwrite && s.txn.meta.SnapshotIDForRef(s.branch) == nil { op = OpAppend } - return newOverwriteFilesProducer(op, s.txn, s.io, commitUUID, s.snapshotProps) + return newOverwriteFilesProducer(op, s.txn, s.io, commitUUID, s.snapshotProps, s.branch) } func (s snapshotUpdate) mergeAppend() *snapshotProducer { - return newMergeAppendFilesProducer(OpAppend, s.txn, s.io, nil, s.snapshotProps) + return newMergeAppendFilesProducer(OpAppend, s.txn, s.io, nil, s.snapshotProps, s.branch) } type Transaction struct { @@ -119,9 +151,9 @@ func (t *Transaction) apply(updates []Update, reqs []Requirement) error { return nil } -func (t *Transaction) appendSnapshotProducer(afs io.IO, props iceberg.Properties) *snapshotProducer { +func (t *Transaction) appendSnapshotProducer(afs io.IO, props iceberg.Properties, branch string) *snapshotProducer { manifestMerge := t.meta.props.GetBool(ManifestMergeEnabledKey, ManifestMergeEnabledDefault) - updateSnapshot := t.updateSnapshot(afs, props, OpAppend) + updateSnapshot := t.updateSnapshot(afs, props, OpAppend, branch) if manifestMerge { return updateSnapshot.mergeAppend() } @@ -129,12 +161,16 @@ func (t *Transaction) appendSnapshotProducer(afs io.IO, props iceberg.Properties return updateSnapshot.fastAppend() } -func (t *Transaction) updateSnapshot(fs io.IO, props iceberg.Properties, operation Operation) snapshotUpdate { +func (t *Transaction) updateSnapshot(fs io.IO, props iceberg.Properties, operation Operation, branch string) snapshotUpdate { + if branch == "" { + branch = MainBranch + } return snapshotUpdate{ txn: t, io: fs.(io.WriteFileIO), snapshotProps: props, operation: operation, + branch: branch, } } @@ -301,19 +337,20 @@ func (t *Transaction) ExpireSnapshots(opts ...ExpireSnapshotsOpt) error { return t.apply(updates, reqs) } -func (t *Transaction) AppendTable(ctx context.Context, tbl arrow.Table, batchSize int64, snapshotProps iceberg.Properties) error { +func (t *Transaction) AppendTable(ctx context.Context, tbl arrow.Table, batchSize int64, snapshotProps iceberg.Properties, opts ...WriteOpt) error { rdr := array.NewTableReader(tbl, batchSize) defer rdr.Release() - return t.Append(ctx, rdr, snapshotProps) + return t.Append(ctx, rdr, snapshotProps, opts...) } -func (t *Transaction) Append(ctx context.Context, rdr array.RecordReader, snapshotProps iceberg.Properties) error { +func (t *Transaction) Append(ctx context.Context, rdr array.RecordReader, snapshotProps iceberg.Properties, opts ...WriteOpt) error { fs, err := t.tbl.fsF(ctx) if err != nil { return err } - appendFiles := t.appendSnapshotProducer(fs, snapshotProps) + branch := resolveBranch(opts) + appendFiles := t.appendSnapshotProducer(fs, snapshotProps, branch) itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{ sc: rdr.Schema(), itr: array.IterFromReader(rdr), @@ -344,10 +381,10 @@ func (t *Transaction) Append(ctx context.Context, rdr array.RecordReader, snapsh // operation is only valid if the data is exactly the same as the previous snapshot. // // For now, we'll keep using an overwrite operation. -func (t *Transaction) ReplaceDataFiles(ctx context.Context, filesToDelete, filesToAdd []string, snapshotProps iceberg.Properties) error { +func (t *Transaction) ReplaceDataFiles(ctx context.Context, filesToDelete, filesToAdd []string, snapshotProps iceberg.Properties, opts ...WriteOpt) error { if len(filesToDelete) == 0 { if len(filesToAdd) > 0 { - return t.AddFiles(ctx, filesToAdd, snapshotProps, false) + return t.AddFiles(ctx, filesToAdd, snapshotProps, false, opts...) } } @@ -372,7 +409,11 @@ func (t *Transaction) ReplaceDataFiles(ctx context.Context, filesToDelete, files return errors.New("add file paths must be unique for ReplaceDataFiles") } - s := t.meta.currentSnapshot() + branch := resolveBranch(opts) + var s *Snapshot + if sid := t.meta.SnapshotIDForRef(branch); sid != nil { + s, _ = t.meta.SnapshotByID(*sid) + } if s == nil { return fmt.Errorf("%w: cannot replace files in a table without an existing snapshot", ErrInvalidOperation) } @@ -413,7 +454,7 @@ func (t *Transaction) ReplaceDataFiles(ctx context.Context, filesToDelete, files } commitUUID := uuid.New() - updater := t.updateSnapshot(fs, snapshotProps, OpOverwrite).mergeOverwrite(&commitUUID) + updater := t.updateSnapshot(fs, snapshotProps, OpOverwrite, branch).mergeOverwrite(&commitUUID) for _, df := range markedForDeletion { updater.deleteDataFile(df) @@ -435,7 +476,7 @@ func (t *Transaction) ReplaceDataFiles(ctx context.Context, filesToDelete, files return t.apply(updates, reqs) } -func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProps iceberg.Properties, ignoreDuplicates bool) error { +func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProps iceberg.Properties, ignoreDuplicates bool, opts ...WriteOpt) error { set := make(map[string]string) for _, f := range files { set[f] = f @@ -484,7 +525,8 @@ func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProp return err } - updater := t.updateSnapshot(fs, snapshotProps, OpAppend).fastAppend() + branch := resolveBranch(opts) + updater := t.updateSnapshot(fs, snapshotProps, OpAppend, branch).fastAppend() dataFiles := filesToDataFiles(ctx, fs, t.meta, slices.Values(files)) for df, err := range dataFiles { @@ -533,6 +575,7 @@ type overwriteOperation struct { concurrency int filter iceberg.BooleanExpression caseSensitive bool + branch string } // OverwriteOption applies options to overwrite operations @@ -568,6 +611,13 @@ func WithOverwriteCaseInsensitive() OverwriteOption { } } +// WithOverwriteBranch sets the target branch for the overwrite. Default is main. +func WithOverwriteBranch(branch string) OverwriteOption { + return func(op *overwriteOperation) { + op.branch = branch + } +} + // Overwrite overwrites the table data using a RecordReader. // // An optional filter (see WithOverwriteFilter) determines which existing data to delete or rewrite: @@ -592,12 +642,16 @@ func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader, sna concurrency: runtime.GOMAXPROCS(0), filter: iceberg.AlwaysTrue{}, caseSensitive: true, + branch: MainBranch, } for _, apply := range opts { apply(&overwrite) } + if overwrite.branch == "" { + overwrite.branch = MainBranch + } - updater, err := t.performCopyOnWriteDeletion(ctx, OpOverwrite, snapshotProps, overwrite.filter, overwrite.caseSensitive, overwrite.concurrency) + updater, err := t.performCopyOnWriteDeletion(ctx, OpOverwrite, snapshotProps, overwrite.filter, overwrite.caseSensitive, overwrite.concurrency, overwrite.branch) if err != nil { return err } @@ -628,7 +682,7 @@ func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader, sna return t.apply(updates, reqs) } -func (t *Transaction) performCopyOnWriteDeletion(ctx context.Context, operation Operation, snapshotProps iceberg.Properties, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int) (*snapshotProducer, error) { +func (t *Transaction) performCopyOnWriteDeletion(ctx context.Context, operation Operation, snapshotProps iceberg.Properties, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int, branch string) (*snapshotProducer, error) { fs, err := t.tbl.fsF(ctx) if err != nil { return nil, err @@ -646,8 +700,11 @@ func (t *Transaction) performCopyOnWriteDeletion(ctx context.Context, operation } } + if branch == "" { + branch = MainBranch + } commitUUID := uuid.New() - updater := t.updateSnapshot(fs, snapshotProps, operation).mergeOverwrite(&commitUUID) + updater := t.updateSnapshot(fs, snapshotProps, operation, branch).mergeOverwrite(&commitUUID) filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx, fs, filter, caseSensitive, concurrency) if err != nil { @@ -723,7 +780,7 @@ func (t *Transaction) Delete(ctx context.Context, filter iceberg.BooleanExpressi if writeDeleteMode != WriteModeCopyOnWrite { return fmt.Errorf("'%s' is set to '%s' but only '%s' is currently supported", WriteDeleteModeKey, writeDeleteMode, WriteModeCopyOnWrite) } - updater, err := t.performCopyOnWriteDeletion(ctx, OpDelete, snapshotProps, filter, deleteOp.caseSensitive, deleteOp.concurrency) + updater, err := t.performCopyOnWriteDeletion(ctx, OpDelete, snapshotProps, filter, deleteOp.caseSensitive, deleteOp.concurrency, MainBranch) if err != nil { return err } diff --git a/table/transaction_test.go b/table/transaction_test.go index 1f64adf1c..afceeda4b 100644 --- a/table/transaction_test.go +++ b/table/transaction_test.go @@ -535,6 +535,94 @@ func (s *SparkIntegrationTestSuite) TestDeleteInsensitive() { +----------+---------+---+`) } +// TestBranchWrites verifies that WithBranch writes to the given branch: main stays +// unchanged when appending to a branch, and scanning by ref returns the correct data. +func (s *SparkIntegrationTestSuite) TestBranchWrites() { + icebergSchema := iceberg.NewSchema(0, + iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32}, + iceberg.NestedField{ID: 2, Name: "value", Type: iceberg.PrimitiveTypes.String}, + ) + + tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default", "go_test_branch_writes"), icebergSchema) + s.Require().NoError(err) + + arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true, false) + s.Require().NoError(err) + + // 1) Append to main (default), commit + mainTable, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{ + `[{"id": 1, "value": "main-a"}, {"id": 2, "value": "main-b"}]`, + }) + s.Require().NoError(err) + defer mainTable.Release() + + tx := tbl.NewTransaction() + err = tx.AppendTable(s.ctx, mainTable, 2, nil) + s.Require().NoError(err) + tbl, err = tx.Commit(s.ctx) + s.Require().NoError(err) + + mainSnapID := tbl.CurrentSnapshot().SnapshotID + mainRows, err := tbl.Scan().ToArrowTable(s.ctx) + s.Require().NoError(err) + defer mainRows.Release() + s.Require().Equal(int64(2), mainRows.NumRows(), "main should have 2 rows") + + // 2) Append to branch "test-branch", commit + branchTable, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{ + `[{"id": 10, "value": "branch-x"}, {"id": 11, "value": "branch-y"}]`, + }) + s.Require().NoError(err) + defer branchTable.Release() + + tx = tbl.NewTransaction() + err = tx.AppendTable(s.ctx, branchTable, 2, nil, table.WithBranch("test-branch")) + s.Require().NoError(err) + tbl, err = tx.Commit(s.ctx) + s.Require().NoError(err) + + // 3) Main unchanged: same snapshot and row count + s.Require().Equal(mainSnapID, tbl.CurrentSnapshot().SnapshotID, "main ref should still point to first snapshot") + mainAfter, err := tbl.Scan().ToArrowTable(s.ctx) + s.Require().NoError(err) + defer mainAfter.Release() + s.Require().Equal(int64(2), mainAfter.NumRows(), "main should still have 2 rows after branch write") + + // 4) Branch has its own snapshot and the branch-only rows + branchScan, err := tbl.Scan().UseRef("test-branch") + s.Require().NoError(err) + branchRows, err := branchScan.ToArrowTable(s.ctx) + s.Require().NoError(err) + defer branchRows.Release() + s.Require().Equal(int64(2), branchRows.NumRows(), "test-branch should have 2 rows") + + // 5) Optional: append again to main; branch ref unchanged + moreMain, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{ + `[{"id": 3, "value": "main-c"}]`, + }) + s.Require().NoError(err) + defer moreMain.Release() + + tx = tbl.NewTransaction() + err = tx.AppendTable(s.ctx, moreMain, 1, nil) + s.Require().NoError(err) + tbl, err = tx.Commit(s.ctx) + s.Require().NoError(err) + + s.Require().NotEqual(mainSnapID, tbl.CurrentSnapshot().SnapshotID, "main should advance to new snapshot") + mainFinal, err := tbl.Scan().ToArrowTable(s.ctx) + s.Require().NoError(err) + defer mainFinal.Release() + s.Require().Equal(int64(3), mainFinal.NumRows(), "main should have 3 rows") + + branchAgain, err := tbl.Scan().UseRef("test-branch") + s.Require().NoError(err) + branchFinal, err := branchAgain.ToArrowTable(s.ctx) + s.Require().NoError(err) + defer branchFinal.Release() + s.Require().Equal(int64(2), branchFinal.NumRows(), "test-branch should still have 2 rows (isolation)") +} + func TestSparkIntegration(t *testing.T) { suite.Run(t, new(SparkIntegrationTestSuite)) } From 067723cb9b10ab84e028b0cdde722c3c768d6955 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Tue, 10 Mar 2026 14:29:21 -0500 Subject: [PATCH 2/7] Fixed merge conflict issues --- table/snapshot_producers.go | 2 +- table/transaction.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index e6cdbdcf6..b83171682 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -444,7 +444,7 @@ type snapshotProducer struct { branch string } -func createSnapshotProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer { +func createSnapshotProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties, branch string) *snapshotProducer { var ( commit uuid.UUID parentSnapshot int64 = -1 diff --git a/table/transaction.go b/table/transaction.go index 9bd3670f2..0d84acb9f 100644 --- a/table/transaction.go +++ b/table/transaction.go @@ -607,7 +607,7 @@ func (t *Transaction) AddDataFiles(ctx context.Context, dataFiles []iceberg.Data } } - appendFiles := t.appendSnapshotProducer(fs, snapshotProps) + appendFiles := t.appendSnapshotProducer(fs, snapshotProps, MainBranch) for _, df := range dataFiles { appendFiles.appendDataFile(df) } @@ -710,7 +710,7 @@ func (t *Transaction) ReplaceDataFilesWithDataFiles(ctx context.Context, filesTo } commitUUID := uuid.New() - updater := t.updateSnapshot(fs, snapshotProps, OpOverwrite).mergeOverwrite(&commitUUID) + updater := t.updateSnapshot(fs, snapshotProps, OpOverwrite, MainBranch).mergeOverwrite(&commitUUID) for _, df := range markedForDeletion { updater.deleteDataFile(df) @@ -958,7 +958,7 @@ func (t *Transaction) performCopyOnWriteDeletion(ctx context.Context, operation commitUUID := uuid.New() updater := t.updateSnapshot(fs, snapshotProps, operation, branch).mergeOverwrite(&commitUUID) - filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx, fs, filter, caseSensitive, concurrency) + filesToDelete, filesToRewrite, err := t.classifyFilesForDeletions(ctx, fs, filter, caseSensitive, concurrency) if err != nil { return nil, err } @@ -995,7 +995,7 @@ func (t *Transaction) performMergeOnReadDeletion(ctx context.Context, snapshotPr } commitUUID := uuid.New() - updater := t.updateSnapshot(fs, snapshotProps, OpDelete).mergeOverwrite(&commitUUID) + updater := t.updateSnapshot(fs, snapshotProps, OpDelete, MainBranch).mergeOverwrite(&commitUUID) filesToDelete, withPartialDeletions, err := t.classifyFilesForDeletions(ctx, fs, filter, caseSensitive, concurrency) if err != nil { From d0e3176cc9ebdd49280fd7755909573157a2425d Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Tue, 10 Mar 2026 18:47:21 -0500 Subject: [PATCH 3/7] Fixed branch to newFastAppendFilesProducer --- table/snapshot_producers_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go index c35973f80..1d06a6c86 100644 --- a/table/snapshot_producers_test.go +++ b/table/snapshot_producers_test.go @@ -193,7 +193,7 @@ func TestCommitV3RowLineage(t *testing.T) { // Single data file with record count 1 (newTestDataFile uses 1, 1 for record count and file size). const expectedAddedRows = 1 - sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil) + sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil, MainBranch) df := newTestDataFile(t, spec, "file://data.parquet", nil) sp.appendDataFile(df) @@ -226,7 +226,7 @@ func TestCommitV3RowLineageTwoSequentialCommits(t *testing.T) { txn.meta.formatVersion = 3 // First commit: new table, append one file (1 row). - sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil) + sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil, MainBranch) sp1.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet", nil)) updates1, reqs1, err := sp1.commit() require.NoError(t, err, "first commit should succeed") @@ -244,7 +244,7 @@ func TestCommitV3RowLineageTwoSequentialCommits(t *testing.T) { tbl2 := New(ident, meta1, "metadata.json", func(context.Context) (iceio.IO, error) { return memIO, nil }, nil) txn2 := tbl2.NewTransaction() txn2.meta.formatVersion = 3 - sp2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil) + sp2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil, MainBranch) sp2.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet", nil)) updates2, reqs2, err := sp2.commit() require.NoError(t, err, "second commit should succeed") @@ -270,7 +270,7 @@ func TestCommitV3RowLineageDeltaIncludesExistingRows(t *testing.T) { txn.meta.formatVersion = 3 // First commit: one file (1 row). - sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil) + sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil, MainBranch) sp1.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet", nil)) updates1, reqs1, err := sp1.commit() require.NoError(t, err, "first commit should succeed") @@ -289,7 +289,7 @@ func TestCommitV3RowLineageDeltaIncludesExistingRows(t *testing.T) { } txn2.meta.props[ManifestMergeEnabledKey] = "true" txn2.meta.props[ManifestMinMergeCountKey] = "2" - sp2 := newMergeAppendFilesProducer(OpAppend, txn2, memIO, nil, nil) + sp2 := newMergeAppendFilesProducer(OpAppend, txn2, memIO, nil, nil, MainBranch) sp2.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet", nil)) updates2, reqs2, err := sp2.commit() require.NoError(t, err, "second commit (merge) should succeed") @@ -354,7 +354,7 @@ func TestCommitV3RowLineagePersistsManifestFirstRowID(t *testing.T) { txn.meta.formatVersion = 3 // Use multi-row files to make row-range starts obvious. - sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil) + sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil, MainBranch) sp1.appendDataFile(newTestDataFileWithCount(t, spec, "file://data-1.parquet", nil, 3)) updates1, reqs1, err := sp1.commit() require.NoError(t, err, "first commit should succeed") @@ -376,7 +376,7 @@ func TestCommitV3RowLineagePersistsManifestFirstRowID(t *testing.T) { tbl2 := New(ident, meta1, "metadata.json", func(context.Context) (iceio.IO, error) { return memIO, nil }, nil) txn2 := tbl2.NewTransaction() txn2.meta.formatVersion = 3 - sp2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil) + sp2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil, MainBranch) sp2.appendDataFile(newTestDataFileWithCount(t, spec, "file://data-2.parquet", nil, 5)) updates2, _, err := sp2.commit() require.NoError(t, err, "second commit should succeed") From 637ca8b1b51d5557160aa7c0ece4a295ff7b1004 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Tue, 10 Mar 2026 19:30:07 -0500 Subject: [PATCH 4/7] Fix test cases. --- cmd/iceberg/output_test.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/cmd/iceberg/output_test.go b/cmd/iceberg/output_test.go index 3cb20196e..442e25dba 100644 --- a/cmd/iceberg/output_test.go +++ b/cmd/iceberg/output_test.go @@ -121,6 +121,12 @@ Current Schema, id=1 Current Snapshot | append, {}: id=3055729675574597004, parent_id=3051729675574597004, schema_id=1, sequence_number=1, timestamp_ms=1555100955770, manifest_list=s3://a/b/2.avro +Refs +Name | Type | Snapshot ID +----------------------------------- +test | tag | 3051729675574597004 +main | branch | 3055729675574597004 + Snapshots ├──Snapshot 3051729675574597004, schema 1: s3://a/b/1.avro └──Snapshot 3055729675574597004, schema 1: s3://a/b/2.avro @@ -341,7 +347,7 @@ func Test_jsonOutput_DescribeTable(t *testing.T) { } }`, }, - expected: `{"metadata":{"last-sequence-number":34,"format-version":2,"table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1","location":"s3://bucket/test/location","last-updated-ms":1602638573590,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true},{"type":"long","id":2,"name":"y","required":true,"doc":"comment"},{"type":"long","id":3,"name":"z","required":true}],"schema-id":1,"identifier-field-ids":[1,2]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"name":"x","transform":"identity"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"read.split.target.size":"134217728"},"snapshots":[{"snapshot-id":3051729675574597004,"sequence-number":0,"timestamp-ms":1515100955770,"manifest-list":"s3://a/b/1.avro","summary":{"operation":"append"},"schema-id":1},{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1}],"current-snapshot-id":3055729675574597004,"snapshot-log":[{"snapshot-id":3051729675574597004,"timestamp-ms":1515100955770},{"snapshot-id":3055729675574597004,"timestamp-ms":1555100955770}],"metadata-log":[{"metadata-file":"s3://bucket/.../v1.json","timestamp-ms":1515100}],"sort-orders":[{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]}],"default-sort-order-id":3,"refs":{"main":{"snapshot-id":3055729675574597004,"type":"branch"},"test":{"snapshot-id":3051729675574597004,"type":"tag","max-ref-age-ms":10000000}}},"sort-order":{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]},"current-snapshot":{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1},"spec":{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"name":"x","transform":"identity"}]},"schema":{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true},{"type":"long","id":2,"name":"y","required":true,"doc":"comment"},{"type":"long","id":3,"name":"z","required":true}],"schema-id":1,"identifier-field-ids":[1,2]}}`, + expected: `{"metadata":{"last-sequence-number":34,"format-version":2,"table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1","location":"s3://bucket/test/location","last-updated-ms":1602638573590,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true},{"type":"long","id":2,"name":"y","required":true,"doc":"comment"},{"type":"long","id":3,"name":"z","required":true}],"schema-id":1,"identifier-field-ids":[1,2]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"name":"x","transform":"identity"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"read.split.target.size":"134217728"},"snapshots":[{"snapshot-id":3051729675574597004,"sequence-number":0,"timestamp-ms":1515100955770,"manifest-list":"s3://a/b/1.avro","summary":{"operation":"append"},"schema-id":1},{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1}],"current-snapshot-id":3055729675574597004,"snapshot-log":[{"snapshot-id":3051729675574597004,"timestamp-ms":1515100955770},{"snapshot-id":3055729675574597004,"timestamp-ms":1555100955770}],"metadata-log":[{"metadata-file":"s3://bucket/.../v1.json","timestamp-ms":1515100}],"sort-orders":[{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]}],"default-sort-order-id":3,"refs":{"main":{"snapshot-id":3055729675574597004,"type":"branch"},"test":{"snapshot-id":3051729675574597004,"type":"tag","max-ref-age-ms":10000000}}},"sort-order":{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]},"current-snapshot":{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1},"spec":{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"name":"x","transform":"identity"}]},"schema":{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true},{"type":"long","id":2,"name":"y","required":true,"doc":"comment"},{"type":"long","id":3,"name":"z","required":true}],"schema-id":1,"identifier-field-ids":[1,2]},"refs":{"main":{"snapshot-id":3055729675574597004,"type":"branch"},"test":{"snapshot-id":3051729675574597004,"type":"tag","max-ref-age-ms":10000000}}}`, }, { name: "Describe a table with empty objects", @@ -381,7 +387,7 @@ func Test_jsonOutput_DescribeTable(t *testing.T) { "refs": { } }`, }, - expected: `{"metadata":{"last-sequence-number":0,"format-version":2,"table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1","location":"s3://bucket/test/location","last-updated-ms":1602638573590,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"read.split.target.size":"134217728"},"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0},"sort-order":{"order-id":0,"fields":[]},"spec":{"spec-id":0,"fields":[]},"schema":{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]}}`, + expected: `{"metadata":{"current-schema-id":0,"default-sort-order-id":0,"default-spec-id":0,"format-version":2,"last-column-id":3,"last-partition-id":1000,"last-sequence-number":0,"last-updated-ms":1602638573590,"location":"s3://bucket/test/location","partition-specs":[{"spec-id":0,"fields":[]}],"properties":{"read.split.target.size":"134217728"},"schemas":[{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]}],"sort-orders":[{"order-id":0,"fields":[]}],"table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1"},"schema":{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]},"sort-order":{"order-id":0,"fields":[]},"spec":{"spec-id":0,"fields":[]}}`, }, } for _, tt := range tests { From 0f1ef23e2f6a56e2d36601bdbfa78831970fabaa Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Tue, 10 Mar 2026 20:01:06 -0500 Subject: [PATCH 5/7] Fix lint errors --- table/metadata.go | 2 ++ table/snapshot_producers.go | 1 + table/transaction.go | 2 ++ 3 files changed, 5 insertions(+) diff --git a/table/metadata.go b/table/metadata.go index 313996d0d..4b2dcd18a 100644 --- a/table/metadata.go +++ b/table/metadata.go @@ -326,8 +326,10 @@ func (b *MetadataBuilder) SnapshotIDForRef(refName string) *int64 { } if ref, ok := b.refs[refName]; ok { id := ref.SnapshotID + return &id } + return nil } diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index b83171682..e9e395a85 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -786,6 +786,7 @@ func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) { } else { assertReq = AssertRefSnapshotID(sp.branch, nil) } + return []Update{ NewAddSnapshotUpdate(&snapshot), NewSetSnapshotRefUpdate(sp.branch, sp.snapshotID, BranchRef, -1, -1, -1), diff --git a/table/transaction.go b/table/transaction.go index 0d84acb9f..c8156eea2 100644 --- a/table/transaction.go +++ b/table/transaction.go @@ -68,6 +68,7 @@ func resolveBranch(opts []WriteOpt) string { if o.branch == "" { return MainBranch } + return o.branch } @@ -169,6 +170,7 @@ func (t *Transaction) updateSnapshot(fs io.IO, props iceberg.Properties, operati if branch == "" { branch = MainBranch } + return snapshotUpdate{ txn: t, io: fs.(io.WriteFileIO), From 99dd69ab8e4f201bd1a278e80addd5cdde192d98 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Tue, 10 Mar 2026 20:40:59 -0500 Subject: [PATCH 6/7] sort the refs so its consistent in describe. --- cmd/iceberg/output.go | 16 +++++++++++++--- cmd/iceberg/output_test.go | 2 +- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/cmd/iceberg/output.go b/cmd/iceberg/output.go index 62d1dd55d..b767ae97f 100644 --- a/cmd/iceberg/output.go +++ b/cmd/iceberg/output.go @@ -23,6 +23,7 @@ import ( "fmt" "log" "os" + "sort" "strconv" "strings" @@ -108,11 +109,20 @@ func (t textOutput) DescribeTable(tbl *table.Table) { }).Render() refsData := pterm.TableData{{"Name", "Type", "Snapshot ID"}} + type refRow struct { + name string + ref table.SnapshotRef + } + var refRows []refRow for name, ref := range tbl.Metadata().Refs() { + refRows = append(refRows, refRow{name, ref}) + } + sort.Slice(refRows, func(i, j int) bool { return refRows[i].name < refRows[j].name }) + for _, r := range refRows { refsData = append(refsData, []string{ - name, - string(ref.SnapshotRefType), - strconv.FormatInt(ref.SnapshotID, 10), + r.name, + string(r.ref.SnapshotRefType), + strconv.FormatInt(r.ref.SnapshotID, 10), }) } if len(refsData) > 1 { diff --git a/cmd/iceberg/output_test.go b/cmd/iceberg/output_test.go index 442e25dba..45c00eb0a 100644 --- a/cmd/iceberg/output_test.go +++ b/cmd/iceberg/output_test.go @@ -124,8 +124,8 @@ Current Snapshot | append, {}: id=3055729675574597004, parent_id=305172967557459 Refs Name | Type | Snapshot ID ----------------------------------- -test | tag | 3051729675574597004 main | branch | 3055729675574597004 +test | tag | 3051729675574597004 Snapshots ├──Snapshot 3051729675574597004, schema 1: s3://a/b/1.avro From dc255dbf2bd19b43b732735aa7cd454e6248f584 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Sat, 14 Mar 2026 17:54:43 -0500 Subject: [PATCH 7/7] restore accidental removal of toManifestFile --- table/snapshot_producers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index e9e395a85..5b83236f2 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -185,7 +185,7 @@ func (of *overwriteFiles) existingManifests() ([]iceberg.ManifestFile, error) { return nil, err } - return wr.ToManifestFile(path, counter.Count) + return wr.ToManifestFile(path, counter.Count, iceberg.WithManifestFileContent(m.ManifestContent())) } mf, err := rewriteManifest(m, notDeleted)