From b975c46e2eed307bac0ea2b57cbf3161c1ef2d02 Mon Sep 17 00:00:00 2001 From: Tin Dang Date: Wed, 8 Apr 2026 22:50:04 +0700 Subject: [PATCH 1/6] feat: multi-part AOF + BGREWRITEAOF monoio + RDB loader 3x MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ports the multi-part AOF persistence work from feat/persistence-overhaul (PR #37) as a fresh squash onto post-disk-offload main, dropping the stale branch's obsolete accept-loop / SO_REUSEPORT / cfg-gate changes. Additive content: - src/persistence/aof_manifest.rs — appendonlydir/ manifest + multi-part replay - src/persistence/rdb.rs — save_to_bytes / load_from_bytes + fast bulk loader (count_entries_per_db + Database::reserve + insert_for_load) - src/persistence/aof.rs — writer rewrite: waits for manifest, handles AofMessage::Rewrite{,Sharded} via do_rewrite_{single,sharded}, detects MOON magic prefix for RDB-preamble replay - src/command/persistence.rs — bgrewriteaof_start_sharded - src/storage/compact_value.rs — heap_string_owned / heap_string_vec_direct - src/storage/db.rs — insert_for_load / reserve / recalculate_memory - src/shard/shared_databases.rs — all_shard_dbs() - src/server/conn/handler_{sharded,monoio}.rs — BGREWRITEAOF dispatch - src/main.rs — replay_multi_part layered on v2/v3 recovery, manifest initialized after recovery so the writer thread can unblock Coexistence rule: when appendonlydir/ manifest is present it is authoritative; legacy appendonly.aof fallback (handled by v2 recovery inside restore_from_persistence) only fires when no manifest exists — covering first-upgrade from pre-multi-part moon. Known limitation: multi-part replay is single-shard only; multi-shard clusters log a warning and fall back to v2/v3 recovery. Validation (moon-dev OrbStack, Linux aarch64): cargo fmt --check ok cargo clippy --release -- -D warnings ok cargo clippy --no-default-features --features runtime-tokio,jemalloc -- -D warnings ok cargo test --release --lib 1858 pass cargo test --no-default-features --features runtime-tokio,jemalloc --lib 1877 pass Two failures in cargo test --release --lib (test_inline_set, test_inline_set_with_aof) and tests/replication_test.rs reproduce on clean main — pre-existing, not introduced by this change. --- src/command/persistence.rs | 15 + src/main.rs | 37 ++ src/persistence/aof.rs | 368 ++++++++++++++-- src/persistence/aof_manifest.rs | 314 +++++++++++++ src/persistence/mod.rs | 1 + src/persistence/rdb.rs | 684 +++++++++++++++++++++++++++-- src/server/conn/handler_monoio.rs | 12 + src/server/conn/handler_sharded.rs | 23 +- src/shard/shared_databases.rs | 7 + src/storage/compact_value.rs | 47 +- src/storage/db.rs | 27 ++ 11 files changed, 1449 insertions(+), 86 deletions(-) create mode 100644 src/persistence/aof_manifest.rs diff --git a/src/command/persistence.rs b/src/command/persistence.rs index 928a2b1f..35423fab 100644 --- a/src/command/persistence.rs +++ b/src/command/persistence.rs @@ -212,6 +212,21 @@ pub fn bgrewriteaof_start(aof_tx: &channel::MpscSender, db: SharedDa } } +/// Start BGREWRITEAOF in sharded mode using ShardDatabases. +pub fn bgrewriteaof_start_sharded( + aof_tx: &channel::MpscSender, + shard_databases: std::sync::Arc, +) -> Frame { + match aof_tx.try_send(AofMessage::RewriteSharded(shard_databases)) { + Ok(()) => Frame::SimpleString(Bytes::from_static( + b"Background append only file rewriting started", + )), + Err(_) => Frame::Error(Bytes::from_static( + b"ERR Background AOF rewrite failed to start", + )), + } +} + /// SAVE command: synchronous save to disk. Blocks until complete. /// /// Clones all entries under read locks (same as BGSAVE), then serializes diff --git a/src/main.rs b/src/main.rs index 089b6a6b..f57dd028 100644 --- a/src/main.rs +++ b/src/main.rs @@ -240,6 +240,43 @@ fn main() -> anyhow::Result<()> { }) .collect(); + // Multi-part AOF replay layered on top of v2/v3 recovery. + // Priority: if appendonlydir/ manifest exists → load multi-part (skip legacy v2 fallback). + // Otherwise v2 already handled legacy appendonly.aof during restore_from_persistence. + if config.appendonly == "yes" { + if let Some(ref dir) = persistence_dir { + use moon::persistence::aof_manifest::AofManifest; + use moon::persistence::replay::DispatchReplayEngine; + let base_dir = std::path::PathBuf::from(dir); + if let Some(manifest) = AofManifest::load(&base_dir) { + if num_shards == 1 { + match moon::persistence::aof_manifest::replay_multi_part( + &mut shards[0].databases, + &manifest, + &DispatchReplayEngine, + ) { + Ok(n) => info!( + "AOF multi-part loaded (seq {}): {} entries", + manifest.seq, n + ), + Err(e) => tracing::error!("Multi-part AOF load failed: {}", e), + } + } else { + tracing::warn!( + "Multi-part AOF skipped in multi-shard mode (not yet supported)" + ); + } + } + // Initialize manifest for writer thread (safe — recovery is complete). + // On fresh/legacy upgrade, writer is blocked waiting for this file. + if AofManifest::load(&base_dir).is_none() { + if let Err(e) = AofManifest::initialize(&base_dir) { + tracing::error!("Failed to initialize AOF manifest: {}", e); + } + } + } + } + // Extract databases from all shards and wrap in ShardDatabases let all_dbs: Vec> = shards .iter_mut() diff --git a/src/persistence/aof.rs b/src/persistence/aof.rs index b0161ca7..5caeb4d1 100644 --- a/src/persistence/aof.rs +++ b/src/persistence/aof.rs @@ -61,6 +61,8 @@ pub enum AofMessage { Append(Bytes), /// Trigger a full AOF rewrite (compaction) using current database state. Rewrite(SharedDatabases), + /// Trigger AOF rewrite in sharded mode (all shards' databases). + RewriteSharded(Arc), /// Shut down the AOF writer task gracefully. Shutdown, } @@ -107,38 +109,129 @@ pub async fn aof_writer_task( #[cfg(feature = "runtime-tokio")] interval.tick().await; // consume first tick - // Monoio fallback: AOF writer uses sync I/O in a simple recv loop. + // Monoio path: multi-part AOF (base RDB + incremental RESP) with sync I/O. + // + // On startup, if appendonlydir/ exists with a manifest, open the current + // incr file for appending. Otherwise start fresh with seq 1. + // On BGREWRITEAOF: snapshot → write new base RDB → create new incr → advance manifest. #[cfg(feature = "runtime-monoio")] { + use crate::persistence::aof_manifest::AofManifest; use std::io::Write; + + // Resolve the persistence base directory from aof_path's parent. + let base_dir = aof_path.parent().unwrap_or(Path::new(".")).to_path_buf(); + + // Load manifest — do NOT create one here if it doesn't exist. + // main.rs recovery runs concurrently and must finish before a manifest + // is created, to avoid racing against legacy single-file AOF detection. + // main.rs will create the manifest after recovery completes. + let mut manifest = loop { + if let Some(m) = AofManifest::load(&base_dir) { + break m; + } + // main.rs recovery hasn't created the manifest yet — wait. + std::thread::sleep(std::time::Duration::from_millis(50)); + }; + + // Open the current incremental file for appending + let incr_path = manifest.incr_path(); let mut file = match std::fs::OpenOptions::new() .create(true) .append(true) - .open(&aof_path) + .open(&incr_path) { Ok(f) => f, Err(e) => { - error!("Failed to open AOF file {}: {}", aof_path.display(), e); + error!( + "Failed to open AOF incr file {}: {}", + incr_path.display(), + e + ); return; } }; + info!( + "AOF writer: seq {}, incr={}", + manifest.seq, + incr_path.display() + ); + + let mut last_fsync = Instant::now(); + + let mut write_error = false; + loop { - // Use blocking recv since monoio doesn't support tokio::select! match rx.recv() { Ok(AofMessage::Append(data)) => { - let _ = file.write_all(&data); - if fsync == FsyncPolicy::Always { - let _ = file.flush(); - let _ = std::io::Write::flush(&mut file); + if write_error { + continue; // Drop appends after persistent I/O failure + } + if let Err(e) = file.write_all(&data) { + error!( + "AOF write failed (seq {}): {}. Persistence degraded.", + manifest.seq, e + ); + write_error = true; + continue; + } + match fsync { + FsyncPolicy::Always => { + if let Err(e) = file.flush().and_then(|_| file.sync_data()) { + error!("AOF sync failed (seq {}, always): {}", manifest.seq, e); + write_error = true; + } + } + FsyncPolicy::EverySec => { + if last_fsync.elapsed() >= std::time::Duration::from_secs(1) { + if let Err(e) = file.flush().and_then(|_| file.sync_data()) { + error!( + "AOF sync failed (seq {}, everysec): {}", + manifest.seq, e + ); + // Non-fatal for everysec: retry next interval + } else { + last_fsync = Instant::now(); + } + } + } + FsyncPolicy::No => {} } } Ok(AofMessage::Shutdown) | Err(_) => { - let _ = file.flush(); - info!("AOF writer shutting down (monoio)"); + if !write_error { + if let Err(e) = file.flush().and_then(|_| file.sync_data()) { + error!("AOF final sync failed (seq {}): {}", manifest.seq, e); + } + } + info!("AOF writer shutting down (monoio, seq {})", manifest.seq); break; } - Ok(AofMessage::Rewrite(_db)) => { - // AOF rewrite under monoio: not yet implemented + Ok(AofMessage::Rewrite(db)) => { + if !write_error { + if let Err(e) = file.flush().and_then(|_| file.sync_data()) { + error!("AOF pre-rewrite sync failed (seq {}): {}", manifest.seq, e); + } + } + match do_rewrite_single(&db, &mut manifest, &mut file) { + Ok(()) => { + write_error = false; // Reset on successful rewrite + } + Err(e) => error!("AOF rewrite failed (seq {}): {}", manifest.seq, e), + } + } + Ok(AofMessage::RewriteSharded(shard_dbs)) => { + if !write_error { + if let Err(e) = file.flush().and_then(|_| file.sync_data()) { + error!("AOF pre-rewrite sync failed (seq {}): {}", manifest.seq, e); + } + } + match do_rewrite_sharded(&shard_dbs, &mut manifest, &mut file) { + Ok(()) => { + write_error = false; + } + Err(e) => error!("AOF rewrite failed (seq {}): {}", manifest.seq, e), + } } } } @@ -190,6 +283,19 @@ pub async fn aof_writer_task( } } } + Ok(AofMessage::RewriteSharded(shard_dbs)) => { + let _ = writer.flush().await; + let _ = writer.get_ref().sync_data().await; + if let Err(e) = rewrite_aof_sharded_sync(&shard_dbs, &aof_path) { + error!("AOF rewrite (sharded) failed: {}", e); + } + let reopen_result: Result = tokio::fs::OpenOptions::new() + .create(true).append(true).open(&aof_path).await; + match reopen_result { + Ok(f) => writer = tokio::io::BufWriter::new(f), + Err(e) => { error!("Failed to reopen AOF after rewrite: {}", e); return; } + } + } Ok(AofMessage::Shutdown) | Err(_) => { let _ = writer.flush().await; let _ = writer.get_ref().sync_data().await; @@ -233,8 +339,35 @@ pub fn replay_aof( return Ok(0); } - let total_len = data.len(); - let mut buf = BytesMut::from(&data[..]); + // Detect RDB preamble: if the file starts with "MOON" magic, load the binary + // RDB section first, then replay any RESP commands appended after it. + let (rdb_keys, resp_start) = if data.starts_with(b"MOON") { + match crate::persistence::rdb::load_from_bytes(databases, &data) { + Ok((keys, consumed)) => { + info!( + "AOF RDB preamble loaded: {} keys ({} bytes)", + keys, consumed + ); + (keys, consumed) + } + Err(e) => { + // Data starts with MOON magic — it IS RDB format. + // Falling back to RESP would parse garbage. Propagate the error. + return Err(e); + } + } + } else { + (0, 0) + }; + + // If the entire file was RDB (no RESP tail), we're done + if resp_start >= data.len() { + return Ok(rdb_keys); + } + + let resp_data = &data[resp_start..]; + let total_len = resp_data.len(); + let mut buf = BytesMut::from(resp_data); let config = ParseConfig::default(); let mut selected_db: usize = 0; let mut count: usize = 0; @@ -314,12 +447,13 @@ pub fn replay_aof( ); } - Ok(count) + Ok(rdb_keys + count) } /// Generate synthetic RESP commands from the current database state for AOF rewriting. /// /// Produces commands for all 5 data types plus PEXPIRE for keys with TTL. +#[allow(dead_code)] // Retained for RESP-only AOF rewrite fallback and testing pub fn generate_rewrite_commands(databases: &[Database]) -> BytesMut { let mut buf = BytesMut::new(); let now_ms = current_time_ms(); @@ -518,12 +652,11 @@ pub fn generate_rewrite_commands(databases: &[Database]) -> BytesMut { buf } -/// Rewrite the AOF file with synthetic commands from current database state. +/// Snapshot databases and generate compacted AOF commands. /// -/// Writes to a temporary file first, then atomically renames for crash safety. -#[cfg(feature = "runtime-tokio")] -pub async fn rewrite_aof(db: SharedDatabases, aof_path: &Path) -> Result<(), MoonError> { - // Clone database state: lock each db individually with read lock +/// Shared by both the async (tokio) and sync (monoio) rewrite paths. +#[allow(dead_code)] +fn snapshot_and_generate(db: &SharedDatabases) -> BytesMut { let snapshot: Vec<(Vec<(CompactKey, Entry)>, u32)> = db .iter() .map(|lock| { @@ -538,7 +671,6 @@ pub async fn rewrite_aof(db: SharedDatabases, aof_path: &Path) -> Result<(), Moo }) .collect(); - // Reconstruct temporary Database objects for generate_rewrite_commands let mut temp_dbs: Vec = Vec::with_capacity(snapshot.len()); for (entries, _base_ts) in &snapshot { let mut db = Database::new(); @@ -548,11 +680,130 @@ pub async fn rewrite_aof(db: SharedDatabases, aof_path: &Path) -> Result<(), Moo temp_dbs.push(db); } - let commands = generate_rewrite_commands(&temp_dbs); + generate_rewrite_commands(&temp_dbs) +} + +/// Multi-part rewrite: snapshot single-shard databases → RDB base → advance manifest. +#[cfg(feature = "runtime-monoio")] +fn do_rewrite_single( + db: &SharedDatabases, + manifest: &mut crate::persistence::aof_manifest::AofManifest, + file: &mut std::fs::File, +) -> Result<(), MoonError> { + // Capture (entries, base_ts) per database — preserves original base_ts for correct TTL. + let snapshot: Vec<( + Vec<( + crate::storage::compact_key::CompactKey, + crate::storage::entry::Entry, + )>, + u32, + )> = db + .iter() + .map(|lock| { + let guard = lock.read(); + let base_ts = guard.base_timestamp(); + let entries = guard + .data() + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + (entries, base_ts) + }) + .collect(); + + let rdb_bytes = crate::persistence::rdb::save_snapshot_to_bytes(&snapshot)?; + let new_incr = manifest.advance(&rdb_bytes)?; + + // Switch writer to new incr file + *file = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&new_incr) + .map_err(|e| AofError::Io { + path: new_incr, + source: e, + })?; + + Ok(()) +} + +/// Multi-part rewrite: snapshot all shards → merged RDB base → advance manifest. +#[cfg(feature = "runtime-monoio")] +fn do_rewrite_sharded( + shard_dbs: &crate::shard::shared_databases::ShardDatabases, + manifest: &mut crate::persistence::aof_manifest::AofManifest, + file: &mut std::fs::File, +) -> Result<(), MoonError> { + // Capture (entries, base_ts) per merged database, preserving original base_ts for TTL. + let db_count = shard_dbs.db_count(); + let mut merged: Vec<( + Vec<( + crate::storage::compact_key::CompactKey, + crate::storage::entry::Entry, + )>, + u32, + )> = (0..db_count).map(|_| (Vec::new(), 0u32)).collect(); + + for shard_locks in shard_dbs.all_shard_dbs() { + for (db_idx, lock) in shard_locks.iter().enumerate() { + let guard = lock.read(); + let base_ts = guard.base_timestamp(); + let now_ms = current_time_ms(); + if merged[db_idx].0.is_empty() { + merged[db_idx].1 = base_ts; + } + for (key, entry) in guard.data().iter() { + if !entry.is_expired_at(base_ts, now_ms) { + merged[db_idx].0.push((key.clone(), entry.clone())); + } + } + } + } + + let rdb_bytes = crate::persistence::rdb::save_snapshot_to_bytes(&merged)?; + let new_incr = manifest.advance(&rdb_bytes)?; + + *file = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&new_incr) + .map_err(|e| AofError::Io { + path: new_incr, + source: e, + })?; + + Ok(()) +} + +/// Rewrite the AOF file with RDB preamble (binary base + empty RESP incremental). +/// +/// Uses the same strategy as Redis 7+ `aof-use-rdb-preamble yes`: +/// the rewritten AOF starts with a full RDB snapshot (compact binary), +/// and new writes are appended as RESP after it. On startup, the loader +/// detects the RDB magic and reads the binary preamble, then switches +/// to RESP parsing for any incremental commands appended after. +#[allow(dead_code)] // Retained for legacy single-file and tokio path +fn rewrite_aof_sync(db: &SharedDatabases, aof_path: &Path) -> Result<(), MoonError> { + // Snapshot under read locks, build temp Database objects for RDB serialization + let snapshot: Vec = db + .iter() + .map(|lock| { + let guard = lock.read(); + let mut temp = Database::new(); + let now_ms = current_time_ms(); + for (k, v) in guard.data().iter() { + if !v.is_expired_at(guard.base_timestamp(), now_ms) { + temp.set(k.to_bytes(), v.clone()); + } + } + temp + }) + .collect(); + + let rdb_bytes = crate::persistence::rdb::save_to_bytes(&snapshot)?; - // Write to temp file, then atomic rename let tmp_path = aof_path.with_extension("aof.tmp"); - std::fs::write(&tmp_path, &commands).map_err(|e| AofError::Io { + std::fs::write(&tmp_path, &rdb_bytes).map_err(|e| AofError::Io { path: tmp_path.clone(), source: e, })?; @@ -565,10 +816,77 @@ pub async fn rewrite_aof(db: SharedDatabases, aof_path: &Path) -> Result<(), Moo ), })?; - info!("AOF rewrite complete: {} bytes", commands.len()); + info!( + "AOF rewrite complete (RDB preamble): {} bytes", + rdb_bytes.len() + ); Ok(()) } +/// Rewrite the AOF in sharded mode with RDB preamble. +/// +/// Merges all shards' databases into a single RDB snapshot, writes it as +/// the AOF base file. New incremental writes are appended as RESP after. +#[allow(dead_code)] +fn rewrite_aof_sharded_sync( + shard_dbs: &crate::shard::shared_databases::ShardDatabases, + aof_path: &Path, +) -> Result<(), MoonError> { + let db_count = shard_dbs.db_count(); + let now_ms = current_time_ms(); + let mut merged_dbs: Vec = (0..db_count).map(|_| Database::new()).collect(); + + for shard_locks in shard_dbs.all_shard_dbs() { + for (db_idx, lock) in shard_locks.iter().enumerate() { + let guard = lock.read(); + for (key, entry) in guard.data().iter() { + if !entry.is_expired_at(guard.base_timestamp(), now_ms) { + merged_dbs[db_idx].set(key.to_bytes(), entry.clone()); + } + } + } + } + + let rdb_bytes = crate::persistence::rdb::save_to_bytes(&merged_dbs)?; + + let tmp_path = aof_path.with_extension("aof.tmp"); + std::fs::write(&tmp_path, &rdb_bytes).map_err(|e| AofError::Io { + path: tmp_path.clone(), + source: e, + })?; + std::fs::rename(&tmp_path, aof_path).map_err(|e| AofError::RewriteFailed { + detail: format!( + "rename {} -> {}: {}", + tmp_path.display(), + aof_path.display(), + e + ), + })?; + + info!( + "AOF rewrite (sharded, RDB preamble) complete: {} bytes", + rdb_bytes.len() + ); + Ok(()) +} + +/// Reopen AOF file in append mode after atomic rewrite replaced it. +#[allow(dead_code)] +fn reopen_aof_sync(aof_path: &Path) -> Result { + std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(aof_path) +} + +/// Rewrite the AOF file (tokio async wrapper). +/// +/// Delegates to `rewrite_aof_sync` — the actual I/O is synchronous (temp write + rename). +#[cfg(feature = "runtime-tokio")] +pub async fn rewrite_aof(db: SharedDatabases, aof_path: &Path) -> Result<(), MoonError> { + rewrite_aof_sync(&db, aof_path) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/persistence/aof_manifest.rs b/src/persistence/aof_manifest.rs new file mode 100644 index 00000000..84b884a8 --- /dev/null +++ b/src/persistence/aof_manifest.rs @@ -0,0 +1,314 @@ +//! Multi-part AOF manifest: tracks base (RDB) and incremental (RESP) files. +//! +//! Implements the same directory-based AOF format as Redis 7+: +//! ```text +//! appendonlydir/ +//! moon.aof.1.base.rdb # RDB snapshot base +//! moon.aof.1.incr.aof # Incremental RESP since base +//! moon.aof.manifest # This file +//! ``` +//! +//! The manifest is a simple text file listing the active base and incremental +//! files with their sequence numbers. On BGREWRITEAOF, the sequence increments, +//! a new base + incr pair is created, and old files are deleted. + +use std::io::Write; +use std::path::{Path, PathBuf}; + +use tracing::{error, info, warn}; + +const MANIFEST_NAME: &str = "moon.aof.manifest"; +const AOF_DIR_NAME: &str = "appendonlydir"; + +/// Active AOF file set tracked by the manifest. +#[derive(Debug, Clone)] +pub struct AofManifest { + /// Base directory (parent of `appendonlydir/`) + pub dir: PathBuf, + /// Current sequence number (incremented on each rewrite) + pub seq: u64, +} + +impl AofManifest { + /// Path to the `appendonlydir/` directory. + pub fn aof_dir(&self) -> PathBuf { + self.dir.join(AOF_DIR_NAME) + } + + /// Path to the manifest file. + pub fn manifest_path(&self) -> PathBuf { + self.aof_dir().join(MANIFEST_NAME) + } + + /// Path to the base RDB file for the current sequence. + pub fn base_path(&self) -> PathBuf { + self.aof_dir() + .join(format!("moon.aof.{}.base.rdb", self.seq)) + } + + /// Path to the incremental RESP file for the current sequence. + pub fn incr_path(&self) -> PathBuf { + self.aof_dir() + .join(format!("moon.aof.{}.incr.aof", self.seq)) + } + + /// Path to the base RDB file for a given sequence. + pub fn base_path_seq(&self, seq: u64) -> PathBuf { + self.aof_dir().join(format!("moon.aof.{}.base.rdb", seq)) + } + + /// Path to the incremental RESP file for a given sequence. + pub fn incr_path_seq(&self, seq: u64) -> PathBuf { + self.aof_dir().join(format!("moon.aof.{}.incr.aof", seq)) + } + + /// Create the `appendonlydir/` and write the initial manifest. + pub fn initialize(dir: &Path) -> std::io::Result { + let manifest = Self { + dir: dir.to_path_buf(), + seq: 1, + }; + std::fs::create_dir_all(manifest.aof_dir())?; + manifest.write_manifest()?; + Ok(manifest) + } + + /// Load manifest from disk. Returns `None` if manifest doesn't exist. + pub fn load(dir: &Path) -> Option { + let aof_dir = dir.join(AOF_DIR_NAME); + let manifest_path = aof_dir.join(MANIFEST_NAME); + + if !manifest_path.exists() { + return None; + } + + let content = match std::fs::read_to_string(&manifest_path) { + Ok(c) => c, + Err(e) => { + error!("Failed to read AOF manifest: {}", e); + return None; + } + }; + + let mut seq = 0u64; + for line in content.lines() { + let line = line.trim(); + if let Some(val) = line.strip_prefix("seq ") { + if let Ok(n) = val.parse::() { + seq = n; + } + } + } + + if seq == 0 { + error!("AOF manifest has no valid sequence number"); + return None; + } + + Some(Self { + dir: dir.to_path_buf(), + seq, + }) + } + + /// Write the manifest file atomically (write tmp + rename). + pub fn write_manifest(&self) -> std::io::Result<()> { + let manifest_path = self.manifest_path(); + let tmp_path = manifest_path.with_extension("tmp"); + + let content = format!( + "seq {}\nbase moon.aof.{}.base.rdb\nincr moon.aof.{}.incr.aof\n", + self.seq, self.seq, self.seq + ); + + let mut f = std::fs::File::create(&tmp_path)?; + f.write_all(content.as_bytes())?; + f.sync_data()?; + std::fs::rename(&tmp_path, &manifest_path)?; + Ok(()) + } + + /// Advance to the next sequence: write new base RDB, create new incr file, + /// update manifest, delete old files. + /// + /// Returns the path to the new incremental file (caller should switch writing to it). + pub fn advance(&mut self, rdb_bytes: &[u8]) -> Result { + let old_seq = self.seq; + let new_seq = old_seq + 1; + + let aof_dir = self.aof_dir(); + std::fs::create_dir_all(&aof_dir).map_err(|e| crate::error::AofError::Io { + path: aof_dir.clone(), + source: e, + })?; + + // 1. Write new base RDB (atomic: tmp + rename) + let new_base = self.base_path_seq(new_seq); + let tmp_base = new_base.with_extension("rdb.tmp"); + std::fs::write(&tmp_base, rdb_bytes).map_err(|e| crate::error::AofError::Io { + path: tmp_base.clone(), + source: e, + })?; + std::fs::rename(&tmp_base, &new_base).map_err(|e| { + crate::error::AofError::RewriteFailed { + detail: format!("rename base: {}", e), + } + })?; + + // 2. Create empty new incremental file + let new_incr = self.incr_path_seq(new_seq); + std::fs::File::create(&new_incr).map_err(|e| crate::error::AofError::Io { + path: new_incr.clone(), + source: e, + })?; + + // 3. Update manifest (atomic) + self.seq = new_seq; + self.write_manifest() + .map_err(|e| crate::error::AofError::Io { + path: self.manifest_path(), + source: e, + })?; + + // 4. Delete old files (best-effort) + let old_base = self.base_path_seq(old_seq); + let old_incr = self.incr_path_seq(old_seq); + if old_base.exists() { + if let Err(e) = std::fs::remove_file(&old_base) { + warn!("Failed to delete old base {}: {}", old_base.display(), e); + } + } + if old_incr.exists() { + if let Err(e) = std::fs::remove_file(&old_incr) { + warn!("Failed to delete old incr {}: {}", old_incr.display(), e); + } + } + + info!( + "AOF advanced to seq {}: base={} bytes, incr={}", + new_seq, + rdb_bytes.len(), + new_incr.display() + ); + + Ok(new_incr) + } +} + +/// Replay multi-part AOF: load base RDB then replay incremental RESP. +/// +/// Returns total keys/commands loaded. +pub fn replay_multi_part( + databases: &mut [crate::storage::Database], + manifest: &AofManifest, + engine: &dyn crate::persistence::replay::CommandReplayEngine, +) -> Result { + let mut total = 0usize; + + // Load base RDB + let base_path = manifest.base_path(); + if base_path.exists() { + match crate::persistence::rdb::load(databases, &base_path) { + Ok(n) => { + info!( + "AOF base RDB loaded: {} keys from {}", + n, + base_path.display() + ); + total += n; + } + Err(e) => { + // Base RDB is corrupt or unreadable — applying incremental + // deltas on top of missing/corrupt base gives wrong results. + error!("AOF base RDB load failed: {}", e); + return Err(e); + } + } + } else { + warn!("AOF base RDB not found: {}", base_path.display()); + } + + // Replay incremental RESP + let incr_path = manifest.incr_path(); + if incr_path.exists() { + let data = std::fs::read(&incr_path)?; + if !data.is_empty() { + // Pure RESP — use replay_aof_resp (no RDB preamble detection needed) + let count = replay_incr_resp(databases, &data, engine)?; + info!( + "AOF incr replayed: {} commands from {}", + count, + incr_path.display() + ); + total += count; + } + } + + Ok(total) +} + +/// Replay pure RESP commands from a byte slice. +fn replay_incr_resp( + databases: &mut [crate::storage::Database], + data: &[u8], + engine: &dyn crate::persistence::replay::CommandReplayEngine, +) -> Result { + use crate::protocol::{Frame, ParseConfig, parse}; + use bytes::BytesMut; + + let total_len = data.len(); + let mut buf = BytesMut::from(data); + let config = ParseConfig::default(); + let mut selected_db: usize = 0; + let mut count: usize = 0; + + loop { + if buf.is_empty() { + break; + } + match parse::parse(&mut buf, &config) { + Ok(Some(frame)) => { + let (cmd, cmd_args) = match &frame { + Frame::Array(arr) if !arr.is_empty() => { + let name = match &arr[0] { + Frame::BulkString(s) => s.as_ref(), + Frame::SimpleString(s) => s.as_ref(), + _ => { + count += 1; + continue; + } + }; + (name as &[u8], &arr[1..]) + } + _ => { + count += 1; + continue; + } + }; + engine.replay_command(databases, cmd, cmd_args, &mut selected_db); + count += 1; + } + Ok(None) => { + if !buf.is_empty() { + let offset = total_len - buf.len(); + warn!( + "AOF incr truncated: {} bytes at offset {}", + buf.len(), + offset + ); + } + break; + } + Err(_) => { + let _ = buf.split_to(1); + if let Some(pos) = buf.iter().position(|&b| b == b'*') { + let _ = buf.split_to(pos); + } else { + break; + } + } + } + } + + Ok(count) +} diff --git a/src/persistence/mod.rs b/src/persistence/mod.rs index ded689b1..5c51d4a5 100644 --- a/src/persistence/mod.rs +++ b/src/persistence/mod.rs @@ -1,4 +1,5 @@ pub mod aof; +pub mod aof_manifest; pub mod auto_save; pub mod checkpoint; pub mod clog; diff --git a/src/persistence/rdb.rs b/src/persistence/rdb.rs index a14ec0d6..dbc8a3ba 100644 --- a/src/persistence/rdb.rs +++ b/src/persistence/rdb.rs @@ -47,7 +47,11 @@ const EOF_MARKER: u8 = 0xFF; /// Uses atomic write (write to .tmp, then rename) for crash safety. /// Expired keys are skipped. Empty databases are skipped. /// Footer contains CRC32 checksum of all preceding bytes. -pub fn save(databases: &[Database], path: &Path) -> Result<(), MoonError> { +/// Serialize all databases to RDB format in memory. +/// +/// Returns the complete RDB byte stream (header + entries + footer + CRC32). +/// Used by both `save()` (file) and AOF RDB-preamble rewrite. +pub fn save_to_bytes(databases: &[Database]) -> Result, MoonError> { let mut buf = Vec::new(); // Header @@ -60,7 +64,6 @@ pub fn save(databases: &[Database], path: &Path) -> Result<(), MoonError> { for (db_idx, db) in databases.iter().enumerate() { let base_ts = db.base_timestamp(); let data = db.data(); - // Collect non-expired entries let live: Vec<_> = data .iter() .filter(|(_, entry)| !entry.is_expired_at(base_ts, now_ms)) @@ -86,6 +89,12 @@ pub fn save(databases: &[Database], path: &Path) -> Result<(), MoonError> { let checksum = hasher.finalize(); buf.write_all(&checksum.to_le_bytes())?; + Ok(buf) +} + +pub fn save(databases: &[Database], path: &Path) -> Result<(), MoonError> { + let buf = save_to_bytes(databases)?; + // Atomic write: write to tmp, then rename let tmp_path = path.with_extension("rdb.tmp"); std::fs::write(&tmp_path, &buf).map_err(|e| RdbError::Io { @@ -154,6 +163,46 @@ pub fn save_from_snapshot( Ok(()) } +/// Serialize snapshot data (with correct base_ts per database) to RDB bytes in memory. +/// +/// Unlike `save_to_bytes(&[Database])` which reads base_ts from each Database, +/// this takes explicit (entries, base_ts) tuples — critical for AOF rewrite where +/// entries are cloned into temporary storage and the original base_ts must be preserved. +pub fn save_snapshot_to_bytes( + snapshot: &[(Vec<(CompactKey, Entry)>, u32)], +) -> Result, MoonError> { + let mut buf = Vec::new(); + + buf.write_all(RDB_MAGIC)?; + buf.write_all(&[RDB_VERSION])?; + + let now_ms = current_time_ms(); + + for (db_idx, (entries, base_ts)) in snapshot.iter().enumerate() { + let live: Vec<_> = entries + .iter() + .filter(|(_, e)| !e.is_expired_at(*base_ts, now_ms)) + .collect(); + if live.is_empty() { + continue; + } + + buf.write_all(&[DB_SELECTOR])?; + buf.write_all(&[db_idx as u8])?; + + for (key, entry) in live { + write_entry(&mut buf, key.as_bytes(), entry, *base_ts)?; + } + } + + buf.write_all(&[EOF_MARKER])?; + let mut hasher = Hasher::new(); + hasher.update(&buf); + buf.write_all(&hasher.finalize().to_le_bytes())?; + + Ok(buf) +} + /// Load an RDB file and populate databases. Returns total keys loaded. /// /// On any error (missing file, corrupt data, bad checksum), returns Err. @@ -175,25 +224,28 @@ pub fn load(databases: &mut [Database], path: &Path) -> Result .into()); } + // Wrap in Bytes for zero-copy slicing (shared refcount, no copy) + let shared_buf = Bytes::from(data); + // Verify CRC32: all bytes except last 4 vs last 4 bytes - let (payload, checksum_bytes) = data.split_at(data.len() - 4); + let payload_len = shared_buf.len() - 4; let stored_checksum = u32::from_le_bytes([ - checksum_bytes[0], - checksum_bytes[1], - checksum_bytes[2], - checksum_bytes[3], + shared_buf[payload_len], + shared_buf[payload_len + 1], + shared_buf[payload_len + 2], + shared_buf[payload_len + 3], ]); let mut hasher = Hasher::new(); - hasher.update(payload); + hasher.update(&shared_buf[..payload_len]); let computed_checksum = hasher.finalize(); if stored_checksum != computed_checksum { return Err(RdbError::ChecksumMismatch.into()); } - let mut cursor = Cursor::new(payload); + let mut cursor = Cursor::new(&shared_buf[..payload_len] as &[u8]); // Verify magic - let mut magic = [0u8; 4]; // "MOON" is 4 bytes + let mut magic = [0u8; 4]; cursor.read_exact(&mut magic).map_err(|e| RdbError::Io { path: path.to_path_buf(), source: e, @@ -218,18 +270,28 @@ pub fn load(databases: &mut [Database], path: &Path) -> Result .into()); } + // Cache timestamps once (Fix #4: avoid syscall per entry) let now_ms = current_time_ms(); + let now_secs = (now_ms / 1000) as u32; + + // First pass: count entries per database for pre-sizing (Fix #2) + let entry_counts = count_entries_per_db(&cursor, databases.len()); + + // Pre-size DashTables to avoid segment splits during load (Fix #2) + for (db_idx, &count) in entry_counts.iter().enumerate() { + if count > 0 && db_idx < databases.len() { + databases[db_idx].reserve(count); + } + } let mut total_keys = 0usize; - let mut skipped_entries = 0usize; let mut current_db: usize = 0; loop { let mut tag = [0u8; 1]; if cursor.read_exact(&mut tag).is_err() { - // Truncated tail: no more data to read, treat as implicit EOF tracing::warn!( - "RDB load: truncated tail after {} keys (no EOF marker), treating as end of file", + "RDB load: truncated tail after {} keys (no EOF marker)", total_keys ); break; @@ -256,33 +318,22 @@ pub fn load(databases: &mut [Database], path: &Path) -> Result } } type_tag => { - // Mid-stream corruption recovery: log+skip on entry parse failure - match read_entry(&mut cursor, type_tag) { + match read_entry_zero_copy(&mut cursor, type_tag, &shared_buf, now_secs) { Ok((key, entry)) => { - // Skip entries whose TTL is already in the past - if entry.has_expiry() && entry.is_expired_at(current_secs(), now_ms) { + if entry.has_expiry() && entry.is_expired_at(now_secs, now_ms) { continue; } if current_db < databases.len() { - databases[current_db].set(key, entry); + // Fix #3: skip duplicate check + memory accounting + databases[current_db].insert_for_load(key, entry); total_keys += 1; } } Err(e) => { - let offset = cursor.position(); - tracing::warn!( - "RDB load: skipping corrupted entry at offset {}: {}", - offset, - e - ); - skipped_entries += 1; - // Cannot reliably skip to next entry in a variable-length - // format without framing, so break out of the loop. - // Entries loaded so far are valid (checksum passed). tracing::warn!( - "RDB load: stopping mid-stream recovery after {} skipped entries; \ - {} keys loaded successfully", - skipped_entries, + "RDB load: corrupted entry at offset {}: {}. {} keys loaded.", + cursor.position(), + e, total_keys ); break; @@ -292,17 +343,525 @@ pub fn load(databases: &mut [Database], path: &Path) -> Result } } - if skipped_entries > 0 { - tracing::warn!( - "RDB load completed with {} entries skipped due to corruption, {} keys loaded", - skipped_entries, - total_keys - ); + // Fix #3: single-pass memory recalculation after all inserts + for db in databases.iter_mut() { + db.recalculate_memory(); } Ok(total_keys) } +/// Fast first-pass: count entries per database without parsing values. +/// Scans type tags and skips over entry payloads to count keys per db_idx. +fn count_entries_per_db(cursor: &Cursor<&[u8]>, db_count: usize) -> Vec { + let mut counts = vec![0usize; db_count]; + let data = cursor.get_ref(); + let mut pos = cursor.position() as usize; + let mut current_db = 0usize; + + while pos < data.len() { + let tag = data[pos]; + pos += 1; + + match tag { + EOF_MARKER => break, + DB_SELECTOR => { + if pos < data.len() { + current_db = data[pos] as usize; + pos += 1; + } else { + break; + } + } + TYPE_STRING | TYPE_HASH | TYPE_LIST | TYPE_SET | TYPE_SORTED_SET | TYPE_STREAM => { + if current_db < db_count { + counts[current_db] += 1; + } + // Skip over the entry payload without parsing + if let Some(new_pos) = skip_entry(data, pos, tag) { + pos = new_pos; + } else { + break; + } + } + _ => break, + } + } + + counts +} + +/// Skip over an RDB entry's bytes without allocating or parsing values. +/// Returns the new position after the entry, or None if data is truncated. +fn skip_entry(data: &[u8], mut pos: usize, type_tag: u8) -> Option { + // Skip key + pos = skip_bytes_field(data, pos)?; + // Skip TTL (8 bytes) + pos = pos.checked_add(8)?; + if pos > data.len() { + return None; + } + + match type_tag { + TYPE_STRING => { + pos = skip_bytes_field(data, pos)?; + } + TYPE_HASH => { + let count = read_u32_raw(data, pos)?; + pos += 4; + for _ in 0..count { + pos = skip_bytes_field(data, pos)?; // field + pos = skip_bytes_field(data, pos)?; // value + } + } + TYPE_LIST | TYPE_SET => { + let count = read_u32_raw(data, pos)?; + pos += 4; + for _ in 0..count { + pos = skip_bytes_field(data, pos)?; + } + } + TYPE_SORTED_SET => { + let count = read_u32_raw(data, pos)?; + pos += 4; + for _ in 0..count { + pos = skip_bytes_field(data, pos)?; // member + pos = pos.checked_add(8)?; // f64 score + if pos > data.len() { + return None; + } + } + } + TYPE_STREAM => { + // entry_count(8) + last_id(16) + pos = pos.checked_add(24)?; + if pos > data.len() { + return None; + } + let entry_count = + u64::from_le_bytes(data[pos - 24..pos - 16].try_into().ok()?) as usize; + for _ in 0..entry_count { + pos = pos.checked_add(16)?; // StreamId (ms + seq) + if pos > data.len() { + return None; + } + let field_count = read_u32_raw(data, pos)?; + pos += 4; + for _ in 0..field_count { + pos = skip_bytes_field(data, pos)?; + pos = skip_bytes_field(data, pos)?; + } + } + // Consumer groups + let group_count = read_u32_raw(data, pos)?; + pos += 4; + for _ in 0..group_count { + pos = skip_bytes_field(data, pos)?; // group name + pos = pos.checked_add(16)?; // last_delivered_id + if pos > data.len() { + return None; + } + let pel_count = read_u32_raw(data, pos)?; + pos += 4; + for _ in 0..pel_count { + pos = pos.checked_add(16)?; // StreamId + if pos > data.len() { + return None; + } + pos = skip_bytes_field(data, pos)?; // consumer name + pos = pos.checked_add(16)?; // delivery_time + delivery_count + if pos > data.len() { + return None; + } + } + let consumer_count = read_u32_raw(data, pos)?; + pos += 4; + for _ in 0..consumer_count { + pos = skip_bytes_field(data, pos)?; // consumer name + pos = pos.checked_add(8)?; // seen_time + if pos > data.len() { + return None; + } + let pending_count = read_u32_raw(data, pos)?; + pos += 4; + for _ in 0..pending_count { + pos = pos.checked_add(16)?; // StreamId + if pos > data.len() { + return None; + } + } + } + } + } + _ => return None, + } + + Some(pos) +} + +/// Read u32 LE from raw bytes without cursor overhead. +#[inline] +fn read_u32_raw(data: &[u8], pos: usize) -> Option { + if pos + 4 > data.len() { + return None; + } + Some(u32::from_le_bytes(data[pos..pos + 4].try_into().ok()?) as usize) +} + +/// Skip a length-prefixed bytes field (4-byte LE length + payload). +#[inline] +fn skip_bytes_field(data: &[u8], pos: usize) -> Option { + let len = read_u32_raw(data, pos)?; + let new_pos = pos.checked_add(4)?.checked_add(len)?; + if new_pos > data.len() { + None + } else { + Some(new_pos) + } +} + +/// Zero-copy variant of read_entry: uses shared Bytes buffer and cached timestamps. +fn read_entry_zero_copy( + cursor: &mut Cursor<&[u8]>, + type_tag: u8, + _shared_buf: &Bytes, + cached_secs: u32, +) -> Result<(Bytes, Entry), MoonError> { + let key = read_bytes(cursor)?; + + let mut ttl_buf = [0u8; 8]; + cursor.read_exact(&mut ttl_buf)?; + let ttl_ms = i64::from_le_bytes(ttl_buf); + let expires_at_ms = if ttl_ms > 0 { ttl_ms as u64 } else { 0 }; + + let value = match type_tag { + TYPE_STRING => { + // Fast path: build CompactValue directly from Vec, skipping RedisValue intermediate. + // This avoids: Vec → Bytes → RedisValue::String → from_redis_value → heap_string_vec + // and instead does: Vec → CompactValue directly (one Box alloc, zero copy). + let vec = read_bytes_vec(cursor)?; + let cv = if vec.len() <= 12 { + crate::storage::compact_value::CompactValue::from_redis_value(RedisValue::String( + Bytes::from(vec), + )) + } else { + crate::storage::compact_value::CompactValue::heap_string_vec_direct(vec) + }; + let mut entry = Entry::new_string(Bytes::new()); + entry.value = cv; + if expires_at_ms > 0 { + entry.set_expires_at_ms(cached_secs, expires_at_ms); + } + entry.set_last_access(cached_secs); + entry.set_access_counter(5); + return Ok((key, entry)); + } + TYPE_HASH => { + let count = read_u32(cursor)? as usize; + validate_count(cursor, count, 8, "hash")?; + let mut map = HashMap::with_capacity(count); + for _ in 0..count { + let field = read_bytes(cursor)?; + let val = read_bytes(cursor)?; + map.insert(field, val); + } + RedisValue::Hash(map) + } + TYPE_LIST => { + let count = read_u32(cursor)? as usize; + validate_count(cursor, count, 4, "list")?; + let mut list = VecDeque::with_capacity(count); + for _ in 0..count { + list.push_back(read_bytes(cursor)?); + } + RedisValue::List(list) + } + TYPE_SET => { + let count = read_u32(cursor)? as usize; + validate_count(cursor, count, 4, "set")?; + let mut set = HashSet::with_capacity(count); + for _ in 0..count { + set.insert(read_bytes(cursor)?); + } + RedisValue::Set(set) + } + TYPE_SORTED_SET => { + let count = read_u32(cursor)? as usize; + validate_count(cursor, count, 12, "sorted_set")?; + let mut members = HashMap::with_capacity(count); + let mut tree = BPTree::new(); + for _ in 0..count { + let member = read_bytes(cursor)?; + let mut score_buf = [0u8; 8]; + cursor.read_exact(&mut score_buf)?; + let score = f64::from_le_bytes(score_buf); + members.insert(member.clone(), score); + tree.insert(OrderedFloat(score), member); + } + RedisValue::SortedSetBPTree { tree, members } + } + TYPE_STREAM => { + // Stream parsing: reuse read_bytes (not zero-copy for this rare type) + let mut entry_count_buf = [0u8; 8]; + cursor.read_exact(&mut entry_count_buf)?; + let entry_count = u64::from_le_bytes(entry_count_buf) as usize; + let mut last_id_ms_buf = [0u8; 8]; + let mut last_id_seq_buf = [0u8; 8]; + cursor.read_exact(&mut last_id_ms_buf)?; + cursor.read_exact(&mut last_id_seq_buf)?; + let last_id = StreamId { + ms: u64::from_le_bytes(last_id_ms_buf), + seq: u64::from_le_bytes(last_id_seq_buf), + }; + let mut stream = StreamData::new(); + stream.last_id = last_id; + validate_count(cursor, entry_count, 20, "stream_entries")?; + for _ in 0..entry_count { + let mut ms_buf = [0u8; 8]; + let mut seq_buf = [0u8; 8]; + cursor.read_exact(&mut ms_buf)?; + cursor.read_exact(&mut seq_buf)?; + let id = StreamId { + ms: u64::from_le_bytes(ms_buf), + seq: u64::from_le_bytes(seq_buf), + }; + let field_count = read_u32(cursor)? as usize; + validate_count(cursor, field_count, 8, "stream_fields")?; + let mut fields = Vec::with_capacity(field_count); + for _ in 0..field_count { + fields.push((read_bytes(cursor)?, read_bytes(cursor)?)); + } + stream.entries.insert(id, fields); + stream.length += 1; + } + let group_count = read_u32(cursor)? as usize; + for _ in 0..group_count { + let group_name = read_bytes(cursor)?; + let mut gld_ms = [0u8; 8]; + let mut gld_seq = [0u8; 8]; + cursor.read_exact(&mut gld_ms)?; + cursor.read_exact(&mut gld_seq)?; + let last_delivered_id = StreamId { + ms: u64::from_le_bytes(gld_ms), + seq: u64::from_le_bytes(gld_seq), + }; + let pel_count = read_u32(cursor)? as usize; + let mut pel = BTreeMap::new(); + for _ in 0..pel_count { + let mut pid_ms = [0u8; 8]; + let mut pid_seq = [0u8; 8]; + cursor.read_exact(&mut pid_ms)?; + cursor.read_exact(&mut pid_seq)?; + let pid = StreamId { + ms: u64::from_le_bytes(pid_ms), + seq: u64::from_le_bytes(pid_seq), + }; + let consumer_name = read_bytes(cursor)?; + let mut dt_buf = [0u8; 8]; + let mut dc_buf = [0u8; 8]; + cursor.read_exact(&mut dt_buf)?; + cursor.read_exact(&mut dc_buf)?; + pel.insert( + pid, + crate::storage::stream::PendingEntry { + consumer: consumer_name, + delivery_time: u64::from_le_bytes(dt_buf), + delivery_count: u64::from_le_bytes(dc_buf), + }, + ); + } + let consumer_count = read_u32(cursor)? as usize; + let mut consumers = HashMap::new(); + for _ in 0..consumer_count { + let cname = read_bytes(cursor)?; + let mut st_buf = [0u8; 8]; + cursor.read_exact(&mut st_buf)?; + let seen_time = u64::from_le_bytes(st_buf); + let pending_count = read_u32(cursor)? as usize; + let mut pending = BTreeMap::new(); + for _ in 0..pending_count { + let mut cid_ms = [0u8; 8]; + let mut cid_seq = [0u8; 8]; + cursor.read_exact(&mut cid_ms)?; + cursor.read_exact(&mut cid_seq)?; + pending.insert( + StreamId { + ms: u64::from_le_bytes(cid_ms), + seq: u64::from_le_bytes(cid_seq), + }, + (), + ); + } + consumers.insert( + cname.clone(), + crate::storage::stream::Consumer { + name: cname, + pending, + seen_time, + }, + ); + } + stream.groups.insert( + group_name, + crate::storage::stream::ConsumerGroup { + last_delivered_id, + pel, + consumers, + }, + ); + } + RedisValue::Stream(Box::new(stream)) + } + _ => return Err(RdbError::UnsupportedType { type_tag }.into()), + }; + + let mut entry = Entry::new_string(Bytes::new()); + entry.value = crate::storage::compact_value::CompactValue::from_redis_value(value); + if expires_at_ms > 0 { + entry.set_expires_at_ms(cached_secs, expires_at_ms); + } + entry.set_last_access(cached_secs); + entry.set_access_counter(5); + + Ok((key, entry)) +} + +/// Load an RDB snapshot from a byte slice (for AOF RDB-preamble format). +/// +/// Returns `(keys_loaded, bytes_consumed)`. The caller can use `bytes_consumed` +/// to find the start of any RESP commands appended after the RDB preamble. +pub fn load_from_bytes( + databases: &mut [Database], + data: &[u8], +) -> Result<(usize, usize), MoonError> { + if data.len() < RDB_MAGIC.len() + 1 + 1 + 4 { + return Err(RdbError::Corrupted { + detail: "RDB preamble too small".into(), + } + .into()); + } + + // Find EOF_MARKER to determine RDB section length. + // The RDB section is: header + entries + EOF_MARKER(1) + CRC32(4). + // We scan for EOF_MARKER (0xFF) — the first one after the header that's + // immediately followed by a valid CRC32 of the preceding bytes. + let mut rdb_end = None; + // Start scanning after header (MOON + version = 5 bytes) + for i in 5..data.len().saturating_sub(4) { + if data[i] == EOF_MARKER { + let payload = &data[..=i]; // everything up to and including EOF_MARKER + if let Some(checksum_bytes) = data.get(i + 1..i + 5) { + let stored = u32::from_le_bytes([ + checksum_bytes[0], + checksum_bytes[1], + checksum_bytes[2], + checksum_bytes[3], + ]); + let mut hasher = Hasher::new(); + hasher.update(payload); + if hasher.finalize() == stored { + rdb_end = Some(i + 5); // past CRC32 + break; + } + } + } + } + + let rdb_len = rdb_end.ok_or_else(|| { + MoonError::from(RdbError::Corrupted { + detail: "RDB preamble: no valid EOF+CRC found".into(), + }) + })?; + + // Load using the same logic as `load`, but from the byte slice + let payload = &data[..rdb_len - 4]; // exclude CRC32 + let mut cursor = Cursor::new(payload); + + // Skip magic + version + let mut magic = [0u8; 4]; + cursor.read_exact(&mut magic).map_err(|e| RdbError::Io { + path: std::path::PathBuf::from(""), + source: e, + })?; + if &magic != RDB_MAGIC { + return Err(RdbError::Corrupted { + detail: "invalid RDB magic in AOF preamble".into(), + } + .into()); + } + let mut version = [0u8; 1]; + cursor.read_exact(&mut version).map_err(|e| RdbError::Io { + path: std::path::PathBuf::from(""), + source: e, + })?; + if version[0] != RDB_VERSION { + return Err(RdbError::UnsupportedVersion { + version: version[0] as u32, + } + .into()); + } + + let now_ms = current_time_ms(); + let now_secs = (now_ms / 1000) as u32; + let shared_buf = Bytes::copy_from_slice(data); + let mut total_keys = 0usize; + let mut current_db: usize = 0; + + // Pre-size DashTables + let entry_counts = count_entries_per_db(&cursor, databases.len()); + for (db_idx, &count) in entry_counts.iter().enumerate() { + if count > 0 && db_idx < databases.len() { + databases[db_idx].reserve(count); + } + } + + loop { + let mut tag = [0u8; 1]; + if cursor.read_exact(&mut tag).is_err() { + break; + } + match tag[0] { + EOF_MARKER => break, + DB_SELECTOR => { + let mut db_idx = [0u8; 1]; + cursor.read_exact(&mut db_idx).map_err(|e| RdbError::Io { + path: std::path::PathBuf::from(""), + source: e, + })?; + current_db = db_idx[0] as usize; + if current_db >= databases.len() { + return Err(RdbError::Corrupted { + detail: format!( + "RDB preamble references database {} but only {} configured", + current_db, + databases.len() + ), + } + .into()); + } + } + type_tag => match read_entry_zero_copy(&mut cursor, type_tag, &shared_buf, now_secs) { + Ok((key, entry)) => { + if entry.has_expiry() && entry.is_expired_at(now_secs, now_ms) { + continue; + } + if current_db < databases.len() { + databases[current_db].insert_for_load(key, entry); + total_keys += 1; + } + } + Err(_) => break, + }, + } + } + + for db in databases.iter_mut() { + db.recalculate_memory(); + } + + Ok((total_keys, rdb_len)) +} + /// Distribute keys from loaded databases to the correct per-shard databases. /// /// After loading an RDB file into temporary databases, this function routes each key @@ -756,7 +1315,8 @@ fn validate_count( pub(crate) fn read_bytes(cursor: &mut Cursor<&[u8]>) -> Result { let len = read_u32(cursor)? as usize; - let remaining = cursor.get_ref().len() - cursor.position() as usize; + let pos = cursor.position() as usize; + let remaining = cursor.get_ref().len() - pos; if len > remaining { return Err(RdbError::Corrupted { detail: format!( @@ -766,9 +1326,51 @@ pub(crate) fn read_bytes(cursor: &mut Cursor<&[u8]>) -> Result } .into()); } - let mut data = vec![0u8; len]; - cursor.read_exact(&mut data)?; - Ok(Bytes::from(data)) + let slice = &cursor.get_ref()[pos..pos + len]; + cursor.set_position((pos + len) as u64); + Ok(Bytes::copy_from_slice(slice)) +} + +/// Read bytes as owned Vec — avoids Bytes intermediate for RDB load path. +/// Single allocation directly to the right size, no refcount overhead. +pub(crate) fn read_bytes_vec(cursor: &mut Cursor<&[u8]>) -> Result, MoonError> { + let len = read_u32(cursor)? as usize; + let pos = cursor.position() as usize; + let remaining = cursor.get_ref().len() - pos; + if len > remaining { + return Err(RdbError::Corrupted { + detail: format!( + "read_bytes_vec: length {} exceeds remaining {}", + len, remaining + ), + } + .into()); + } + let slice = &cursor.get_ref()[pos..pos + len]; + cursor.set_position((pos + len) as u64); + Ok(slice.to_vec()) +} + +/// Zero-copy read: returns a `Bytes` slice of the shared buffer (no heap alloc). +#[allow(dead_code)] +pub(crate) fn read_bytes_zero_copy( + cursor: &mut Cursor<&[u8]>, + shared_buf: &Bytes, +) -> Result { + let len = read_u32(cursor)? as usize; + let pos = cursor.position() as usize; + let remaining = cursor.get_ref().len() - pos; + if len > remaining { + return Err(RdbError::Corrupted { + detail: format!( + "read_bytes_zero_copy: length {} exceeds remaining {}", + len, remaining + ), + } + .into()); + } + cursor.set_position((pos + len) as u64); + Ok(shared_buf.slice(pos..pos + len)) } pub(crate) fn read_u32(cursor: &mut Cursor<&[u8]>) -> Result { diff --git a/src/server/conn/handler_monoio.rs b/src/server/conn/handler_monoio.rs index a97856d1..5186830c 100644 --- a/src/server/conn/handler_monoio.rs +++ b/src/server/conn/handler_monoio.rs @@ -1204,6 +1204,18 @@ pub async fn handle_connection_sharded_monoio< responses.push(crate::command::persistence::handle_lastsave()); continue; } + // BGREWRITEAOF -- multi-part AOF rewrite + if cmd.eq_ignore_ascii_case(b"BGREWRITEAOF") { + if let Some(ref tx) = aof_tx { + responses.push(crate::command::persistence::bgrewriteaof_start_sharded( + tx, + shard_databases.clone(), + )); + } else { + responses.push(Frame::Error(Bytes::from_static(b"ERR AOF is not enabled"))); + } + continue; + } // === ACL permission check (NOPERM gate) === // Exempt commands (AUTH, HELLO, QUIT, ACL) already handled above. diff --git a/src/server/conn/handler_sharded.rs b/src/server/conn/handler_sharded.rs index 404643f8..6be205ca 100644 --- a/src/server/conn/handler_sharded.rs +++ b/src/server/conn/handler_sharded.rs @@ -1214,6 +1214,13 @@ pub async fn handle_connection_sharded_inner< continue; } + // --- MULTI queue mode --- + if in_multi { + command_queue.push(frame); + responses.push(Frame::SimpleString(Bytes::from_static(b"QUEUED"))); + continue; + } + // --- BGSAVE --- if cmd.eq_ignore_ascii_case(b"BGSAVE") { responses.push(crate::command::persistence::bgsave_start_sharded(&snapshot_trigger_tx, num_shards)); @@ -1227,11 +1234,17 @@ pub async fn handle_connection_sharded_inner< responses.push(crate::command::persistence::handle_lastsave()); continue; } - - // --- MULTI queue mode --- - if in_multi { - command_queue.push(frame); - responses.push(Frame::SimpleString(Bytes::from_static(b"QUEUED"))); + if cmd.eq_ignore_ascii_case(b"BGREWRITEAOF") { + if let Some(ref tx) = aof_tx { + responses.push(crate::command::persistence::bgrewriteaof_start_sharded( + tx, + shard_databases.clone(), + )); + } else { + responses.push(Frame::Error(Bytes::from_static( + b"ERR AOF is not enabled", + ))); + } continue; } diff --git a/src/shard/shared_databases.rs b/src/shard/shared_databases.rs index 663ee418..b299ad27 100644 --- a/src/shard/shared_databases.rs +++ b/src/shard/shared_databases.rs @@ -138,6 +138,13 @@ impl ShardDatabases { self.num_shards } + /// Return a reference to all databases across all shards (for AOF rewrite). + /// Callers iterate shards × dbs and acquire read locks individually. + #[inline] + pub fn all_shard_dbs(&self) -> &[Vec>] { + &self.shards + } + /// Number of databases per shard (typically 16). #[inline] pub fn db_count(&self) -> usize { diff --git a/src/storage/compact_value.rs b/src/storage/compact_value.rs index d39a88cf..e830b348 100644 --- a/src/storage/compact_value.rs +++ b/src/storage/compact_value.rs @@ -42,7 +42,7 @@ const HEAP_TAG_MASK: usize = 0x7; /// Thin wrapper for heap-allocated strings. /// At 24 bytes (Vec), this is smaller than RedisValue::String(Bytes) (~40 bytes) -/// and avoids the enum discriminant overhead. +/// and avoids the enum discriminant + refcount overhead. struct HeapString(Vec); /// Borrowed view of a CompactValue, for zero-copy read access. @@ -139,16 +139,13 @@ impl CompactValue { /// `RedisValue` enum wrapper (~40B savings per heap string). /// Collections are still stored as `Box`. pub fn from_redis_value(value: RedisValue) -> Self { - match &value { - RedisValue::String(s) if s.len() <= SSO_MAX_LEN => { - return Self::inline_string(s); - } - _ => {} - } - - // String heap path: store as Box<[u8]> directly (no RedisValue wrapper) - if let RedisValue::String(s) = &value { - return Self::heap_string(s); + // String fast path: inline SSO or zero-copy owned Bytes + if let RedisValue::String(s) = value { + return if s.len() <= SSO_MAX_LEN { + Self::inline_string(&s) + } else { + Self::heap_string_owned(s) + }; } // Collection heap path: store as Box @@ -183,10 +180,29 @@ impl CompactValue { } } - /// Create a heap-allocated string CompactValue from byte data. - /// Stores as `Box` — eliminates RedisValue enum wrapper. - /// HeapString is 24 bytes (Vec) vs RedisValue::String(Bytes) at ~40 bytes. + /// Create a heap-allocated string CompactValue from a byte slice (copies data). pub fn heap_string(data: &[u8]) -> Self { + Self::heap_string_vec(data.to_vec()) + } + + /// Create from owned Bytes (converts to Vec via Bytes::into for zero-copy + /// when Bytes has unique ownership, or copies when shared). + pub fn heap_string_owned(data: Bytes) -> Self { + // Bytes::into::> is zero-copy when refcount == 1, copies otherwise + Self::heap_string_vec(data.into()) + } + + /// Create from an owned Vec directly — no copy, no refcount. + /// This is the fastest path: one Box allocation for the HeapString wrapper. + /// Public for RDB loader fast path. + pub fn heap_string_vec_direct(data: Vec) -> Self { + if data.len() <= SSO_MAX_LEN { + return Self::inline_string(&data); + } + Self::heap_string_vec(data) + } + + fn heap_string_vec(data: Vec) -> Self { debug_assert!(data.len() > SSO_MAX_LEN); let str_len = data.len(); @@ -194,7 +210,7 @@ impl CompactValue { let copy_len = str_len.min(4); prefix[..copy_len].copy_from_slice(&data[..copy_len]); - let hs = Box::new(HeapString(data.to_vec())); + let hs = Box::new(HeapString(data)); let raw_ptr = Box::into_raw(hs) as usize; debug_assert!( raw_ptr & HEAP_TAG_MASK == 0, @@ -322,6 +338,7 @@ impl CompactValue { /// Get a mutable reference to the heap string bytes. /// Returns None for non-string types and inline values. + /// Note: returns `Option<&mut Vec>` for the underlying byte buffer. pub fn as_bytes_mut(&mut self) -> Option<&mut Vec> { if self.is_inline() { None diff --git a/src/storage/db.rs b/src/storage/db.rs index 27b5176a..a6455c65 100644 --- a/src/storage/db.rs +++ b/src/storage/db.rs @@ -434,6 +434,33 @@ impl Database { self.data.insert(CompactKey::from(key), entry); } + /// Bulk-load insert: skip duplicate check, version tracking, and per-key memory accounting. + /// + /// Used exclusively during RDB/AOF restore where keys are guaranteed unique and + /// we recalculate `used_memory` once after the entire load completes. + #[inline] + pub fn insert_for_load(&mut self, key: Bytes, entry: Entry) { + self.data.insert(CompactKey::from(key), entry); + } + + /// Recalculate `used_memory` by scanning all entries. Call once after bulk load. + pub fn recalculate_memory(&mut self) { + let mut total = 0usize; + for (key, entry) in self.data.iter() { + total += entry_overhead(key.as_bytes(), entry); + } + self.used_memory = total; + } + + /// Pre-size the internal hash table for an expected key count. + /// Eliminates segment splits during bulk load. + pub fn reserve(&mut self, additional: usize) { + if additional > self.data.len() { + let new_table = DashTable::with_capacity(additional); + self.data = new_table; + } + } + /// Remove a key and return its entry. No expiry check needed (DEL removes regardless). pub fn remove(&mut self, key: &[u8]) -> Option { if let Some(entry) = self.data.remove(key) { From ca2ec51ee569235bbf4d44ab28253879f65b3eff Mon Sep 17 00:00:00 2001 From: Tin Dang Date: Wed, 8 Apr 2026 23:03:51 +0700 Subject: [PATCH 2/6] fix(aof): correctness issues in multi-part rewrite and manifest loading MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses senior-rust-engineer review of #63. Six fixes across P0/P1: 1. AofManifest::load returns Result, io::Error> Previous silent-None-on-corruption caused main.rs to call initialize() and overwrite the corrupt manifest, destroying the reference to the real base RDB and losing all persisted data. Corrupt manifest is now fatal at startup. 2. Orphan cleanup on manifest load Scans appendonlydir/ for stray moon.aof.{N}.{base.rdb,incr.aof, *.tmp} with N != current seq and deletes them. Previously a crash between advance() phases 1-3 left zombie base RDBs that never got referenced by any manifest, filling disk over repeated failures. 3. replay_incr_resp: fail-hard on parse error Previous impl did buf.split_to(1) + scan for next '*' byte, silently dropping runs of corrupt commands. '*' can legitimately appear inside bulk-string payloads, so resync was unsound. Recovery of a corrupt incr log is now an error, not silent data loss. 4. Rewrite ordering: drain + lock + snapshot + drain P0 bug: non-idempotent commands (INCR, LPUSH, SADD, ZADD, HINCRBY, APPEND, etc.) were double-applied on recovery after BGREWRITEAOF. The handler applies the write to the DB synchronously, then sends an AofMessage::Append asynchronously. During rewrite, appends queue in the channel while the writer thread is in do_rewrite_*. After rewrite, queued appends are processed into the NEW incr. If the write happened BEFORE the snapshot captured its shard, the write is both in base AND in new incr → replay double-applies. Fix: in do_rewrite_single/sharded, (a) drain the channel to the old incr and fsync, (b) acquire write locks on all (shard, db) pairs simultaneously, (c) drain once more to catch appends completed between step a and step b, (d) snapshot under the locks, (e) release locks, (f) write new base + advance manifest + reopen. Invariant: any write captured in the new base is NOT in the new incr (handlers were blocked), and any write NOT in the new base IS in the new incr (queued after lock release). Creates a brief global write pause during snapshot — acceptable cost for correctness. 5. AOF writer honors corrupt-manifest error Writer thread exits with a loud log instead of spinning on load() forever when the manifest is corrupt, so server startup fails fast. 6. Database::reserve debug_assert empty table Previous impl silently replaced the DashTable regardless of current contents — caller who misused reserve() on a populated database would lose all data without warning. Debug assertion catches the misuse in tests. Validation (moon-dev OrbStack): cargo fmt --check ok cargo clippy --release -- -D warnings ok cargo clippy --no-default-features --features runtime-tokio,jemalloc -- -D warnings ok cargo test --release --lib 1858 pass (test_inline_set* pre-existing failures on main, unrelated) --- src/main.rs | 43 +++++--- src/persistence/aof.rs | 187 +++++++++++++++++++++++++++++--- src/persistence/aof_manifest.rs | 132 +++++++++++++++++----- src/storage/db.rs | 14 ++- 4 files changed, 312 insertions(+), 64 deletions(-) diff --git a/src/main.rs b/src/main.rs index f57dd028..0439c491 100644 --- a/src/main.rs +++ b/src/main.rs @@ -243,36 +243,45 @@ fn main() -> anyhow::Result<()> { // Multi-part AOF replay layered on top of v2/v3 recovery. // Priority: if appendonlydir/ manifest exists → load multi-part (skip legacy v2 fallback). // Otherwise v2 already handled legacy appendonly.aof during restore_from_persistence. + // + // A corrupt manifest is FATAL: overwriting it silently destroys the reference + // to the real base RDB and loses all persisted data. if config.appendonly == "yes" { if let Some(ref dir) = persistence_dir { + use anyhow::Context; use moon::persistence::aof_manifest::AofManifest; use moon::persistence::replay::DispatchReplayEngine; let base_dir = std::path::PathBuf::from(dir); - if let Some(manifest) = AofManifest::load(&base_dir) { + let manifest_opt = AofManifest::load(&base_dir).with_context(|| { + format!( + "AOF manifest at {}/appendonlydir/ is corrupt; refusing to start to avoid data loss. Inspect manually before deleting.", + base_dir.display() + ) + })?; + if let Some(ref manifest) = manifest_opt { if num_shards == 1 { - match moon::persistence::aof_manifest::replay_multi_part( + let loaded = moon::persistence::aof_manifest::replay_multi_part( &mut shards[0].databases, - &manifest, + manifest, &DispatchReplayEngine, - ) { - Ok(n) => info!( - "AOF multi-part loaded (seq {}): {} entries", - manifest.seq, n - ), - Err(e) => tracing::error!("Multi-part AOF load failed: {}", e), - } + ) + .with_context(|| "multi-part AOF replay failed")?; + info!( + "AOF multi-part loaded (seq {}): {} entries", + manifest.seq, loaded + ); } else { tracing::warn!( "Multi-part AOF skipped in multi-shard mode (not yet supported)" ); } - } - // Initialize manifest for writer thread (safe — recovery is complete). - // On fresh/legacy upgrade, writer is blocked waiting for this file. - if AofManifest::load(&base_dir).is_none() { - if let Err(e) = AofManifest::initialize(&base_dir) { - tracing::error!("Failed to initialize AOF manifest: {}", e); - } + } else { + // No manifest present — first boot after upgrade from legacy + // single-file AOF (v2 recovery already loaded it above) or + // fresh install. Initialize now so the writer thread can + // unblock and BGREWRITEAOF can advance from seq 1. + AofManifest::initialize(&base_dir) + .with_context(|| "failed to initialize AOF manifest")?; } } } diff --git a/src/persistence/aof.rs b/src/persistence/aof.rs index 5caeb4d1..bd8de088 100644 --- a/src/persistence/aof.rs +++ b/src/persistence/aof.rs @@ -126,12 +126,25 @@ pub async fn aof_writer_task( // main.rs recovery runs concurrently and must finish before a manifest // is created, to avoid racing against legacy single-file AOF detection. // main.rs will create the manifest after recovery completes. + // + // A corrupt manifest is fatal — exit the writer so the server startup + // notices and fails loud rather than silently overwriting. let mut manifest = loop { - if let Some(m) = AofManifest::load(&base_dir) { - break m; + match AofManifest::load(&base_dir) { + Ok(Some(m)) => break m, + Ok(None) => { + // main.rs recovery hasn't created the manifest yet — wait. + std::thread::sleep(std::time::Duration::from_millis(50)); + } + Err(e) => { + error!( + "AOF manifest corrupt at {}: {}. Writer exiting; persistence disabled.", + base_dir.display(), + e + ); + return; + } } - // main.rs recovery hasn't created the manifest yet — wait. - std::thread::sleep(std::time::Duration::from_millis(50)); }; // Open the current incremental file for appending @@ -213,7 +226,7 @@ pub async fn aof_writer_task( error!("AOF pre-rewrite sync failed (seq {}): {}", manifest.seq, e); } } - match do_rewrite_single(&db, &mut manifest, &mut file) { + match do_rewrite_single(&db, &mut manifest, &mut file, &rx) { Ok(()) => { write_error = false; // Reset on successful rewrite } @@ -226,7 +239,7 @@ pub async fn aof_writer_task( error!("AOF pre-rewrite sync failed (seq {}): {}", manifest.seq, e); } } - match do_rewrite_sharded(&shard_dbs, &mut manifest, &mut file) { + match do_rewrite_sharded(&shard_dbs, &mut manifest, &mut file, &rx) { Ok(()) => { write_error = false; } @@ -683,38 +696,124 @@ fn snapshot_and_generate(db: &SharedDatabases) -> BytesMut { generate_rewrite_commands(&temp_dbs) } +/// Drain any queued `AofMessage::Append` messages to the current incr file. +/// +/// Called during rewrite to catch in-flight appends that handlers sent before +/// the writer thread could enter the rewrite routine. Messages of other variants +/// are dropped silently (duplicate rewrites while a rewrite is in progress) or +/// returned via the flag for Shutdown (caller is responsible for honoring it +/// after the rewrite completes). +#[cfg(feature = "runtime-monoio")] +#[derive(Default)] +struct DrainOutcome { + drained: usize, + shutdown_requested: bool, +} + +#[cfg(feature = "runtime-monoio")] +fn drain_pending_appends( + rx: &channel::MpscReceiver, + file: &mut std::fs::File, +) -> Result { + use std::io::Write; + let mut outcome = DrainOutcome::default(); + while let Ok(msg) = rx.try_recv() { + match msg { + AofMessage::Append(data) => { + file.write_all(&data).map_err(|e| AofError::Io { + path: PathBuf::from(""), + source: e, + })?; + outcome.drained += 1; + } + AofMessage::Shutdown => { + outcome.shutdown_requested = true; + } + AofMessage::Rewrite(_) | AofMessage::RewriteSharded(_) => { + // Already rewriting — drop redundant request. + } + } + } + Ok(outcome) +} + /// Multi-part rewrite: snapshot single-shard databases → RDB base → advance manifest. +/// +/// Correctness ordering (prevents double-apply of non-idempotent commands like +/// INCR/LPUSH/SADD after rewrite): +/// +/// 1. Drain any queued appends into the OLD incr file and fsync. +/// 2. Acquire write locks on all databases in the shard. This blocks handlers +/// from applying new writes or queueing new appends for the locked dbs. +/// 3. Drain the channel once more — catches appends for writes that the +/// handler completed between step 1 and step 2. +/// 4. Snapshot every database under the write locks. Because no handler can +/// mutate the dbs while we hold the locks, the snapshot is atomic with +/// respect to the post-drain channel state. +/// 5. Release the write locks. New handler writes from here on queue in the +/// channel and will be processed into the NEW incr file after rotation. +/// 6. Write the new base RDB, advance the manifest, reopen the file handle. +/// +/// Invariant: any write captured in the new base is NOT in the new incr file +/// (handlers were blocked between drain and snapshot), and any write NOT in +/// the new base IS in the new incr file (queued after lock release). #[cfg(feature = "runtime-monoio")] fn do_rewrite_single( db: &SharedDatabases, manifest: &mut crate::persistence::aof_manifest::AofManifest, file: &mut std::fs::File, + rx: &channel::MpscReceiver, ) -> Result<(), MoonError> { - // Capture (entries, base_ts) per database — preserves original base_ts for correct TTL. + // Phase 1: drain pre-rewrite queued appends into old incr, fsync. + let pre_drain = drain_pending_appends(rx, file)?; + file.sync_data().map_err(|e| AofError::Io { + path: manifest.incr_path(), + source: e, + })?; + + // Phase 2: acquire write locks on every database in the shard. + // Order is consistent (index-ascending) so concurrent callers would + // serialize without deadlock — but in practice only this thread + // acquires multi-db locks. + let guards: Vec<_> = db.iter().map(|lock| lock.write()).collect(); + + // Phase 3: drain any appends the handlers sent between phase 1 and phase 2. + let mid_drain = drain_pending_appends(rx, file)?; + file.sync_data().map_err(|e| AofError::Io { + path: manifest.incr_path(), + source: e, + })?; + + // Phase 4: snapshot under the write locks. No mutation is possible. + let now_ms = current_time_ms(); let snapshot: Vec<( Vec<( crate::storage::compact_key::CompactKey, crate::storage::entry::Entry, )>, u32, - )> = db + )> = guards .iter() - .map(|lock| { - let guard = lock.read(); + .map(|guard| { let base_ts = guard.base_timestamp(); - let entries = guard + let entries: Vec<_> = guard .data() .iter() + .filter(|(_, v)| !v.is_expired_at(base_ts, now_ms)) .map(|(k, v)| (k.clone(), v.clone())) .collect(); (entries, base_ts) }) .collect(); + // Phase 5: release locks. Handlers resume; new appends queue in the channel + // and will be processed into the new incr after step 6. + drop(guards); + + // Phase 6: write new base, advance manifest, reopen. let rdb_bytes = crate::persistence::rdb::save_snapshot_to_bytes(&snapshot)?; let new_incr = manifest.advance(&rdb_bytes)?; - // Switch writer to new incr file *file = std::fs::OpenOptions::new() .create(true) .append(true) @@ -724,17 +823,60 @@ fn do_rewrite_single( source: e, })?; + info!( + "AOF rewrite complete (single): drained {}+{} pre-snapshot appends, seq={}", + pre_drain.drained, mid_drain.drained, manifest.seq + ); + if pre_drain.shutdown_requested || mid_drain.shutdown_requested { + // Caller doesn't currently observe this; logging is the escape hatch. + warn!("AOF writer: shutdown requested during rewrite (will honor on next recv)"); + } Ok(()) } /// Multi-part rewrite: snapshot all shards → merged RDB base → advance manifest. +/// +/// See [`do_rewrite_single`] for the ordering rationale. The multi-shard variant +/// holds write locks on every (shard, db) pair simultaneously for the duration +/// of the snapshot. This creates a brief global write pause, but it is the only +/// way to guarantee a torn-free snapshot without per-message sequence numbers. #[cfg(feature = "runtime-monoio")] fn do_rewrite_sharded( shard_dbs: &crate::shard::shared_databases::ShardDatabases, manifest: &mut crate::persistence::aof_manifest::AofManifest, file: &mut std::fs::File, + rx: &channel::MpscReceiver, ) -> Result<(), MoonError> { - // Capture (entries, base_ts) per merged database, preserving original base_ts for TTL. + // Phase 1: drain pre-rewrite queued appends into old incr. + let pre_drain = drain_pending_appends(rx, file)?; + file.sync_data().map_err(|e| AofError::Io { + path: manifest.incr_path(), + source: e, + })?; + + // Phase 2: acquire write locks on ALL (shard, db) pairs simultaneously. + // Lock order is (shard_idx, db_idx) ascending — must match anywhere else + // that acquires multiple locks to prevent deadlock (currently no other + // call site does, but the ordering discipline is documented for future + // maintainers). + let all_shards = shard_dbs.all_shard_dbs(); + let mut guards: Vec> = Vec::with_capacity(all_shards.len()); + for shard_locks in all_shards { + let mut shard_guards = Vec::with_capacity(shard_locks.len()); + for lock in shard_locks { + shard_guards.push(lock.write()); + } + guards.push(shard_guards); + } + + // Phase 3: drain appends completed between phase 1 and phase 2. + let mid_drain = drain_pending_appends(rx, file)?; + file.sync_data().map_err(|e| AofError::Io { + path: manifest.incr_path(), + source: e, + })?; + + // Phase 4: snapshot under locks. let db_count = shard_dbs.db_count(); let mut merged: Vec<( Vec<( @@ -743,12 +885,10 @@ fn do_rewrite_sharded( )>, u32, )> = (0..db_count).map(|_| (Vec::new(), 0u32)).collect(); - - for shard_locks in shard_dbs.all_shard_dbs() { - for (db_idx, lock) in shard_locks.iter().enumerate() { - let guard = lock.read(); + let now_ms = current_time_ms(); + for shard_guards in &guards { + for (db_idx, guard) in shard_guards.iter().enumerate() { let base_ts = guard.base_timestamp(); - let now_ms = current_time_ms(); if merged[db_idx].0.is_empty() { merged[db_idx].1 = base_ts; } @@ -760,6 +900,10 @@ fn do_rewrite_sharded( } } + // Phase 5: release locks before the expensive disk write. + drop(guards); + + // Phase 6: write new base, advance manifest, reopen. let rdb_bytes = crate::persistence::rdb::save_snapshot_to_bytes(&merged)?; let new_incr = manifest.advance(&rdb_bytes)?; @@ -772,6 +916,13 @@ fn do_rewrite_sharded( source: e, })?; + info!( + "AOF rewrite complete (sharded): drained {}+{} pre-snapshot appends, seq={}", + pre_drain.drained, mid_drain.drained, manifest.seq + ); + if pre_drain.shutdown_requested || mid_drain.shutdown_requested { + warn!("AOF writer: shutdown requested during rewrite (will honor on next recv)"); + } Ok(()) } diff --git a/src/persistence/aof_manifest.rs b/src/persistence/aof_manifest.rs index 84b884a8..34a54974 100644 --- a/src/persistence/aof_manifest.rs +++ b/src/persistence/aof_manifest.rs @@ -73,22 +73,23 @@ impl AofManifest { Ok(manifest) } - /// Load manifest from disk. Returns `None` if manifest doesn't exist. - pub fn load(dir: &Path) -> Option { + /// Load manifest from disk. + /// + /// Returns: + /// - `Ok(None)` — manifest file does not exist (fresh install or legacy single-file AOF) + /// - `Ok(Some(manifest))` — manifest loaded successfully + /// - `Err(_)` — manifest file exists but is unreadable or corrupt. + /// Callers MUST treat this as fatal: overwriting a corrupt manifest with a + /// fresh one silently destroys the reference to the real base RDB and loses data. + pub fn load(dir: &Path) -> std::io::Result> { let aof_dir = dir.join(AOF_DIR_NAME); let manifest_path = aof_dir.join(MANIFEST_NAME); if !manifest_path.exists() { - return None; + return Ok(None); } - let content = match std::fs::read_to_string(&manifest_path) { - Ok(c) => c, - Err(e) => { - error!("Failed to read AOF manifest: {}", e); - return None; - } - }; + let content = std::fs::read_to_string(&manifest_path)?; let mut seq = 0u64; for line in content.lines() { @@ -101,14 +102,67 @@ impl AofManifest { } if seq == 0 { - error!("AOF manifest has no valid sequence number"); - return None; + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "AOF manifest at {} has no valid sequence number", + manifest_path.display() + ), + )); } - Some(Self { + let manifest = Self { dir: dir.to_path_buf(), seq, - }) + }; + + // Best-effort orphan cleanup: delete stray base/incr files from aborted + // rewrites. A crash between advance() steps 1-3 leaves a new base RDB on + // disk that the active manifest never references. Without this sweep, + // repeated crashes during rewrite can fill the disk with zombie files. + manifest.cleanup_orphans(); + + Ok(Some(manifest)) + } + + /// Delete any base/incr files in `appendonlydir/` that do not match the + /// current sequence. Best-effort — logs but does not propagate errors. + fn cleanup_orphans(&self) { + let aof_dir = self.aof_dir(); + let entries = match std::fs::read_dir(&aof_dir) { + Ok(e) => e, + Err(_) => return, + }; + let current_base = format!("moon.aof.{}.base.rdb", self.seq); + let current_incr = format!("moon.aof.{}.incr.aof", self.seq); + for entry in entries.flatten() { + let name = entry.file_name(); + let name_str = match name.to_str() { + Some(s) => s, + None => continue, + }; + // Keep manifest, current base, current incr. Delete any other moon.aof.*. + if name_str == MANIFEST_NAME || name_str == current_base || name_str == current_incr { + continue; + } + let is_moon_aof = name_str.starts_with("moon.aof.") + && (name_str.ends_with(".base.rdb") + || name_str.ends_with(".incr.aof") + || name_str.ends_with(".rdb.tmp") + || name_str.ends_with(".tmp")); + if !is_moon_aof { + continue; + } + let path = entry.path(); + match std::fs::remove_file(&path) { + Ok(()) => info!("AOF orphan cleanup: removed {}", path.display()), + Err(e) => warn!( + "AOF orphan cleanup: failed to remove {}: {}", + path.display(), + e + ), + } + } } /// Write the manifest file atomically (write tmp + rename). @@ -248,6 +302,14 @@ pub fn replay_multi_part( } /// Replay pure RESP commands from a byte slice. +/// +/// **Corruption handling:** On mid-stream parse errors this returns an error +/// rather than silently resyncing to the next `*` byte. Silent resync in a +/// multi-part AOF is dangerous: an undetected run of dropped commands leaves +/// the database in an inconsistent state that cannot be reconstructed. +/// Truncated tails (parser returns `Ok(None)` with bytes remaining) are +/// logged and treated as the legitimate end of the incremental log, matching +/// `replay_aof` semantics for crash-time tail truncation. fn replay_incr_resp( databases: &mut [crate::storage::Database], data: &[u8], @@ -273,16 +335,30 @@ fn replay_incr_resp( let name = match &arr[0] { Frame::BulkString(s) => s.as_ref(), Frame::SimpleString(s) => s.as_ref(), - _ => { - count += 1; - continue; + other => { + return Err(crate::error::MoonError::from( + crate::error::AofError::RewriteFailed { + detail: format!( + "AOF incr command at offset {} has non-string name frame: {:?}", + total_len - buf.len(), + std::mem::discriminant(other) + ), + }, + )); } }; (name as &[u8], &arr[1..]) } - _ => { - count += 1; - continue; + other => { + return Err(crate::error::MoonError::from( + crate::error::AofError::RewriteFailed { + detail: format!( + "AOF incr non-array frame at offset {}: {:?}", + total_len - buf.len(), + std::mem::discriminant(other) + ), + }, + )); } }; engine.replay_command(databases, cmd, cmd_args, &mut selected_db); @@ -292,20 +368,20 @@ fn replay_incr_resp( if !buf.is_empty() { let offset = total_len - buf.len(); warn!( - "AOF incr truncated: {} bytes at offset {}", + "AOF incr truncated tail: {} bytes at offset {} (treating as crash-time EOF)", buf.len(), offset ); } break; } - Err(_) => { - let _ = buf.split_to(1); - if let Some(pos) = buf.iter().position(|&b| b == b'*') { - let _ = buf.split_to(pos); - } else { - break; - } + Err(e) => { + let offset = total_len - buf.len(); + return Err(crate::error::MoonError::from( + crate::error::AofError::RewriteFailed { + detail: format!("AOF incr parse error at offset {}: {:?}", offset, e), + }, + )); } } } diff --git a/src/storage/db.rs b/src/storage/db.rs index a6455c65..5581ca50 100644 --- a/src/storage/db.rs +++ b/src/storage/db.rs @@ -453,8 +453,20 @@ impl Database { } /// Pre-size the internal hash table for an expected key count. - /// Eliminates segment splits during bulk load. + /// + /// WARNING: This REPLACES the internal hash table with a fresh one sized for + /// `additional` entries. It MUST be called on an empty `Database` (typically + /// immediately after `Database::new()` during RDB/AOF bulk load). Calling it + /// on a populated database silently discards all entries. + /// + /// Named `reserve` rather than `reset_with_capacity` to match the plan + /// nomenclature, but the debug assertion guards the misuse case. pub fn reserve(&mut self, additional: usize) { + debug_assert!( + self.data.is_empty(), + "Database::reserve() must only be called on an empty database (bulk-load pre-sizing); called with {} existing entries", + self.data.len() + ); if additional > self.data.len() { let new_table = DashTable::with_capacity(additional); self.data = new_table; From a4f51e289ce4a082e52357d5132da01bd9c7ed69 Mon Sep 17 00:00:00 2001 From: Tin Dang Date: Wed, 8 Apr 2026 23:30:39 +0700 Subject: [PATCH 3/6] fix(aof): legacy upgrade double-replay, base rdb fsync, missing-base guard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses four real issues from PR #63 review (qodo + coderabbit). Three were P0/P1 correctness bugs; one is correctness-adjacent. 1. Legacy AOF double-replay on upgrade (qodo #2 / coderabbit critical) On first upgrade from legacy single-file AOF, restore_from_persistence replays appendonly.aof into the databases, then main.rs used to call initialize() which created an empty manifest (no base). On the NEXT boot the multi-part replay path ran clear() and then had nothing to load → all legacy state lost. Additionally the legacy file remained on disk, so v2 recovery kept replaying it on subsequent boots, double-applying on top of whatever multi-part state existed. Fix: at first upgrade, if restore_from_persistence loaded any state, serialize it via rdb::save_to_bytes and create the manifest with a real base seq 1 via AofManifest::initialize_with_base(). Rename the legacy appendonly.aof to appendonly.aof.legacy so v2 recovery on the next boot can't find it. Also retire the legacy file after a successful replay_multi_part for the second-boot case. 2. Base RDB not fsynced before manifest publish (qodo #6) AofManifest::advance used std::fs::write + rename, which renames an open file whose contents aren't guaranteed to be on disk. A crash after the manifest write could publish a seq pointing at a base whose contents weren't durable. Fix: explicit File::create + write_all + sync_data + rename. Same pattern applied to initialize_with_base. 3. Multi-part replay clears databases before loading Prevents the double-apply of non-idempotent commands from any state that earlier recovery phases (WAL, legacy AOF) may have loaded. The multi-part AOF is the authoritative source. 4. Missing base + non-empty incr is now an error (coderabbit minor) Previously warned and continued, which would apply deltas on empty state. Now returns an error. Empty-incr case (fresh initialize) is still tolerated. Already addressed in earlier commits (noted in review): - coderabbit: infinite busy-wait on missing manifest — ca2ec51 - coderabbit: silent corruption skip in replay_incr_resp — ca2ec51 - qodo #3: monoio manifest wait can hang on corrupt — ca2ec51 - qodo #4: corrupt manifest reinitialized — ca2ec51 Validation on moon-dev (Linux aarch64): cargo fmt --check ok cargo clippy --release -- -D warnings ok cargo clippy --no-default-features --features runtime-tokio,jemalloc -- -D warnings ok cargo test --release --lib 1858 pass Manual smoke tests passed: 1. BGREWRITEAOF with mixed R/W load — seq advanced, manifest correct 2. Crash-recovery kill -9 mid-rewrite — 3000/3000 keys recovered 3. Double-apply regression: 2000 concurrent INCRs during BGREWRITEAOF — base had snapshot state, incr had remainder, restart counter=2000 4. First-upgrade from legacy appendonly.aof — state captured as base seq 1, legacy file retired, all keys survive next boot without BGREWRITEAOF 5. Corrupt manifest (seq 0) — server refuses to start with clear error --- src/main.rs | 71 +++++++++++++++++++++++++-- src/persistence/aof_manifest.rs | 87 ++++++++++++++++++++++++++++++--- src/storage/db.rs | 11 +++++ 3 files changed, 159 insertions(+), 10 deletions(-) diff --git a/src/main.rs b/src/main.rs index 0439c491..9f01831d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -260,6 +260,14 @@ fn main() -> anyhow::Result<()> { })?; if let Some(ref manifest) = manifest_opt { if num_shards == 1 { + // Multi-part AOF is authoritative. Wipe any state that earlier + // recovery phases (per-shard WAL replay, legacy appendonly.aof + // fallback inside restore_from_persistence) may have loaded — + // otherwise non-idempotent commands from the incr log would + // double-apply on top of that pre-existing state. + for db in shards[0].databases.iter_mut() { + db.clear(); + } let loaded = moon::persistence::aof_manifest::replay_multi_part( &mut shards[0].databases, manifest, @@ -270,6 +278,28 @@ fn main() -> anyhow::Result<()> { "AOF multi-part loaded (seq {}): {} entries", manifest.seq, loaded ); + + // Retire legacy appendonly.aof so future boots don't double- + // replay it via restore_from_persistence's fallback path. + // Rename (not delete) so an operator can recover if something + // went wrong. + let legacy = base_dir.join("appendonly.aof"); + if legacy.exists() { + let retired = base_dir.join("appendonly.aof.legacy"); + if let Err(e) = std::fs::rename(&legacy, &retired) { + tracing::warn!( + "Failed to retire legacy AOF {}: {}", + legacy.display(), + e + ); + } else { + info!( + "Retired legacy AOF {} → {}", + legacy.display(), + retired.display() + ); + } + } } else { tracing::warn!( "Multi-part AOF skipped in multi-shard mode (not yet supported)" @@ -278,10 +308,43 @@ fn main() -> anyhow::Result<()> { } else { // No manifest present — first boot after upgrade from legacy // single-file AOF (v2 recovery already loaded it above) or - // fresh install. Initialize now so the writer thread can - // unblock and BGREWRITEAOF can advance from seq 1. - AofManifest::initialize(&base_dir) - .with_context(|| "failed to initialize AOF manifest")?; + // fresh install. + // + // If restore_from_persistence loaded any state (from WAL or + // legacy appendonly.aof), we MUST capture it as the seq 1 + // base RDB. Otherwise on the next boot the multi-part replay + // path would clear the databases and lose the legacy state. + // Only shard 0 is relevant in single-shard mode (the only + // mode the multi-part path currently supports). + let has_state = + num_shards == 1 && shards[0].databases.iter().any(|db| db.len() > 0); + if has_state { + let rdb_bytes = moon::persistence::rdb::save_to_bytes(&shards[0].databases) + .with_context(|| "failed to serialize legacy state for AOF base")?; + AofManifest::initialize_with_base(&base_dir, &rdb_bytes) + .with_context(|| "failed to initialize AOF manifest with base")?; + info!( + "First-upgrade: captured legacy state as AOF base seq 1 ({} bytes)", + rdb_bytes.len() + ); + // Retire legacy appendonly.aof — its contents are now in + // the base RDB, and leaving it would cause v2 recovery on + // the next boot to double-replay it. + let legacy = base_dir.join("appendonly.aof"); + if legacy.exists() { + let retired = base_dir.join("appendonly.aof.legacy"); + if let Err(e) = std::fs::rename(&legacy, &retired) { + tracing::warn!( + "Failed to retire legacy AOF {}: {}", + legacy.display(), + e + ); + } + } + } else { + AofManifest::initialize(&base_dir) + .with_context(|| "failed to initialize AOF manifest")?; + } } } } diff --git a/src/persistence/aof_manifest.rs b/src/persistence/aof_manifest.rs index 34a54974..b1671e2e 100644 --- a/src/persistence/aof_manifest.rs +++ b/src/persistence/aof_manifest.rs @@ -63,6 +63,11 @@ impl AofManifest { } /// Create the `appendonlydir/` and write the initial manifest. + /// + /// Prefer [`Self::initialize_with_base`] when the in-memory databases + /// already contain state (e.g. first upgrade from legacy single-file AOF + /// or per-shard WAL) — otherwise subsequent boots cannot reconstruct that + /// state because there is no base RDB for `replay_multi_part` to load. pub fn initialize(dir: &Path) -> std::io::Result { let manifest = Self { dir: dir.to_path_buf(), @@ -73,6 +78,39 @@ impl AofManifest { Ok(manifest) } + /// Create the `appendonlydir/` and write an initial manifest with a base RDB + /// capturing the current in-memory state. + /// + /// Used on first upgrade from legacy persistence formats: after + /// `restore_from_persistence` has loaded state from the per-shard WAL or + /// `appendonly.aof`, this call materializes that state as the seq 1 base + /// RDB. Without a base, on the next boot the multi-part replay path would + /// clear the databases and then fail (missing base with non-empty incr) + /// or silently restart from empty state. + pub fn initialize_with_base(dir: &Path, rdb_bytes: &[u8]) -> std::io::Result { + let manifest = Self { + dir: dir.to_path_buf(), + seq: 1, + }; + std::fs::create_dir_all(manifest.aof_dir())?; + + // Write base RDB atomically: tmp file + fsync + rename. + let base_path = manifest.base_path(); + let tmp_path = base_path.with_extension("rdb.tmp"); + { + let mut f = std::fs::File::create(&tmp_path)?; + f.write_all(rdb_bytes)?; + f.sync_data()?; + } + std::fs::rename(&tmp_path, &base_path)?; + + // Create empty incr file so the writer has something to append to. + std::fs::File::create(manifest.incr_path())?; + + manifest.write_manifest()?; + Ok(manifest) + } + /// Load manifest from disk. /// /// Returns: @@ -196,13 +234,28 @@ impl AofManifest { source: e, })?; - // 1. Write new base RDB (atomic: tmp + rename) + // 1. Write new base RDB (atomic: tmp + fsync + rename). + // Must fsync the data BEFORE renaming — a rename without prior fsync + // can publish a file whose contents aren't durable, so a crash leaves + // the manifest pointing at an empty/partial base RDB. let new_base = self.base_path_seq(new_seq); let tmp_base = new_base.with_extension("rdb.tmp"); - std::fs::write(&tmp_base, rdb_bytes).map_err(|e| crate::error::AofError::Io { - path: tmp_base.clone(), - source: e, - })?; + { + let mut f = + std::fs::File::create(&tmp_base).map_err(|e| crate::error::AofError::Io { + path: tmp_base.clone(), + source: e, + })?; + f.write_all(rdb_bytes) + .map_err(|e| crate::error::AofError::Io { + path: tmp_base.clone(), + source: e, + })?; + f.sync_data().map_err(|e| crate::error::AofError::Io { + path: tmp_base.clone(), + source: e, + })?; + } std::fs::rename(&tmp_base, &new_base).map_err(|e| { crate::error::AofError::RewriteFailed { detail: format!("rename base: {}", e), @@ -279,7 +332,29 @@ pub fn replay_multi_part( } } } else { - warn!("AOF base RDB not found: {}", base_path.display()); + // Missing base is tolerable only when the incr log is also empty + // (fresh manifest from initialize(), or first boot after legacy + // upgrade). If there's incremental content but no base, replaying + // deltas (DEL, EXPIRE, HINCRBY, …) on an empty database produces + // incorrect state — fail loudly rather than silently corrupt. + let incr_path = manifest.incr_path(); + let incr_len = std::fs::metadata(&incr_path).map(|m| m.len()).unwrap_or(0); + if incr_len > 0 { + return Err(crate::error::MoonError::from( + crate::error::AofError::RewriteFailed { + detail: format!( + "AOF base RDB missing at {} but incr {} is {} bytes; refusing to replay incr against empty state", + base_path.display(), + incr_path.display(), + incr_len, + ), + }, + )); + } + warn!( + "AOF base RDB not found: {} (incr empty, treating as fresh init)", + base_path.display() + ); } // Replay incremental RESP diff --git a/src/storage/db.rs b/src/storage/db.rs index 5581ca50..0871cd71 100644 --- a/src/storage/db.rs +++ b/src/storage/db.rs @@ -434,6 +434,17 @@ impl Database { self.data.insert(CompactKey::from(key), entry); } + /// Clear all entries and reset memory accounting. + /// + /// Used during multi-part AOF recovery to wipe any state populated by + /// earlier recovery phases (per-shard WAL replay, legacy appendonly.aof) + /// before loading the authoritative base RDB + incr log. Without this, + /// non-idempotent commands from pre-existing state would be double-applied. + pub fn clear(&mut self) { + self.data = DashTable::new(); + self.used_memory = 0; + } + /// Bulk-load insert: skip duplicate check, version tracking, and per-key memory accounting. /// /// Used exclusively during RDB/AOF restore where keys are guaranteed unique and From a92a0066a1e08523f2e161001fe2983ccd71af36 Mon Sep 17 00:00:00 2001 From: Tin Dang Date: Wed, 8 Apr 2026 23:35:18 +0700 Subject: [PATCH 4/6] chore(rdb): remove dead zero-copy plumbing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses the last two cosmetic PR #63 comments: - qodo: #[allow(dead_code)] on read_bytes_zero_copy lacked justification - coderabbit: unnecessary Bytes::copy_from_slice(data) full-buffer copy fed into read_entry_zero_copy's ignored _shared_buf parameter The "zero-copy" path through read_entry_zero_copy was never actually zero-copy — read_bytes_zero_copy was defined but never called, and read_entry_zero_copy ignored the buffer it was given. This commit: - Deletes read_bytes_zero_copy (truly dead) - Removes the unused _shared_buf parameter from read_entry_zero_copy - Removes the Bytes::copy_from_slice(data) allocation in load_from_bytes that existed solely to feed that parameter - Updates the two call sites - Documents the rationale so a future zero-copy revival adds the plumbing as part of a landed change, not as vestigial code No behavior change; smaller binary, one fewer full-buffer allocation on RDB load_from_bytes. Validation (moon-dev): cargo fmt --check ok cargo clippy --release -- -D warnings ok cargo clippy --no-default-features --features runtime-tokio,jemalloc -- -D warnings ok --- src/persistence/rdb.rs | 37 ++++++++++--------------------------- 1 file changed, 10 insertions(+), 27 deletions(-) diff --git a/src/persistence/rdb.rs b/src/persistence/rdb.rs index dbc8a3ba..f60841b6 100644 --- a/src/persistence/rdb.rs +++ b/src/persistence/rdb.rs @@ -318,7 +318,7 @@ pub fn load(databases: &mut [Database], path: &Path) -> Result } } type_tag => { - match read_entry_zero_copy(&mut cursor, type_tag, &shared_buf, now_secs) { + match read_entry_zero_copy(&mut cursor, type_tag, now_secs) { Ok((key, entry)) => { if entry.has_expiry() && entry.is_expired_at(now_secs, now_ms) { continue; @@ -520,11 +520,17 @@ fn skip_bytes_field(data: &[u8], pos: usize) -> Option { } } -/// Zero-copy variant of read_entry: uses shared Bytes buffer and cached timestamps. +/// Variant of read_entry using cached timestamps to avoid per-entry syscalls. +/// +/// Earlier revisions threaded a `shared_buf: &Bytes` through this path for +/// zero-copy slicing via `read_bytes_zero_copy`, but that helper was never +/// wired up — `read_bytes` currently always heap-allocates. The parameter +/// and the caller-side `Bytes::copy_from_slice(data)` that fed it have been +/// removed; restoring true zero-copy should add it back as part of a single +/// landed change, not as vestigial plumbing. fn read_entry_zero_copy( cursor: &mut Cursor<&[u8]>, type_tag: u8, - _shared_buf: &Bytes, cached_secs: u32, ) -> Result<(Bytes, Entry), MoonError> { let key = read_bytes(cursor)?; @@ -803,7 +809,6 @@ pub fn load_from_bytes( let now_ms = current_time_ms(); let now_secs = (now_ms / 1000) as u32; - let shared_buf = Bytes::copy_from_slice(data); let mut total_keys = 0usize; let mut current_db: usize = 0; @@ -840,7 +845,7 @@ pub fn load_from_bytes( .into()); } } - type_tag => match read_entry_zero_copy(&mut cursor, type_tag, &shared_buf, now_secs) { + type_tag => match read_entry_zero_copy(&mut cursor, type_tag, now_secs) { Ok((key, entry)) => { if entry.has_expiry() && entry.is_expired_at(now_secs, now_ms) { continue; @@ -1351,28 +1356,6 @@ pub(crate) fn read_bytes_vec(cursor: &mut Cursor<&[u8]>) -> Result, Moon Ok(slice.to_vec()) } -/// Zero-copy read: returns a `Bytes` slice of the shared buffer (no heap alloc). -#[allow(dead_code)] -pub(crate) fn read_bytes_zero_copy( - cursor: &mut Cursor<&[u8]>, - shared_buf: &Bytes, -) -> Result { - let len = read_u32(cursor)? as usize; - let pos = cursor.position() as usize; - let remaining = cursor.get_ref().len() - pos; - if len > remaining { - return Err(RdbError::Corrupted { - detail: format!( - "read_bytes_zero_copy: length {} exceeds remaining {}", - len, remaining - ), - } - .into()); - } - cursor.set_position((pos + len) as u64); - Ok(shared_buf.slice(pos..pos + len)) -} - pub(crate) fn read_u32(cursor: &mut Cursor<&[u8]>) -> Result { let mut buf = [0u8; 4]; cursor.read_exact(&mut buf)?; From 9d544fa8a3efd93a3c921c7e8b43dbb25f9b01d8 Mon Sep 17 00:00:00 2001 From: Tin Dang Date: Thu, 9 Apr 2026 08:11:08 +0700 Subject: [PATCH 5/6] fix(rdb): load into temp databases then swap, preventing stale key survival MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit An RDB file (both standalone .rdb and AOF RDB-preamble) is a full point-in-time snapshot. Loading it must replace the in-memory state, not merge into it. Previously both rdb::load() and rdb::load_from_bytes() called insert_for_load() directly on the live databases, so keys that existed before the load but were absent from the RDB snapshot silently survived — producing mixed state. Fix: both load paths now create temporary Database instances, load entries into them, then swap the temps into the live slots on success. This provides: - Correctness: old keys not in the snapshot are gone after load. - Atomicity: if the load fails partway, original state is untouched. - Consistent metadata: recalculate_memory runs on temps before swap, so used_memory reflects exactly the loaded state. The swap is safe w.r.t. cold_index/cold_shard_dir because main.rs initializes those fields after restore_from_persistence completes. Validation (moon-dev): cargo fmt --check ok cargo clippy --release / --runtime-tokio,jemalloc ok cargo test --release --lib 1858 pass Manual: 150 keys through BGREWRITEAOF + incr + restart PASS --- src/persistence/rdb.rs | 99 ++++++++++++++++++++++++------------------ 1 file changed, 56 insertions(+), 43 deletions(-) diff --git a/src/persistence/rdb.rs b/src/persistence/rdb.rs index f60841b6..ae1dcabe 100644 --- a/src/persistence/rdb.rs +++ b/src/persistence/rdb.rs @@ -274,13 +274,18 @@ pub fn load(databases: &mut [Database], path: &Path) -> Result let now_ms = current_time_ms(); let now_secs = (now_ms / 1000) as u32; - // First pass: count entries per database for pre-sizing (Fix #2) - let entry_counts = count_entries_per_db(&cursor, databases.len()); - - // Pre-size DashTables to avoid segment splits during load (Fix #2) + // Load into temporary databases so old keys not present in the RDB + // snapshot are discarded. An RDB file is a full point-in-time snapshot + // and must replace state, not merge into it. Loading into temps also + // provides atomicity: on failure, the live databases are untouched. + let db_count = databases.len(); + let mut temp_dbs: Vec = (0..db_count).map(|_| Database::new()).collect(); + + // First pass: count entries per database for pre-sizing + let entry_counts = count_entries_per_db(&cursor, db_count); for (db_idx, &count) in entry_counts.iter().enumerate() { - if count > 0 && db_idx < databases.len() { - databases[db_idx].reserve(count); + if count > 0 && db_idx < db_count { + temp_dbs[db_idx].reserve(count); } } @@ -306,46 +311,43 @@ pub fn load(databases: &mut [Database], path: &Path) -> Result source: e, })?; current_db = db_idx[0] as usize; - if current_db >= databases.len() { + if current_db >= db_count { return Err(RdbError::Corrupted { detail: format!( "RDB references database {} but only {} configured", - current_db, - databases.len() + current_db, db_count ), } .into()); } } - type_tag => { - match read_entry_zero_copy(&mut cursor, type_tag, now_secs) { - Ok((key, entry)) => { - if entry.has_expiry() && entry.is_expired_at(now_secs, now_ms) { - continue; - } - if current_db < databases.len() { - // Fix #3: skip duplicate check + memory accounting - databases[current_db].insert_for_load(key, entry); - total_keys += 1; - } + type_tag => match read_entry_zero_copy(&mut cursor, type_tag, now_secs) { + Ok((key, entry)) => { + if entry.has_expiry() && entry.is_expired_at(now_secs, now_ms) { + continue; } - Err(e) => { - tracing::warn!( - "RDB load: corrupted entry at offset {}: {}. {} keys loaded.", - cursor.position(), - e, - total_keys - ); - break; + if current_db < db_count { + temp_dbs[current_db].insert_for_load(key, entry); + total_keys += 1; } } - } + Err(e) => { + tracing::warn!( + "RDB load: corrupted entry at offset {}: {}. {} keys loaded.", + cursor.position(), + e, + total_keys + ); + break; + } + }, } } - // Fix #3: single-pass memory recalculation after all inserts - for db in databases.iter_mut() { - db.recalculate_memory(); + // Recalculate memory on temp databases, then swap into live ones. + for (live, mut temp) in databases.iter_mut().zip(temp_dbs.into_iter()) { + temp.recalculate_memory(); + *live = temp; } Ok(total_keys) @@ -812,11 +814,19 @@ pub fn load_from_bytes( let mut total_keys = 0usize; let mut current_db: usize = 0; - // Pre-size DashTables - let entry_counts = count_entries_per_db(&cursor, databases.len()); + // Load into temporary databases so that: + // (a) If the load fails partway, original state is untouched. + // (b) Old keys not present in the RDB snapshot don't survive — an RDB + // preamble is a full point-in-time snapshot and must replace state, + // not merge into it. + let db_count = databases.len(); + let mut temp_dbs: Vec = (0..db_count).map(|_| Database::new()).collect(); + + // Pre-size DashTables on the temporary databases + let entry_counts = count_entries_per_db(&cursor, db_count); for (db_idx, &count) in entry_counts.iter().enumerate() { - if count > 0 && db_idx < databases.len() { - databases[db_idx].reserve(count); + if count > 0 && db_idx < db_count { + temp_dbs[db_idx].reserve(count); } } @@ -834,12 +844,11 @@ pub fn load_from_bytes( source: e, })?; current_db = db_idx[0] as usize; - if current_db >= databases.len() { + if current_db >= db_count { return Err(RdbError::Corrupted { detail: format!( "RDB preamble references database {} but only {} configured", - current_db, - databases.len() + current_db, db_count ), } .into()); @@ -850,8 +859,8 @@ pub fn load_from_bytes( if entry.has_expiry() && entry.is_expired_at(now_secs, now_ms) { continue; } - if current_db < databases.len() { - databases[current_db].insert_for_load(key, entry); + if current_db < db_count { + temp_dbs[current_db].insert_for_load(key, entry); total_keys += 1; } } @@ -860,8 +869,12 @@ pub fn load_from_bytes( } } - for db in databases.iter_mut() { - db.recalculate_memory(); + // Recalculate memory on temp databases, then swap into the live ones. + // This replaces the entire in-memory state for each database — old keys + // that were not in the RDB snapshot are discarded. + for (live, mut temp) in databases.iter_mut().zip(temp_dbs.into_iter()) { + temp.recalculate_memory(); + *live = temp; } Ok((total_keys, rdb_len)) From eced95f9bd2be9b87952f0b80a29928bc5b336e8 Mon Sep 17 00:00:00 2001 From: Tin Dang Date: Thu, 9 Apr 2026 08:52:14 +0700 Subject: [PATCH 6/6] fix(aof,rdb): manifest validation, RDB error propagation, CRC O(n), writer timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four fixes from code review: 1. aof_manifest::load — validate base/incr records before orphan cleanup A truncated manifest containing only "seq 2" (no base/incr lines) would pass the seq > 0 check, then cleanup_orphans would delete files matching the PREVIOUS valid seq — destroying the actual recovery data. Now load() requires all three records (seq, base, incr) to be present. Truncated manifests return Err, which callers already treat as fatal. 2. rdb::load + rdb::load_from_bytes — return Err on corrupted entries Both load paths now load into temp_dbs, but on Err from read_entry_zero_copy they used to break and fall through to the swap, committing partially-loaded temp databases into live state. Now both paths return Err immediately, leaving live databases untouched. The error includes byte offset and key count for diagnosis. 3. rdb::load_from_bytes — single-pass CRC scan (O(n) vs O(n²)) The EOF_MARKER search was re-hashing data[0..=i] from scratch for each candidate byte. Now maintains a running crc32fast::Hasher updated byte-by-byte, cloning at each candidate position. On a 10MB RDB preamble with k candidates, this reduces from O(n*k) hash bytes to O(n) total. 4. aof::aof_writer_task — bound manifest wait with cancel + timeout The monoio writer's manifest wait loop now checks the CancellationToken each iteration and enforces a 60s hard timeout. Previously if main.rs failed to create the manifest (disk full, permission error), the writer would spin forever, blocking graceful shutdown. Now exits cleanly with a diagnostic log. Skipped: rdb.rs submodule split (1726 lines, above 1500 limit but high-risk churn for a correctness PR — tracked for follow-up). Validation (moon-dev): cargo fmt --check ok cargo clippy --release / --runtime-tokio,jemalloc ok cargo test --release --lib 1858 pass Smoke: 700 keys + INCR counter through BGREWRITEAOF + restart PASS --- src/persistence/aof.rs | 18 +++++++++++ src/persistence/aof_manifest.rs | 26 +++++++++++++++ src/persistence/rdb.rs | 56 ++++++++++++++++++++++++--------- 3 files changed, 85 insertions(+), 15 deletions(-) diff --git a/src/persistence/aof.rs b/src/persistence/aof.rs index bd8de088..840f1542 100644 --- a/src/persistence/aof.rs +++ b/src/persistence/aof.rs @@ -129,7 +129,25 @@ pub async fn aof_writer_task( // // A corrupt manifest is fatal — exit the writer so the server startup // notices and fails loud rather than silently overwriting. + // + // Bounded wait: check the cancellation token each iteration and enforce + // a hard timeout so the writer doesn't spin forever if main.rs fails to + // create the manifest (e.g. disk full, permission error). + let manifest_wait_start = Instant::now(); + const MANIFEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); let mut manifest = loop { + if cancel.is_cancelled() { + info!("AOF writer: cancelled while waiting for manifest"); + return; + } + if manifest_wait_start.elapsed() > MANIFEST_TIMEOUT { + error!( + "AOF writer: manifest not found at {} after {:?}. Writer exiting; check recovery logs.", + base_dir.display(), + MANIFEST_TIMEOUT, + ); + return; + } match AofManifest::load(&base_dir) { Ok(Some(m)) => break m, Ok(None) => { diff --git a/src/persistence/aof_manifest.rs b/src/persistence/aof_manifest.rs index b1671e2e..7f884e9e 100644 --- a/src/persistence/aof_manifest.rs +++ b/src/persistence/aof_manifest.rs @@ -130,12 +130,18 @@ impl AofManifest { let content = std::fs::read_to_string(&manifest_path)?; let mut seq = 0u64; + let mut has_base_record = false; + let mut has_incr_record = false; for line in content.lines() { let line = line.trim(); if let Some(val) = line.strip_prefix("seq ") { if let Ok(n) = val.parse::() { seq = n; } + } else if line.starts_with("base ") { + has_base_record = true; + } else if line.starts_with("incr ") { + has_incr_record = true; } } @@ -149,6 +155,23 @@ impl AofManifest { )); } + // A valid manifest must have all three records (seq, base, incr). + // A truncated manifest with only "seq N" but no base/incr lines could + // trigger orphan cleanup that deletes the real base RDB referenced by + // the previous valid manifest. Require all records before proceeding. + if !has_base_record || !has_incr_record { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "AOF manifest at {} is truncated: seq={} base={} incr={}", + manifest_path.display(), + seq, + has_base_record, + has_incr_record, + ), + )); + } + let manifest = Self { dir: dir.to_path_buf(), seq, @@ -158,6 +181,9 @@ impl AofManifest { // rewrites. A crash between advance() steps 1-3 leaves a new base RDB on // disk that the active manifest never references. Without this sweep, // repeated crashes during rewrite can fill the disk with zombie files. + // + // Safe to call here: we verified the manifest has all three records + // (seq, base, incr), so cleanup_orphans won't delete the active files. manifest.cleanup_orphans(); Ok(Some(manifest)) diff --git a/src/persistence/rdb.rs b/src/persistence/rdb.rs index ae1dcabe..544845e1 100644 --- a/src/persistence/rdb.rs +++ b/src/persistence/rdb.rs @@ -332,19 +332,24 @@ pub fn load(databases: &mut [Database], path: &Path) -> Result } } Err(e) => { - tracing::warn!( - "RDB load: corrupted entry at offset {}: {}. {} keys loaded.", - cursor.position(), - e, - total_keys - ); - break; + // Do NOT swap partially-loaded temp_dbs into live databases. + // A corrupted-but-checksummed RDB must not commit partial state. + return Err(RdbError::Corrupted { + detail: format!( + "RDB load: corrupted entry at offset {}: {}. {} keys loaded before failure.", + cursor.position(), + e, + total_keys + ), + } + .into()); } }, } } // Recalculate memory on temp databases, then swap into live ones. + // Only reached if all entries parsed successfully — no partial state. for (live, mut temp) in databases.iter_mut().zip(temp_dbs.into_iter()) { temp.recalculate_memory(); *live = temp; @@ -753,11 +758,23 @@ pub fn load_from_bytes( // The RDB section is: header + entries + EOF_MARKER(1) + CRC32(4). // We scan for EOF_MARKER (0xFF) — the first one after the header that's // immediately followed by a valid CRC32 of the preceding bytes. + // + // Single-pass: maintain a running CRC hasher updated byte-by-byte. + // When we hit a candidate EOF_MARKER at position i (i >= 5), clone + // the hasher (which includes data[0..i]), finalize with the EOF byte, + // and compare against the stored CRC at data[i+1..i+5]. This avoids + // re-hashing the entire prefix for each candidate (O(n) vs O(n²)). let mut rdb_end = None; - // Start scanning after header (MOON + version = 5 bytes) + let mut running_hasher = Hasher::new(); + // Feed bytes 0..5 (header) into the running hasher + if data.len() > 5 { + running_hasher.update(&data[..5]); + } for i in 5..data.len().saturating_sub(4) { if data[i] == EOF_MARKER { - let payload = &data[..=i]; // everything up to and including EOF_MARKER + // Clone running hasher (covers data[0..i]), then finalize with EOF byte + let mut candidate = running_hasher.clone(); + candidate.update(&[EOF_MARKER]); if let Some(checksum_bytes) = data.get(i + 1..i + 5) { let stored = u32::from_le_bytes([ checksum_bytes[0], @@ -765,14 +782,14 @@ pub fn load_from_bytes( checksum_bytes[2], checksum_bytes[3], ]); - let mut hasher = Hasher::new(); - hasher.update(payload); - if hasher.finalize() == stored { + if candidate.finalize() == stored { rdb_end = Some(i + 5); // past CRC32 break; } } } + // Feed this byte into the running hasher for the next iteration + running_hasher.update(&data[i..i + 1]); } let rdb_len = rdb_end.ok_or_else(|| { @@ -864,14 +881,23 @@ pub fn load_from_bytes( total_keys += 1; } } - Err(_) => break, + Err(e) => { + return Err(RdbError::Corrupted { + detail: format!( + "RDB preamble: corrupted entry at offset {}: {}. {} keys loaded before failure.", + cursor.position(), + e, + total_keys + ), + } + .into()); + } }, } } // Recalculate memory on temp databases, then swap into the live ones. - // This replaces the entire in-memory state for each database — old keys - // that were not in the RDB snapshot are discarded. + // Only reached if all entries parsed successfully — no partial state. for (live, mut temp) in databases.iter_mut().zip(temp_dbs.into_iter()) { temp.recalculate_memory(); *live = temp;