Skip to content
This repository was archived by the owner on Jan 20, 2026. It is now read-only.

Commit ae39c82

Browse files
committed
Fix async flush logic and flaky unit test
1 parent 0ae35ba commit ae39c82

5 files changed

Lines changed: 52 additions & 56 deletions

File tree

config/config.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,6 @@ type StateStoreConfig struct {
6262
// default to empty
6363
DBDirectory string `mapstructure:"db-directory"`
6464

65-
// DedicatedChangelog defines if we should use a separate changelog for SS store other than sharing with SC
66-
DedicatedChangelog bool `mapstructure:"dedicated-changelog"`
67-
6865
// Backend defines the backend database used for state-store
6966
// Supported backends: pebbledb, rocksdb
7067
// defaults to pebbledb
@@ -101,7 +98,6 @@ func DefaultStateCommitConfig() StateCommitConfig {
10198
return StateCommitConfig{
10299
Enable: true,
103100
AsyncCommitBuffer: DefaultAsyncCommitBuffer,
104-
CacheSize: DefaultCacheSize,
105101
SnapshotInterval: DefaultSnapshotInterval,
106102
SnapshotKeepRecent: DefaultSnapshotKeepRecent,
107103
}

ss/pebbledb/db.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -152,34 +152,35 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
152152
}
153153
database.lastRangeHashedCache = lastHashed
154154

155-
if config.DedicatedChangelog {
156-
if config.KeepRecent < 0 {
157-
return nil, errors.New("KeepRecent must be non-negative")
158-
}
159-
streamHandler, _ := changelog.NewStream(
160-
logger.NewNopLogger(),
161-
utils.GetChangelogPath(dataDir),
162-
changelog.Config{
163-
DisableFsync: true,
164-
ZeroCopy: true,
165-
KeepRecent: uint64(config.KeepRecent),
166-
PruneInterval: 300 * time.Second,
167-
},
168-
)
169-
database.streamHandler = streamHandler
170-
go database.writeAsyncInBackground()
171-
}
155+
if config.KeepRecent < 0 {
156+
return nil, errors.New("KeepRecent must be non-negative")
157+
}
158+
streamHandler, _ := changelog.NewStream(
159+
logger.NewNopLogger(),
160+
utils.GetChangelogPath(dataDir),
161+
changelog.Config{
162+
DisableFsync: true,
163+
ZeroCopy: true,
164+
KeepRecent: uint64(config.KeepRecent),
165+
PruneInterval: time.Duration(config.PruneIntervalSeconds) * time.Second,
166+
},
167+
)
168+
database.streamHandler = streamHandler
169+
go database.writeAsyncInBackground()
170+
172171
return database, nil
173172
}
174173

175174
func (db *Database) Close() error {
175+
// First, stop accepting new pending changes and drain the worker
176+
close(db.pendingChanges)
177+
// Wait for the async writes to finish
178+
db.asyncWriteWG.Wait()
179+
// Now close the WAL stream
176180
if db.streamHandler != nil {
177181
_ = db.streamHandler.Close()
178182
db.streamHandler = nil
179-
close(db.pendingChanges)
180183
}
181-
// Wait for the async writes to finish
182-
db.asyncWriteWG.Wait()
183184
err := db.storage.Close()
184185
db.storage = nil
185186
return err

ss/pebbledb/hash_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@ func setupTestDB(t *testing.T) (*Database, string) {
2222

2323
// Set up config with hash range enabled
2424
cfg := config.StateStoreConfig{
25-
HashRange: 10, // 10 blocks per hash range
26-
AsyncWriteBuffer: 100,
27-
KeepRecent: 100,
28-
KeepLastVersion: true,
29-
ImportNumWorkers: 4,
30-
DedicatedChangelog: false,
25+
HashRange: 10, // 10 blocks per hash range
26+
AsyncWriteBuffer: 100,
27+
KeepRecent: 100,
28+
KeepLastVersion: true,
29+
ImportNumWorkers: 4,
3130
}
3231

3332
db, err := New(tempDir, cfg)

ss/store.go

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,16 @@ func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateSt
4141
if err != nil {
4242
return nil, err
4343
}
44+
4445
// Handle auto recovery for DB running with async mode
45-
if ssConfig.DedicatedChangelog {
46-
changelogPath := utils.GetChangelogPath(utils.GetStateStorePath(homeDir, ssConfig.Backend))
47-
if ssConfig.DBDirectory != "" {
48-
changelogPath = utils.GetChangelogPath(ssConfig.DBDirectory)
49-
}
50-
err := RecoverStateStore(logger, changelogPath, stateStore)
51-
if err != nil {
52-
return nil, err
53-
}
46+
changelogPath := utils.GetChangelogPath(utils.GetStateStorePath(homeDir, ssConfig.Backend))
47+
if ssConfig.DBDirectory != "" {
48+
changelogPath = utils.GetChangelogPath(ssConfig.DBDirectory)
49+
}
50+
if err := RecoverStateStore(logger, changelogPath, stateStore); err != nil {
51+
return nil, err
5452
}
53+
5554
// Start the pruning manager for DB
5655
pruningManager := pruning.NewPruningManager(logger, stateStore, int64(ssConfig.KeepRecent), int64(ssConfig.PruneIntervalSeconds))
5756
pruningManager.Start()
@@ -62,9 +61,6 @@ func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateSt
6261
func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore types.StateStore) error {
6362
ssLatestVersion := stateStore.GetLatestVersion()
6463
logger.Info(fmt.Sprintf("Recovering from changelog %s with latest SS version %d", changelogPath, ssLatestVersion))
65-
if ssLatestVersion <= 0 {
66-
return nil
67-
}
6864
streamHandler, err := changelog.NewStream(logger, changelogPath, changelog.Config{})
6965
if err != nil {
7066
return err
@@ -84,15 +80,20 @@ func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore ty
8480
// Look backward to find where we should start replay from
8581
curVersion := lastEntry.Version
8682
curOffset := lastOffset
87-
for curVersion > ssLatestVersion && curOffset > firstOffset {
88-
curOffset--
89-
curEntry, errRead := streamHandler.ReadAt(curOffset)
90-
if errRead != nil {
91-
return err
83+
if ssLatestVersion > 0 {
84+
for curVersion > ssLatestVersion && curOffset > firstOffset {
85+
curOffset--
86+
curEntry, errRead := streamHandler.ReadAt(curOffset)
87+
if errRead != nil {
88+
return err
89+
}
90+
curVersion = curEntry.Version
9291
}
93-
curVersion = curEntry.Version
92+
} else {
93+
// Fresh store (or no applied versions) – start from the first offset
94+
curOffset = firstOffset
9495
}
95-
// Replay from the offset where the offset where the version is larger than SS store latest version
96+
// Replay from the offset where the version is larger than SS store latest version
9697
targetStartOffset := curOffset
9798
logger.Info(fmt.Sprintf("Start replaying changelog to recover StateStore from offset %d to %d", targetStartOffset, lastOffset))
9899
if targetStartOffset < lastOffset {

ss/store_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,15 @@ import (
1515

1616
func TestNewStateStore(t *testing.T) {
1717
tempDir := os.TempDir()
18-
homeDir := filepath.Join(tempDir, "seidb")
18+
homeDir := filepath.Join(tempDir, "pebbledb")
1919
ssConfig := config.StateStoreConfig{
20-
DedicatedChangelog: true,
21-
Backend: string(PebbleDBBackend),
22-
AsyncWriteBuffer: 50,
23-
KeepRecent: 500,
20+
Backend: string(PebbleDBBackend),
21+
AsyncWriteBuffer: 100,
22+
KeepRecent: 500,
2423
}
2524
stateStore, err := NewStateStore(logger.NewNopLogger(), homeDir, ssConfig)
2625
require.NoError(t, err)
27-
for i := 1; i < 20; i++ {
26+
for i := 1; i < 50; i++ {
2827
var changesets []*proto.NamedChangeSet
2928
kvPair := &iavl.KVPair{
3029
Delete: false,
@@ -51,7 +50,7 @@ func TestNewStateStore(t *testing.T) {
5150
require.NoError(t, err)
5251

5352
// Make sure key and values can be found
54-
for i := 1; i < 20; i++ {
53+
for i := 1; i < 50; i++ {
5554
value, err := stateStore.Get("storeA", int64(i), []byte(fmt.Sprintf("key%d", i)))
5655
require.NoError(t, err)
5756
require.Equal(t, fmt.Sprintf("value%d", i), string(value))

0 commit comments

Comments
 (0)