@@ -19,6 +19,7 @@ import (
1919 "github.com/sei-protocol/sei-db/config"
2020 "github.com/sei-protocol/sei-db/proto"
2121 "github.com/sei-protocol/sei-db/ss/types"
22+ "github.com/sei-protocol/sei-db/ss/util"
2223 "github.com/sei-protocol/sei-db/stream/changelog"
2324 "golang.org/x/exp/slices"
2425)
@@ -34,11 +35,16 @@ const (
3435 earliestVersionKey = "s/_earliest"
3536 latestMigratedKeyMetadata = "s/_latestMigratedKey"
3637 latestMigratedModuleMetadata = "s/_latestMigratedModule"
38+ lastRangeHashKey = "s/_hash:latestRange"
3739 tombstoneVal = "TOMBSTONE"
3840
3941 // TODO: Make configurable
4042 ImportCommitBatchSize = 10000
4143 PruneCommitBatchSize = 50
44+ DeleteCommitBatchSize = 50
45+
46+ // Number of workers to use for hash computation
47+ HashComputationWorkers = 10
4248)
4349
4450var (
@@ -63,6 +69,11 @@ type Database struct {
6369
6470 // Pending changes to be written to the DB
6571 pendingChanges chan VersionedChangesets
72+
73+ // Cache for last range hashed
74+ lastRangeHashedCache int64
75+ lastRangeHashedMu sync.RWMutex
76+ hashComputationMu sync.Mutex
6677}
6778
6879type VersionedChangesets struct {
@@ -120,6 +131,14 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
120131 earliestVersion : earliestVersion ,
121132 pendingChanges : make (chan VersionedChangesets , config .AsyncWriteBuffer ),
122133 }
134+
135+ // Initialize the lastRangeHashed cache
136+ lastHashed , err := retrieveLastRangeHashed (db )
137+ if err != nil {
138+ return nil , fmt .Errorf ("failed to retrieve last range hashed: %w" , err )
139+ }
140+ database .lastRangeHashedCache = lastHashed
141+
123142 if config .DedicatedChangelog {
124143 streamHandler , _ := changelog .NewStream (
125144 logger .NewNopLogger (),
@@ -196,6 +215,28 @@ func (db *Database) GetEarliestVersion() (int64, error) {
196215 return db .earliestVersion , nil
197216}
198217
218+ func (db * Database ) SetLastRangeHashed (latestHashed int64 ) error {
219+ var ts [VersionSize ]byte
220+ binary .LittleEndian .PutUint64 (ts [:], uint64 (latestHashed ))
221+
222+ // Update the cache first
223+ db .lastRangeHashedMu .Lock ()
224+ db .lastRangeHashedCache = latestHashed
225+ db .lastRangeHashedMu .Unlock ()
226+
227+ return db .storage .Set ([]byte (lastRangeHashKey ), ts [:], defaultWriteOpts )
228+ }
229+
230+ // GetLastRangeHashed returns the highest block that has been fully hashed in ranges.
231+ func (db * Database ) GetLastRangeHashed () (int64 , error ) {
232+ // Return the cached value
233+ db .lastRangeHashedMu .RLock ()
234+ cachedValue := db .lastRangeHashedCache
235+ db .lastRangeHashedMu .RUnlock ()
236+
237+ return cachedValue , nil
238+ }
239+
199240// Retrieves earliest version from db
200241func retrieveEarliestVersion (db * pebble.DB ) (int64 , error ) {
201242 bz , closer , err := db .Get ([]byte (earliestVersionKey ))
@@ -353,6 +394,114 @@ func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.Named
353394 Version : version ,
354395 Changesets : changesets ,
355396 }
397+
398+ if db .config .HashRange > 0 {
399+ go func (ver int64 ) {
400+ // Try to acquire lock, return immediately if already locked
401+ if ! db .hashComputationMu .TryLock () {
402+ return
403+ }
404+ defer db .hashComputationMu .Unlock ()
405+
406+ if err := db .computeMissingRanges (ver ); err != nil {
407+ fmt .Printf ("maybeComputeMissingRanges error: %v\n " , err )
408+ }
409+ }(version )
410+ }
411+
412+ return nil
413+ }
414+
415+ func (db * Database ) computeMissingRanges (latestVersion int64 ) error {
416+ lastHashed , err := db .GetLastRangeHashed ()
417+ if err != nil {
418+ return fmt .Errorf ("failed to get last hashed range: %w" , err )
419+ }
420+
421+ // Keep filling full chunks until we can't
422+ for {
423+ nextTarget := lastHashed + db .config .HashRange
424+
425+ // If we haven't reached the next full chunk boundary yet, stop.
426+ // We do NOT do partial chunks.
427+ if nextTarget > latestVersion {
428+ break
429+ }
430+
431+ // We have a full chunk from (lastHashed+1) .. nextTarget
432+ begin := lastHashed + 1
433+ end := nextTarget
434+ if err := db .computeHashForRange (begin , end ); err != nil {
435+ return err
436+ }
437+
438+ // Mark that we've completed that chunk
439+ lastHashed = end
440+ if err := db .SetLastRangeHashed (lastHashed ); err != nil {
441+ return err
442+ }
443+ }
444+
445+ return nil
446+ }
447+
448+ func (db * Database ) computeHashForRange (beginBlock , endBlock int64 ) error {
449+ chunkSize := endBlock - beginBlock + 1
450+ if chunkSize <= 0 {
451+ // Nothing to do
452+ return nil
453+ }
454+
455+ // Use constant number of workers
456+ numOfWorkers := HashComputationWorkers
457+
458+ // Calculate blocks per worker
459+ blocksPerWorker := chunkSize / int64 (numOfWorkers )
460+ if blocksPerWorker < 1 {
461+ blocksPerWorker = 1
462+ }
463+
464+ for _ , moduleName := range util .Modules {
465+ dataCh := make (chan types.RawSnapshotNode , 10_000 )
466+
467+ hashCalculator := util .NewXorHashCalculator (blocksPerWorker , numOfWorkers , dataCh )
468+
469+ go func (mod string ) {
470+ defer close (dataCh )
471+
472+ _ , err := db .RawIterate (mod , func (key , value []byte , ver int64 ) bool {
473+ // Only feed data whose version is in [beginBlock..endBlock]
474+ if ver >= beginBlock && ver <= endBlock {
475+ dataCh <- types.RawSnapshotNode {
476+ StoreKey : mod ,
477+ Key : key ,
478+ Value : value ,
479+ Version : ver - beginBlock ,
480+ }
481+ }
482+ return false
483+ })
484+ if err != nil {
485+ panic (fmt .Errorf ("error scanning module %s: %w" , mod , err ))
486+ }
487+ }(moduleName )
488+
489+ allHashes := hashCalculator .ComputeHashes ()
490+ if len (allHashes ) == 0 {
491+ continue
492+ }
493+
494+ finalHash := allHashes [len (allHashes )- 1 ]
495+
496+ if err := db .WriteBlockRangeHash (moduleName , beginBlock , endBlock , finalHash ); err != nil {
497+ return fmt .Errorf (
498+ "failed to write block-range hash for module %q in [%d..%d]: %w" ,
499+ moduleName , beginBlock , endBlock , err ,
500+ )
501+ }
502+
503+ }
504+
356505 return nil
357506}
358507
@@ -671,6 +820,7 @@ func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error {
671820func (db * Database ) RawIterate (storeKey string , fn func (key []byte , value []byte , version int64 ) bool ) (bool , error ) {
672821 // Iterate through all keys and values for a store
673822 lowerBound := MVCCEncode (prependStoreKey (storeKey , nil ), 0 )
823+ prefix := storePrefix (storeKey )
674824
675825 itr , err := db .storage .NewIter (& pebble.IterOptions {LowerBound : lowerBound })
676826 if err != nil {
@@ -693,10 +843,13 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte
693843 }
694844
695845 // Only iterate through module
696- if storeKey != "" && ! bytes .HasPrefix (currKey , storePrefix ( storeKey ) ) {
846+ if storeKey != "" && ! bytes .HasPrefix (currKey , prefix ) {
697847 break
698848 }
699849
850+ // Parse prefix out of the key
851+ parsedKey := currKey [len (prefix ):]
852+
700853 currVersionDecoded , err := decodeUint64Ascending (currVersion )
701854 if err != nil {
702855 return false , err
@@ -713,7 +866,7 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte
713866 }
714867
715868 // Call callback fn
716- if fn (currKey , valBz , currVersionDecoded ) {
869+ if fn (parsedKey , valBz , currVersionDecoded ) {
717870 return true , nil
718871 }
719872
@@ -722,6 +875,50 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte
722875 return false , nil
723876}
724877
878+ func (db * Database ) DeleteKeysAtVersion (module string , version int64 ) error {
879+
880+ batch , err := NewBatch (db .storage , version )
881+ if err != nil {
882+ return fmt .Errorf ("failed to create deletion batch for module %q: %w" , module , err )
883+ }
884+
885+ deleteCounter := 0
886+
887+ _ , err = db .RawIterate (module , func (key , value []byte , ver int64 ) bool {
888+ if ver == version {
889+ if err := batch .HardDelete (module , key ); err != nil {
890+ fmt .Printf ("Error physically deleting key %q in module %q: %v\n " , key , module , err )
891+ return true // stop iteration on error
892+ }
893+ deleteCounter ++
894+ if deleteCounter >= DeleteCommitBatchSize {
895+ if err := batch .Write (); err != nil {
896+ fmt .Printf ("Error writing deletion batch for module %q: %v\n " , module , err )
897+ return true
898+ }
899+ deleteCounter = 0
900+ batch , err = NewBatch (db .storage , version )
901+ if err != nil {
902+ fmt .Printf ("Error creating a new deletion batch for module %q: %v\n " , module , err )
903+ return true
904+ }
905+ }
906+ }
907+ return false
908+ })
909+ if err != nil {
910+ return fmt .Errorf ("error iterating module %q for deletion: %w" , module , err )
911+ }
912+
913+ // Commit any remaining deletions.
914+ if batch .Size () > 0 {
915+ if err := batch .Write (); err != nil {
916+ return fmt .Errorf ("error writing final deletion batch for module %q: %w" , module , err )
917+ }
918+ }
919+ return nil
920+ }
921+
// storePrefix returns the byte prefix that namespaces all keys belonging to
// the given store, by substituting the store key into StorePrefixTpl.
func storePrefix(storeKey string) []byte {
	return []byte(fmt.Sprintf(StorePrefixTpl, storeKey))
}
@@ -818,3 +1015,20 @@ func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlo
8181015 }
8191016 return nil
8201017}
1018+
1019+ func retrieveLastRangeHashed (db * pebble.DB ) (int64 , error ) {
1020+ bz , closer , err := db .Get ([]byte (lastRangeHashKey ))
1021+ if err != nil {
1022+ if errors .Is (err , pebble .ErrNotFound ) {
1023+ // means we haven't hashed anything yet
1024+ return 0 , nil
1025+ }
1026+ return 0 , err
1027+ }
1028+ defer closer .Close ()
1029+
1030+ if len (bz ) == 0 {
1031+ return 0 , nil
1032+ }
1033+ return int64 (binary .LittleEndian .Uint64 (bz )), nil
1034+ }
0 commit comments