From d334511d6a73027354f13ba1d743a27e55723ecf Mon Sep 17 00:00:00 2001 From: bang <3656828039@qq.com> Date: Mon, 1 Jun 2026 19:05:12 +0800 Subject: [PATCH] =?UTF-8?q?refactor(storage):=20=E5=88=A0=E9=99=A4?= =?UTF-8?q?=E6=9C=AA=E6=8E=A5=E5=85=A5=E5=86=99=E8=B7=AF=E5=BE=84=E7=9A=84?= =?UTF-8?q?=20storage=20WAL?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit storage/zstorage/WAL.go 只在恢复时被 Read, Write 仅测试调用—— 实际写路径 Engine.Put→MemTable.Put→skiplist.insert 从不落 WAL, 持久性由 Raft 日志提供。移除该死代码及 IWal 接口、wal_test, 并清理 MemTable 中 wal 字段/recoverFromWAL/Sync/Clear 及 Flush 内 的 WAL 清除调用; Close 保留 stopCh 关闭。 Co-Authored-By: Claude Opus 4.8 (1M context) --- storage/istorage/IWal.go | 9 -- storage/zstorage/WAL.go | 182 ----------------------------------- storage/zstorage/memtable.go | 41 +------- storage/zstorage/wal_test.go | 126 ------------------------ 4 files changed, 2 insertions(+), 356 deletions(-) delete mode 100644 storage/istorage/IWal.go delete mode 100644 storage/zstorage/WAL.go delete mode 100644 storage/zstorage/wal_test.go diff --git a/storage/istorage/IWal.go b/storage/istorage/IWal.go deleted file mode 100644 index 50709d8..0000000 --- a/storage/istorage/IWal.go +++ /dev/null @@ -1,9 +0,0 @@ -package istorage - -type IWal interface { - Write(entry LogEntry) error - Read() ([]LogEntry, error) - Close() error - Sync() error - Clear() error -} diff --git a/storage/zstorage/WAL.go b/storage/zstorage/WAL.go deleted file mode 100644 index 491fa39..0000000 --- a/storage/zstorage/WAL.go +++ /dev/null @@ -1,182 +0,0 @@ -package zstorage - -import ( - "encoding/binary" - "errors" - "hash/crc32" - "io" - "log/slog" - "os" - "sync" - - "github.com/NeverENG/BanDB/config" - "github.com/NeverENG/BanDB/storage/istorage" -) - -const headerLength = 12 - -var _ istorage.IWal = &WAL{} - -type WAL struct { - mu sync.Mutex - file *os.File - headerBuf [headerLength]byte -} - -func NewWAL() *WAL { - file, err := os.OpenFile(config.G.WALPath, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0644) - if err != nil { - slog.Warn("cannot open WAL, running in disabled mode", "path", config.G.WALPath, "error", err) - return &WAL{file: nil} - } - slog.Info("WAL opened", "path", config.G.WALPath) - return &WAL{file: file} -} - -func (w *WAL) Write(entry istorage.LogEntry) error { - w.mu.Lock() - defer w.mu.Unlock() - - if w.file == nil { - return nil - } - - hasher := crc32.NewIEEE() - hasher.Write(entry.Key) - hasher.Write(entry.Value) - crc := hasher.Sum32() - - binary.BigEndian.PutUint32(w.headerBuf[:], crc) - binary.BigEndian.PutUint32(w.headerBuf[4:], uint32(len(entry.Key))) - binary.BigEndian.PutUint32(w.headerBuf[8:], uint32(len(entry.Value))) - - if _, err := w.file.Write(w.headerBuf[:]); err != nil { - slog.Error("write WAL header failed", "error", err) - return err - } - if _, err := w.file.Write(entry.Key); err != nil { - slog.Error("write WAL key failed", "error", err) - return err - } - if _, err := w.file.Write(entry.Value); err != nil { - slog.Error("write WAL value failed", "error", err) - return err - } - - return w.file.Sync() -} - -func (w *WAL) Read() ([]istorage.LogEntry, error) { - w.mu.Lock() - defer w.mu.Unlock() - - if w.file == nil { - return nil, nil - } - - if _, err := w.file.Seek(0, io.SeekStart); err != nil { - return nil, err - } - - entries := make([]istorage.LogEntry, 0) - - for { - header := make([]byte, headerLength) - _, err := io.ReadFull(w.file, header) - if err != nil { - if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, os.ErrClosed) { - break - } - return entries, err - } - - crc := binary.BigEndian.Uint32(header[:]) - keyLen := binary.BigEndian.Uint32(header[4:]) - valueLen := binary.BigEndian.Uint32(header[8:]) - - key := make([]byte, keyLen) - if _, err := io.ReadFull(w.file, key); err != nil { - if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { - break - } - return entries, err - } - - value := make([]byte, valueLen) - if _, err := io.ReadFull(w.file, value); err != nil { - if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { - break - } - return entries, err - } - - hasher := crc32.NewIEEE() - hasher.Write(key) - hasher.Write(value) - if crc != hasher.Sum32() { - slog.Error("WAL data corruption detected") - return entries, errors.New("data corruption detected") - } - - entries = append(entries, istorage.LogEntry{Key: key, Value: value}) - } - - return entries, nil -} - -func (w *WAL) Close() error { - w.mu.Lock() - defer w.mu.Unlock() - - if w.file == nil { - return nil - } - return w.file.Close() -} - -func (w *WAL) Sync() error { - w.mu.Lock() - defer w.mu.Unlock() - - if w.file == nil { - return nil - } - return w.file.Sync() -} - -func (w *WAL) Clear() error { - w.mu.Lock() - defer w.mu.Unlock() - - if w.file == nil { - // WAL disabled mode: try to remove old file by path, then reopen - if err := os.Remove(config.G.WALPath); err != nil && !os.IsNotExist(err) { - return err - } - f, err := os.OpenFile(config.G.WALPath, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0644) - if err != nil { - slog.Warn("WAL disabled, cannot reopen after clear", "error", err) - return nil - } - w.file = f - return nil - } - - path := w.file.Name() - if err := w.file.Close(); err != nil { - slog.Error("close WAL before clear failed", "error", err) - return err - } - - if err := os.Remove(path); err != nil && !os.IsNotExist(err) { - return err - } - - f, err := os.OpenFile(path, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0644) - if err != nil { - slog.Error("reopen WAL after clear failed", "error", err) - return err - } - w.file = f - return w.file.Sync() -} diff --git a/storage/zstorage/memtable.go b/storage/zstorage/memtable.go index bc7187a..730e7da 100644 --- a/storage/zstorage/memtable.go +++ b/storage/zstorage/memtable.go @@ -39,7 +39,6 @@ type MemTable struct { compactCh chan bool stopCh chan struct{} - wal *WAL sst *SSTable } @@ -64,14 +63,12 @@ func NewMemTable() *MemTable { FlushChan: make(chan bool, 1), compactCh: make(chan bool, 1), stopCh: make(chan struct{}), - wal: NewWAL(), sst: NewSSTable(), } go mt.FlushWorker() go mt.ListenCompactCh() go mt.sst.LoadSSTableMetaList() - mt.recoverFromWAL() return mt } @@ -267,38 +264,9 @@ func (sl *SkipList) delete(key []byte) bool { return true } -func (m *MemTable) recoverFromWAL() { - entries, err := m.wal.Read() - if err != nil { - slog.Warn("failed to read WAL", "error", err) - return - } - - if len(entries) == 0 { - return - } - - slog.Info("recovering from WAL", "entries", len(entries)) - - for _, entry := range entries { - // Value==nil 为墓碑,按墓碑插入而非物理删除:恢复后仍需 shadow SSTable 旧值。 - m.active.insert(entry.Key, entry.Value) - } - - slog.Info("WAL recovery completed", "memtableSize", m.active.size) -} - -func (m *MemTable) Sync() error { - return m.wal.Sync() -} - -func (m *MemTable) Clear() error { - return m.wal.Clear() -} - func (m *MemTable) Close() error { close(m.stopCh) - return m.wal.Close() + return nil } func (m *MemTable) StartFlush() { @@ -313,7 +281,7 @@ func (m *MemTable) StartFlush() { // 1. 持锁交换 active → dirty(active 变为 dirty 的不可变快照) // 2. 创建新的空 active 表用于接受后续写入 // 3. 释放锁,在锁外将 dirty 数据写入 SSTable -// 4. 刷盘完成后清除 WAL,将 dirty 置 nil +// 4. 刷盘完成后将 dirty 置 nil func (m *MemTable) Flush() { // 步骤 1-2: 持锁进行交换(快速操作) m.mu.Lock() @@ -336,11 +304,6 @@ func (m *MemTable) Flush() { return } - err = m.Clear() - if err != nil { - slog.Error("WAL clear error", "error", err) - } - m.mu.Lock() m.dirty = nil m.mu.Unlock() diff --git a/storage/zstorage/wal_test.go b/storage/zstorage/wal_test.go deleted file mode 100644 index 1bf44e9..0000000 --- a/storage/zstorage/wal_test.go +++ /dev/null @@ -1,126 +0,0 @@ -package zstorage - -import ( - "os" - "testing" - - "github.com/NeverENG/BanDB/config" - "github.com/NeverENG/BanDB/storage/istorage" -) - -func setupTestWAL(t *testing.T) *WAL { - oldPath := config.G.WALPath - config.G.WALPath = "test_wal.log" - wal := NewWAL() - if wal == nil { - t.Skip("WAL initialization failed, skipping test") - } - t.Cleanup(func() { - wal.Close() - os.Remove("test_wal.log") - config.G.WALPath = oldPath - }) - return wal -} - -func TestWAL_WriteAndRead(t *testing.T) { - wal := setupTestWAL(t) - - err := wal.Write(istorage.LogEntry{Key: []byte("key1"), Value: []byte("value1")}) - if err != nil { - t.Fatalf("WAL Write failed: %v", err) - } - - readEntries, err := wal.Read() - if err != nil { - t.Fatalf("WAL Read failed: %v", err) - } - - if len(readEntries) != 1 { - t.Fatalf("Expected 1 entry, got %d", len(readEntries)) - } - if string(readEntries[0].Key) != "key1" || string(readEntries[0].Value) != "value1" { - t.Errorf("Entry mismatch: got (%s, %s)", readEntries[0].Key, readEntries[0].Value) - } -} - -func TestWAL_WriteMultipleEntries(t *testing.T) { - wal := setupTestWAL(t) - - entries := []istorage.LogEntry{ - {Key: []byte("key1"), Value: []byte("value1")}, - {Key: []byte("key2"), Value: []byte("value2")}, - {Key: []byte("key3"), Value: []byte("value3")}, - } - - for _, entry := range entries { - if err := wal.Write(entry); err != nil { - t.Fatalf("WAL Write failed: %v", err) - } - } - - readEntries, err := wal.Read() - if err != nil { - t.Fatalf("WAL Read failed: %v", err) - } - - if len(readEntries) != 3 { - t.Fatalf("Expected 3 entries, got %d", len(readEntries)) - } - - for i, entry := range entries { - if string(readEntries[i].Key) != string(entry.Key) { - t.Errorf("Entry %d key mismatch: expected %s, got %s", i, entry.Key, readEntries[i].Key) - } - if string(readEntries[i].Value) != string(entry.Value) { - t.Errorf("Entry %d value mismatch: expected %s, got %s", i, entry.Value, readEntries[i].Value) - } - } -} - -func TestWAL_Clear(t *testing.T) { - wal := setupTestWAL(t) - - err := wal.Write(istorage.LogEntry{Key: []byte("key1"), Value: []byte("value1")}) - if err != nil { - t.Fatalf("WAL Write failed: %v", err) - } - - err = wal.Clear() - if err != nil { - t.Fatalf("WAL Clear failed: %v", err) - } - - // 直接使用原来的 wal 实例验证清空结果(Clear 已经重新打开了文件) - readEntries, err := wal.Read() - if err != nil { - t.Fatalf("WAL Read failed: %v", err) - } - - if len(readEntries) != 0 { - t.Errorf("Expected 0 entries after clear, got %d", len(readEntries)) - } -} - -func TestWAL_Sync(t *testing.T) { - wal := setupTestWAL(t) - - err := wal.Write(istorage.LogEntry{Key: []byte("key1"), Value: []byte("value1")}) - if err != nil { - t.Fatalf("WAL Write failed: %v", err) - } - - err = wal.Sync() - if err != nil { - t.Fatalf("WAL Sync failed: %v", err) - } -} - -func TestWAL_Close(t *testing.T) { - wal := setupTestWAL(t) - - err := wal.Close() - if err != nil { - t.Fatalf("WAL Close failed: %v", err) - } -}