Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions Raft/raft_wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ func (w *RaftWAL) LoadState() (int64, int64, error) {
return term, votedFor, nil
}

func (w *RaftWAL) AppendLog(entry LogEntry) error {
// writeEntry 写入单条日志的字节,但不 fsync。供 AppendLog 与 AppendLogs 复用。
func (w *RaftWAL) writeEntry(entry LogEntry) error {
if err := binary.Write(w.file, binary.BigEndian, int64(entry.Index)); err != nil {
return err
}
Expand All @@ -143,7 +144,27 @@ func (w *RaftWAL) AppendLog(entry LogEntry) error {
if _, err := w.file.Write(entry.Command); err != nil {
return err
}
return nil
}

func (w *RaftWAL) AppendLog(entry LogEntry) error {
if err := w.writeEntry(entry); err != nil {
return err
}
return w.file.Sync()
}

// AppendLogs 批量追加日志:全部写完后只 fsync 一次(group commit),
// 将整批 N 条的 N 次 fsync 摊销为 1 次。空批次为 no-op。
func (w *RaftWAL) AppendLogs(entries []LogEntry) error {
if len(entries) == 0 {
return nil
}
for _, entry := range entries {
if err := w.writeEntry(entry); err != nil {
return err
}
}
return w.file.Sync()
}

Expand Down Expand Up @@ -372,10 +393,8 @@ func (w *RaftWAL) TruncateLogs(lastIncludedIndex int64) error {
}
w.file = f

for _, log := range remainingLogs {
if err := w.AppendLog(log); err != nil {
return err
}
if err := w.AppendLogs(remainingLogs); err != nil {
return err
}

return nil
Expand All @@ -391,10 +410,8 @@ func (w *RaftWAL) RebuildLogFile(entries []LogEntry) error {
}
w.file = f

for _, entry := range entries {
if err := w.AppendLog(entry); err != nil {
return fmt.Errorf("failed to append log entry: %w", err)
}
if err := w.AppendLogs(entries); err != nil {
return fmt.Errorf("failed to append log entries: %w", err)
}
return nil
}
Expand All @@ -416,10 +433,8 @@ func (w *RaftWAL) SavePersist(data PersistData) error {
w.file = f

// 3. 写入所有日志条目
for _, entry := range data.Log {
if err := w.AppendLog(entry); err != nil {
return fmt.Errorf("failed to append log entry: %w", err)
}
if err := w.AppendLogs(data.Log); err != nil {
return fmt.Errorf("failed to append log entries: %w", err)
}

return nil
Expand Down
6 changes: 2 additions & 4 deletions Raft/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,8 @@ func (r *RaftRPC) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesRep
slog.Error("failed to rebuild log", "error", err)
}
} else {
for _, entry := range newEntries {
if err := r.raft.wal.AppendLog(entry); err != nil {
slog.Error("failed to append log", "error", err)
}
if err := r.raft.wal.AppendLogs(newEntries); err != nil {
slog.Error("failed to append logs", "error", err)
}
}
r.raft.persistStateLocked()
Expand Down
Loading