From b273390ce76c7df06ff94124495fde89e283689a Mon Sep 17 00:00:00 2001 From: bang <3656828039@qq.com> Date: Mon, 1 Jun 2026 19:31:40 +0800 Subject: [PATCH] =?UTF-8?q?perf(raft):=20WAL=20=E6=89=B9=E9=87=8F=20fsync?= =?UTF-8?q?=20(group=20commit),=20=E6=91=8A=E9=94=80=E6=95=B4=E6=89=B9=20N?= =?UTF-8?q?=20=E6=AC=A1=20fsync=20=E4=B8=BA=201=20=E6=AC=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RaftWAL.AppendLog 此前每条 entry 一次 file.Sync(), follower 端 AppendEntries 处理一整批 N 条时循环调用 → N 次 fsync; rebuild/ SavePersist 同样逐条 fsync。 拆出 writeEntry(只写不 sync), AppendLog = writeEntry + Sync 不变; 新增 AppendLogs(entries): 全部写完只 fsync 一次。rpc.go follower 批量循环及 TruncateLogs/RebuildLogFile/SavePersist 改用 AppendLogs。 durability 契约不变: 整批写入仍在 persistStateLocked/回复成功前落盘。 空批次为 no-op。leader 单条路径 (raft.go AppendEntry) 不变。 Co-Authored-By: Claude Opus 4.8 (1M context) --- Raft/raft_wal.go | 41 ++++++++++++++++++++++++++++------------- Raft/rpc.go | 6 ++---- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/Raft/raft_wal.go b/Raft/raft_wal.go index a7257b7..dee5163 100644 --- a/Raft/raft_wal.go +++ b/Raft/raft_wal.go @@ -130,7 +130,8 @@ func (w *RaftWAL) LoadState() (int64, int64, error) { return term, votedFor, nil } -func (w *RaftWAL) AppendLog(entry LogEntry) error { +// writeEntry 写入单条日志的字节,但不 fsync。供 AppendLog 与 AppendLogs 复用。 +func (w *RaftWAL) writeEntry(entry LogEntry) error { if err := binary.Write(w.file, binary.BigEndian, int64(entry.Index)); err != nil { return err } @@ -143,7 +144,27 @@ func (w *RaftWAL) AppendLog(entry LogEntry) error { if _, err := w.file.Write(entry.Command); err != nil { return err } + return nil +} + +func (w *RaftWAL) AppendLog(entry LogEntry) error { + if err := w.writeEntry(entry); err != nil { + return err + } + return w.file.Sync() +} +// AppendLogs 批量追加日志:全部写完后只 fsync 一次(group commit), +// 将整批 N 条的 N 次 fsync 摊销为 1 次。空批次为 no-op。 +func (w *RaftWAL) AppendLogs(entries []LogEntry) error { + if len(entries) == 0 { + return nil + } + for _, entry := range entries { + if err := w.writeEntry(entry); err != nil { + return err + } + } return w.file.Sync() } @@ -372,10 +393,8 @@ func (w *RaftWAL) TruncateLogs(lastIncludedIndex int64) error { } w.file = f - for _, log := range remainingLogs { - if err := w.AppendLog(log); err != nil { - return err - } + if err := w.AppendLogs(remainingLogs); err != nil { + return err } return nil @@ -391,10 +410,8 @@ func (w *RaftWAL) RebuildLogFile(entries []LogEntry) error { } w.file = f - for _, entry := range entries { - if err := w.AppendLog(entry); err != nil { - return fmt.Errorf("failed to append log entry: %w", err) - } + if err := w.AppendLogs(entries); err != nil { + return fmt.Errorf("failed to append log entries: %w", err) } return nil } @@ -416,10 +433,8 @@ func (w *RaftWAL) SavePersist(data PersistData) error { w.file = f // 3. 写入所有日志条目 - for _, entry := range data.Log { - if err := w.AppendLog(entry); err != nil { - return fmt.Errorf("failed to append log entry: %w", err) - } + if err := w.AppendLogs(data.Log); err != nil { + return fmt.Errorf("failed to append log entries: %w", err) } return nil diff --git a/Raft/rpc.go b/Raft/rpc.go index 838040b..75700dc 100644 --- a/Raft/rpc.go +++ b/Raft/rpc.go @@ -174,10 +174,8 @@ func (r *RaftRPC) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesRep slog.Error("failed to rebuild log", "error", err) } } else { - for _, entry := range newEntries { - if err := r.raft.wal.AppendLog(entry); err != nil { - slog.Error("failed to append log", "error", err) - } + if err := r.raft.wal.AppendLogs(newEntries); err != nil { + slog.Error("failed to append logs", "error", err) } } r.raft.persistStateLocked()