Skip to content

Commit 04ba61b

Browse files
fix: data race in Close/signalEviction, persistence flush on StopPersistence
- atomic.Bool guard prevents send on closed evictSignal channel - StopPersistence drains AOF channel and flushes buffered writes before stopping - Added Flush() to PersistenceEngine interface and HybridEngine - All 9 scenario tests pass with -race
1 parent 3fd5061 commit 04ba61b

3 files changed

Lines changed: 53 additions & 1 deletion

File tree

internal/persistence/hybrid_engine.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ type PersistenceEngine interface {
2222
Start(ctx context.Context) error
2323
Stop() error
2424

25+
// Flush ensures all buffered writes are persisted to disk
26+
Flush() error
27+
2528
// Maintenance
2629
Compact() error
2730
GetStats() *PersistenceStats
@@ -451,3 +454,21 @@ func (he *HybridEngine) Compact() error {
451454
// This will be implemented when we have cache data access
452455
return fmt.Errorf("compaction requires cache integration")
453456
}
457+
458+
// Flush ensures all buffered AOF writes are persisted to disk
459+
func (he *HybridEngine) Flush() error {
460+
if he.aofManager == nil {
461+
return nil
462+
}
463+
he.aofManager.mu.Lock()
464+
defer he.aofManager.mu.Unlock()
465+
if he.aofManager.writer != nil {
466+
if err := he.aofManager.writer.Flush(); err != nil {
467+
return err
468+
}
469+
if he.aofManager.currentLog != nil {
470+
return he.aofManager.currentLog.Sync()
471+
}
472+
}
473+
return nil
474+
}

internal/storage/basic_store.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"reflect"
99
"sync"
10+
"sync/atomic"
1011
"time"
1112
"unsafe"
1213

@@ -100,6 +101,7 @@ type BasicStore struct {
100101
// Background eviction
101102
evictSignal chan struct{} // Signal background evictor to run
102103
evictDone chan struct{} // Closed when background evictor exits
104+
closing atomic.Bool // Set to true during Close() to prevent sends on closed channels
103105

104106
// Background AOF
105107
aofChan chan *persistence.LogEntry // Buffered channel for async AOF writes
@@ -600,6 +602,9 @@ func (s *BasicStore) Delete(key string) error {
600602

601603
// signalEviction sends a non-blocking signal to the background evictor
602604
func (s *BasicStore) signalEviction() {
605+
if s.closing.Load() {
606+
return // Store is shutting down, don't send on closed channel
607+
}
603608
select {
604609
case s.evictSignal <- struct{}{}:
605610
default:
@@ -740,6 +745,9 @@ func (s *BasicStore) IsTombstoned(key string) bool {
740745

741746
// Close shuts down the store and cleans up resources
742747
func (s *BasicStore) Close() error {
748+
// Mark as closing to prevent signalEviction from sending on closed channel
749+
s.closing.Store(true)
750+
743751
// Stop cleanup goroutine and background evictor
744752
select {
745753
case s.stopCleanup <- true:
@@ -754,6 +762,11 @@ func (s *BasicStore) Close() error {
754762
close(s.aofChan)
755763
<-s.aofDone
756764

765+
// Flush persistence engine to ensure all buffered writes hit disk
766+
if s.persistEngine != nil {
767+
_ = s.persistEngine.Flush()
768+
}
769+
757770
// Clear all items
758771
_ = s.Clear()
759772

internal/storage/persistence_integration.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,30 @@ func (s *BasicStore) getSnapshotData() map[string]interface{} {
4747
return data
4848
}
4949

50-
// StopPersistence gracefully stops the persistence engine
50+
// StopPersistence gracefully stops the persistence engine.
51+
// Drains the background AOF channel and flushes buffered writes before stopping.
5152
func (s *BasicStore) StopPersistence() error {
5253
if s.persistEngine == nil {
5354
return nil
5455
}
5556

57+
// Drain any pending AOF channel entries to the persistence engine
58+
for {
59+
select {
60+
case entry, ok := <-s.aofChan:
61+
if !ok {
62+
goto done
63+
}
64+
_ = s.persistEngine.WriteEntry(entry)
65+
default:
66+
goto done
67+
}
68+
}
69+
done:
70+
71+
// Flush buffered writes to disk
72+
_ = s.persistEngine.Flush()
73+
5674
return s.persistEngine.Stop()
5775
}
5876

0 commit comments

Comments
 (0)