Skip to content
This repository was archived by the owner on Jan 20, 2026. It is now read-only.

Commit a309e97

Browse files
authored
Fix async flush logic and flaky unit test (#122)
## Describe your changes and provide context This PR fixes 3 issues: 1. The flaky unit test was caused by async writes not committing the latest version before the DB is closed, which resulted in the recovery process being skipped since latestVersion = 0. 2. Async write was only enabled properly when useDedicatedChangelog=true; however, we want all async writes to go through the extra WAL file to avoid data loss, so that we can recover during initialization. 3. Fix the closing order: we should close the channel first, wait for all pending changes to be flushed to the WAL, and then close the WAL. ## Testing performed to validate your change
1 parent fb9ecb0 commit a309e97

6 files changed

Lines changed: 67 additions & 74 deletions

File tree

config/config.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,6 @@ type StateStoreConfig struct {
6262
// default to empty
6363
DBDirectory string `mapstructure:"db-directory"`
6464

65-
// DedicatedChangelog defines if we should use a separate changelog for SS store other than sharing with SC
66-
DedicatedChangelog bool `mapstructure:"dedicated-changelog"`
67-
6865
// Backend defines the backend database used for state-store
6966
// Supported backends: pebbledb, rocksdb
7067
// defaults to pebbledb
@@ -101,7 +98,6 @@ func DefaultStateCommitConfig() StateCommitConfig {
10198
return StateCommitConfig{
10299
Enable: true,
103100
AsyncCommitBuffer: DefaultAsyncCommitBuffer,
104-
CacheSize: DefaultCacheSize,
105101
SnapshotInterval: DefaultSnapshotInterval,
106102
SnapshotKeepRecent: DefaultSnapshotKeepRecent,
107103
}

ss/pebbledb/db.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -152,34 +152,35 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
152152
}
153153
database.lastRangeHashedCache = lastHashed
154154

155-
if config.DedicatedChangelog {
156-
if config.KeepRecent < 0 {
157-
return nil, errors.New("KeepRecent must be non-negative")
158-
}
159-
streamHandler, _ := changelog.NewStream(
160-
logger.NewNopLogger(),
161-
utils.GetChangelogPath(dataDir),
162-
changelog.Config{
163-
DisableFsync: true,
164-
ZeroCopy: true,
165-
KeepRecent: uint64(config.KeepRecent),
166-
PruneInterval: 300 * time.Second,
167-
},
168-
)
169-
database.streamHandler = streamHandler
170-
go database.writeAsyncInBackground()
171-
}
155+
if config.KeepRecent < 0 {
156+
return nil, errors.New("KeepRecent must be non-negative")
157+
}
158+
streamHandler, _ := changelog.NewStream(
159+
logger.NewNopLogger(),
160+
utils.GetChangelogPath(dataDir),
161+
changelog.Config{
162+
DisableFsync: true,
163+
ZeroCopy: true,
164+
KeepRecent: uint64(config.KeepRecent),
165+
PruneInterval: time.Duration(config.PruneIntervalSeconds) * time.Second,
166+
},
167+
)
168+
database.streamHandler = streamHandler
169+
go database.writeAsyncInBackground()
170+
172171
return database, nil
173172
}
174173

175174
func (db *Database) Close() error {
176175
if db.streamHandler != nil {
176+
// First, stop accepting new pending changes and drain the worker
177+
close(db.pendingChanges)
178+
// Wait for the async writes to finish
179+
db.asyncWriteWG.Wait()
180+
// Now close the WAL stream
177181
_ = db.streamHandler.Close()
178182
db.streamHandler = nil
179-
close(db.pendingChanges)
180183
}
181-
// Wait for the async writes to finish
182-
db.asyncWriteWG.Wait()
183184
err := db.storage.Close()
184185
db.storage = nil
185186
return err

ss/pebbledb/hash_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@ func setupTestDB(t *testing.T) (*Database, string) {
2222

2323
// Set up config with hash range enabled
2424
cfg := config.StateStoreConfig{
25-
HashRange: 10, // 10 blocks per hash range
26-
AsyncWriteBuffer: 100,
27-
KeepRecent: 100,
28-
KeepLastVersion: true,
29-
ImportNumWorkers: 4,
30-
DedicatedChangelog: false,
25+
HashRange: 10, // 10 blocks per hash range
26+
AsyncWriteBuffer: 100,
27+
KeepRecent: 100,
28+
KeepLastVersion: true,
29+
ImportNumWorkers: 4,
3130
}
3231

3332
db, err := New(tempDir, cfg)
@@ -340,7 +339,7 @@ func TestAsyncComputeMissingRanges(t *testing.T) {
340339
require.NoError(t, err)
341340

342341
// Wait a bit for the async computation to complete
343-
time.Sleep(200 * time.Millisecond)
342+
time.Sleep(500 * time.Millisecond)
344343

345344
// We should now have hashed up to version 30 (3 complete ranges)
346345
lastHashed, err := db.GetLastRangeHashed()

ss/rocksdb/db.go

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -110,20 +110,18 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
110110
pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer),
111111
}
112112

113-
if config.DedicatedChangelog {
114-
streamHandler, _ := changelog.NewStream(
115-
logger.NewNopLogger(),
116-
utils.GetChangelogPath(dataDir),
117-
changelog.Config{
118-
DisableFsync: true,
119-
ZeroCopy: true,
120-
KeepRecent: uint64(config.KeepRecent),
121-
PruneInterval: 300 * time.Second,
122-
},
123-
)
124-
database.streamHandler = streamHandler
125-
go database.writeAsyncInBackground()
126-
}
113+
streamHandler, _ := changelog.NewStream(
114+
logger.NewNopLogger(),
115+
utils.GetChangelogPath(dataDir),
116+
changelog.Config{
117+
DisableFsync: true,
118+
ZeroCopy: true,
119+
KeepRecent: uint64(config.KeepRecent),
120+
PruneInterval: time.Duration(config.PruneIntervalSeconds) * time.Second,
121+
},
122+
)
123+
database.streamHandler = streamHandler
124+
go database.writeAsyncInBackground()
127125

128126
return database, nil
129127
}
@@ -495,16 +493,15 @@ func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlo
495493

496494
func (db *Database) Close() error {
497495
if db.streamHandler != nil {
498-
// Close the changelog stream first
499-
db.streamHandler.Close()
500496
// Close the pending changes channel to signal the background goroutine to stop
501497
close(db.pendingChanges)
502498
// Wait for the async writes to finish processing all buffered items
503499
db.asyncWriteWG.Wait()
500+
// Close the changelog stream only after all pending writes have been flushed
501+
_ = db.streamHandler.Close()
504502
// Only set to nil after background goroutine has finished
505503
db.streamHandler = nil
506504
}
507-
508505
db.storage.Close()
509506
db.storage = nil
510507
db.cfHandle = nil

ss/store.go

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,16 @@ func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateSt
4141
if err != nil {
4242
return nil, err
4343
}
44+
4445
// Handle auto recovery for DB running with async mode
45-
if ssConfig.DedicatedChangelog {
46-
changelogPath := utils.GetChangelogPath(utils.GetStateStorePath(homeDir, ssConfig.Backend))
47-
if ssConfig.DBDirectory != "" {
48-
changelogPath = utils.GetChangelogPath(ssConfig.DBDirectory)
49-
}
50-
err := RecoverStateStore(logger, changelogPath, stateStore)
51-
if err != nil {
52-
return nil, err
53-
}
46+
changelogPath := utils.GetChangelogPath(utils.GetStateStorePath(homeDir, ssConfig.Backend))
47+
if ssConfig.DBDirectory != "" {
48+
changelogPath = utils.GetChangelogPath(ssConfig.DBDirectory)
49+
}
50+
if err := RecoverStateStore(logger, changelogPath, stateStore); err != nil {
51+
return nil, err
5452
}
53+
5554
// Start the pruning manager for DB
5655
pruningManager := pruning.NewPruningManager(logger, stateStore, int64(ssConfig.KeepRecent), int64(ssConfig.PruneIntervalSeconds))
5756
pruningManager.Start()
@@ -62,9 +61,6 @@ func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateSt
6261
func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore types.StateStore) error {
6362
ssLatestVersion := stateStore.GetLatestVersion()
6463
logger.Info(fmt.Sprintf("Recovering from changelog %s with latest SS version %d", changelogPath, ssLatestVersion))
65-
if ssLatestVersion <= 0 {
66-
return nil
67-
}
6864
streamHandler, err := changelog.NewStream(logger, changelogPath, changelog.Config{})
6965
if err != nil {
7066
return err
@@ -84,15 +80,20 @@ func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore ty
8480
// Look backward to find where we should start replay from
8581
curVersion := lastEntry.Version
8682
curOffset := lastOffset
87-
for curVersion > ssLatestVersion && curOffset > firstOffset {
88-
curOffset--
89-
curEntry, errRead := streamHandler.ReadAt(curOffset)
90-
if errRead != nil {
91-
return err
83+
if ssLatestVersion > 0 {
84+
for curVersion > ssLatestVersion && curOffset > firstOffset {
85+
curOffset--
86+
curEntry, errRead := streamHandler.ReadAt(curOffset)
87+
if errRead != nil {
88+
return err
89+
}
90+
curVersion = curEntry.Version
9291
}
93-
curVersion = curEntry.Version
92+
} else {
93+
// Fresh store (or no applied versions) – start from the first offset
94+
curOffset = firstOffset
9495
}
95-
// Replay from the offset where the offset where the version is larger than SS store latest version
96+
// Replay from the offset where the version is larger than SS store latest version
9697
targetStartOffset := curOffset
9798
logger.Info(fmt.Sprintf("Start replaying changelog to recover StateStore from offset %d to %d", targetStartOffset, lastOffset))
9899
if targetStartOffset < lastOffset {

ss/store_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,15 @@ import (
1515

1616
func TestNewStateStore(t *testing.T) {
1717
tempDir := os.TempDir()
18-
homeDir := filepath.Join(tempDir, "seidb")
18+
homeDir := filepath.Join(tempDir, "pebbledb")
1919
ssConfig := config.StateStoreConfig{
20-
DedicatedChangelog: true,
21-
Backend: string(PebbleDBBackend),
22-
AsyncWriteBuffer: 50,
23-
KeepRecent: 500,
20+
Backend: string(PebbleDBBackend),
21+
AsyncWriteBuffer: 100,
22+
KeepRecent: 500,
2423
}
2524
stateStore, err := NewStateStore(logger.NewNopLogger(), homeDir, ssConfig)
2625
require.NoError(t, err)
27-
for i := 1; i < 20; i++ {
26+
for i := 1; i < 50; i++ {
2827
var changesets []*proto.NamedChangeSet
2928
kvPair := &iavl.KVPair{
3029
Delete: false,
@@ -51,7 +50,7 @@ func TestNewStateStore(t *testing.T) {
5150
require.NoError(t, err)
5251

5352
// Make sure key and values can be found
54-
for i := 1; i < 20; i++ {
53+
for i := 1; i < 50; i++ {
5554
value, err := stateStore.Get("storeA", int64(i), []byte(fmt.Sprintf("key%d", i)))
5655
require.NoError(t, err)
5756
require.Equal(t, fmt.Sprintf("value%d", i), string(value))

0 commit comments

Comments
 (0)