66 "fmt"
77 "os"
88 "path/filepath"
9+ "runtime"
910 "sort"
1011 "strconv"
1112 "strings"
@@ -17,7 +18,6 @@ import (
1718 errorutils "github.com/sei-protocol/sei-db/common/errors"
1819 "github.com/sei-protocol/sei-db/common/logger"
1920 "github.com/sei-protocol/sei-db/common/utils"
20- "github.com/sei-protocol/sei-db/config"
2121 "github.com/sei-protocol/sei-db/proto"
2222 "github.com/sei-protocol/sei-db/stream/changelog"
2323 "github.com/sei-protocol/sei-db/stream/types"
@@ -209,8 +209,10 @@ func OpenDB(logger logger.Logger, targetVersion int64, opts Options) (*DB, error
209209 return nil , errorutils .Join (err , db .Close ())
210210 }
211211 }
212- if db .streamHandler == nil {
213- fmt .Println ("[Debug] DB steam handler is nil??" )
212+
213+ // We need to prune snapshots during start up to avoid snapshot leaks
214+ if ! db .readOnly {
215+ db .pruneSnapshots ()
214216 }
215217 return db , nil
216218}
@@ -617,7 +619,8 @@ func (db *DB) Close() error {
617619 db .mtx .Lock ()
618620 defer db .mtx .Unlock ()
619621 errs := []error {}
620-
622+ db .pruneSnapshotLock .Lock ()
623+ defer db .pruneSnapshotLock .Unlock ()
621624 // Close stream handler
622625 if db .streamHandler != nil {
623626 err := db .streamHandler .Close ()
@@ -801,7 +804,8 @@ func initEmptyDB(dir string, initialVersion uint32) error {
801804 tmp := NewEmptyMultiTree (initialVersion , 0 )
802805 snapshotDir := snapshotName (0 )
803806 // create tmp worker pool
804- pool := pond .New (config .DefaultSnapshotWriterLimit , config .DefaultSnapshotWriterLimit * 10 )
807+ concurrency := runtime .NumCPU ()
808+ pool := pond .New (concurrency , concurrency * 10 )
805809 defer pool .Stop ()
806810
807811 if err := tmp .WriteSnapshot (context .Background (), filepath .Join (dir , snapshotDir ), pool ); err != nil {
0 commit comments