Skip to content
This repository was archived by the owner on Jan 20, 2026. It is now read-only.

Commit dab8a8d

Browse files
committed
Merge branch 'UpdateRawIterate' of https://github.com/sei-protocol/sei-db into UpdateRawIterate
2 parents c3e348d + 7ed0e6a commit dab8a8d

2 files changed

Lines changed: 64 additions & 12 deletions

File tree

sc/memiavl/multitree.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -339,19 +339,38 @@ func (t *MultiTree) Catchup(stream types.Stream[proto.ChangelogEntry], endVersio
339339
return fmt.Errorf("target index %d is in the future, latest index: %d", endIndex, lastIndex)
340340
}
341341

342+
var replayCount = 0
342343
err = stream.Replay(firstIndex, endIndex, func(index uint64, entry proto.ChangelogEntry) error {
343-
if err := t.apply(entry); err != nil {
344-
return fmt.Errorf("apply rlog entry failed, %w", err)
344+
if err := t.ApplyUpgrades(entry.Upgrades); err != nil {
345+
return err
346+
}
347+
updatedTrees := make(map[string]bool)
348+
for _, cs := range entry.Changesets {
349+
treeName := cs.Name
350+
t.TreeByName(treeName).ApplyChangeSetAsync(cs.Changeset)
351+
updatedTrees[treeName] = true
345352
}
346-
if _, err := t.SaveVersion(false); err != nil {
347-
return fmt.Errorf("replay changeset failed to save version, %w", err)
353+
for _, tree := range t.trees {
354+
if _, found := updatedTrees[tree.Name]; !found {
355+
tree.ApplyChangeSetAsync(iavl.ChangeSet{})
356+
}
357+
}
358+
t.lastCommitInfo.Version = utils.NextVersion(t.lastCommitInfo.Version, t.initialVersion)
359+
t.lastCommitInfo.StoreInfos = []proto.StoreInfo{}
360+
replayCount++
361+
if replayCount%1000 == 0 {
362+
fmt.Printf("Replayed %d changelog entries\n", replayCount)
348363
}
349364
return nil
350365
})
366+
367+
for _, tree := range t.trees {
368+
tree.WaitToCompleteAsyncWrite()
369+
}
370+
351371
if err != nil {
352372
return err
353373
}
354-
355374
t.UpdateCommitInfo()
356375
return nil
357376
}

sc/memiavl/tree.go

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
var _ types.Tree = (*Tree)(nil)
1818
var emptyHash = sha256.New().Sum(nil)
1919

20-
// verify change sets by replay them to rebuild iavl tree and verify the root hashes
20+
// Tree verify change sets by replay them to rebuild iavl tree and verify the root hashes
2121
type Tree struct {
2222
version uint32
2323
// root node of empty tree is represented as `nil`
@@ -31,6 +31,9 @@ type Tree struct {
3131

3232
// sync.RWMutex is used to protect the tree for thread safety during snapshot reload
3333
mtx *sync.RWMutex
34+
35+
pendingChanges chan iavl.ChangeSet
36+
pendingWg *sync.WaitGroup
3437
}
3538

3639
// NewEmptyTree creates an empty tree at an arbitrary version.
@@ -43,8 +46,9 @@ func NewEmptyTree(version uint64, initialVersion uint32) *Tree {
4346
version: uint32(version),
4447
initialVersion: initialVersion,
4548
// no need to copy if the tree is not backed by snapshot
46-
zeroCopy: true,
47-
mtx: &sync.RWMutex{},
49+
zeroCopy: true,
50+
mtx: &sync.RWMutex{},
51+
pendingWg: &sync.WaitGroup{},
4852
}
4953
}
5054

@@ -62,10 +66,11 @@ func NewWithInitialVersion(initialVersion uint32) *Tree {
6266
// NewFromSnapshot mmap the blob files and create the root node.
6367
func NewFromSnapshot(snapshot *Snapshot, zeroCopy bool, _ int) *Tree {
6468
tree := &Tree{
65-
version: snapshot.Version(),
66-
snapshot: snapshot,
67-
zeroCopy: zeroCopy,
68-
mtx: &sync.RWMutex{},
69+
version: snapshot.Version(),
70+
snapshot: snapshot,
71+
zeroCopy: zeroCopy,
72+
mtx: &sync.RWMutex{},
73+
pendingWg: &sync.WaitGroup{},
6974
}
7075

7176
if !snapshot.IsEmpty() {
@@ -116,6 +121,31 @@ func (t *Tree) ApplyChangeSet(changeSet iavl.ChangeSet) {
116121
}
117122
}
118123

124+
func (t *Tree) ApplyChangeSetAsync(changeSet iavl.ChangeSet) {
125+
if t.pendingChanges == nil {
126+
t.StartBackgroundWrite()
127+
}
128+
t.pendingChanges <- changeSet
129+
}
130+
131+
func (t *Tree) StartBackgroundWrite() {
132+
t.pendingWg.Add(1)
133+
t.pendingChanges = make(chan iavl.ChangeSet, 1000)
134+
go func() {
135+
defer t.pendingWg.Done()
136+
for nextChange := range t.pendingChanges {
137+
t.ApplyChangeSet(nextChange)
138+
_, _, _ = t.SaveVersion(false)
139+
}
140+
}()
141+
}
142+
143+
func (t *Tree) WaitToCompleteAsyncWrite() {
144+
close(t.pendingChanges)
145+
t.pendingWg.Wait()
146+
t.pendingChanges = nil
147+
}
148+
119149
func (t *Tree) Set(key, value []byte) {
120150
t.mtx.Lock()
121151
defer t.mtx.Unlock()
@@ -270,6 +300,9 @@ func (t *Tree) Close() error {
270300
err = t.snapshot.Close()
271301
t.snapshot = nil
272302
}
303+
if t.pendingChanges != nil {
304+
close(t.pendingChanges)
305+
}
273306
t.root = nil
274307
return err
275308
}

0 commit comments

Comments
 (0)