diff --git a/Raft/raft_wal.go b/Raft/raft_wal.go index a7257b7..dee5163 100644 --- a/Raft/raft_wal.go +++ b/Raft/raft_wal.go @@ -130,7 +130,8 @@ func (w *RaftWAL) LoadState() (int64, int64, error) { return term, votedFor, nil } -func (w *RaftWAL) AppendLog(entry LogEntry) error { +// writeEntry 写入单条日志的字节,但不 fsync。供 AppendLog 与 AppendLogs 复用。 +func (w *RaftWAL) writeEntry(entry LogEntry) error { if err := binary.Write(w.file, binary.BigEndian, int64(entry.Index)); err != nil { return err } @@ -143,7 +144,27 @@ func (w *RaftWAL) AppendLog(entry LogEntry) error { if _, err := w.file.Write(entry.Command); err != nil { return err } + return nil +} + +func (w *RaftWAL) AppendLog(entry LogEntry) error { + if err := w.writeEntry(entry); err != nil { + return err + } + return w.file.Sync() +} +// AppendLogs 批量追加日志:全部写完后只 fsync 一次(group commit), +// 将整批 N 条的 N 次 fsync 摊销为 1 次。空批次为 no-op。 +func (w *RaftWAL) AppendLogs(entries []LogEntry) error { + if len(entries) == 0 { + return nil + } + for _, entry := range entries { + if err := w.writeEntry(entry); err != nil { + return err + } + } return w.file.Sync() } @@ -372,10 +393,8 @@ func (w *RaftWAL) TruncateLogs(lastIncludedIndex int64) error { } w.file = f - for _, log := range remainingLogs { - if err := w.AppendLog(log); err != nil { - return err - } + if err := w.AppendLogs(remainingLogs); err != nil { + return err } return nil @@ -391,10 +410,8 @@ func (w *RaftWAL) RebuildLogFile(entries []LogEntry) error { } w.file = f - for _, entry := range entries { - if err := w.AppendLog(entry); err != nil { - return fmt.Errorf("failed to append log entry: %w", err) - } + if err := w.AppendLogs(entries); err != nil { + return fmt.Errorf("failed to append log entries: %w", err) } return nil } @@ -416,10 +433,8 @@ func (w *RaftWAL) SavePersist(data PersistData) error { w.file = f // 3. 写入所有日志条目 - for _, entry := range data.Log { - if err := w.AppendLog(entry); err != nil { - return fmt.Errorf("failed to append log entry: %w", err) - } + if err := w.AppendLogs(data.Log); err != nil { + return fmt.Errorf("failed to append log entries: %w", err) } return nil diff --git a/Raft/rpc.go b/Raft/rpc.go index 838040b..75700dc 100644 --- a/Raft/rpc.go +++ b/Raft/rpc.go @@ -174,10 +174,8 @@ func (r *RaftRPC) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesRep slog.Error("failed to rebuild log", "error", err) } } else { - for _, entry := range newEntries { - if err := r.raft.wal.AppendLog(entry); err != nil { - slog.Error("failed to append log", "error", err) - } + if err := r.raft.wal.AppendLogs(newEntries); err != nil { + slog.Error("failed to append logs", "error", err) } } r.raft.persistStateLocked()