diff --git a/src/command/persistence.rs b/src/command/persistence.rs index 928a2b1f..35423fab 100644 --- a/src/command/persistence.rs +++ b/src/command/persistence.rs @@ -212,6 +212,21 @@ pub fn bgrewriteaof_start(aof_tx: &channel::MpscSender, db: SharedDa } } +/// Start BGREWRITEAOF in sharded mode using ShardDatabases. +pub fn bgrewriteaof_start_sharded( + aof_tx: &channel::MpscSender, + shard_databases: std::sync::Arc, +) -> Frame { + match aof_tx.try_send(AofMessage::RewriteSharded(shard_databases)) { + Ok(()) => Frame::SimpleString(Bytes::from_static( + b"Background append only file rewriting started", + )), + Err(_) => Frame::Error(Bytes::from_static( + b"ERR Background AOF rewrite failed to start", + )), + } +} + /// SAVE command: synchronous save to disk. Blocks until complete. /// /// Clones all entries under read locks (same as BGSAVE), then serializes diff --git a/src/main.rs b/src/main.rs index 089b6a6b..9f01831d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -240,6 +240,115 @@ fn main() -> anyhow::Result<()> { }) .collect(); + // Multi-part AOF replay layered on top of v2/v3 recovery. + // Priority: if appendonlydir/ manifest exists → load multi-part (skip legacy v2 fallback). + // Otherwise v2 already handled legacy appendonly.aof during restore_from_persistence. + // + // A corrupt manifest is FATAL: overwriting it silently destroys the reference + // to the real base RDB and loses all persisted data. + if config.appendonly == "yes" { + if let Some(ref dir) = persistence_dir { + use anyhow::Context; + use moon::persistence::aof_manifest::AofManifest; + use moon::persistence::replay::DispatchReplayEngine; + let base_dir = std::path::PathBuf::from(dir); + let manifest_opt = AofManifest::load(&base_dir).with_context(|| { + format!( + "AOF manifest at {}/appendonlydir/ is corrupt; refusing to start to avoid data loss. Inspect manually before deleting.", + base_dir.display() + ) + })?; + if let Some(ref manifest) = manifest_opt { + if num_shards == 1 { + // Multi-part AOF is authoritative. Wipe any state that earlier + // recovery phases (per-shard WAL replay, legacy appendonly.aof + // fallback inside restore_from_persistence) may have loaded — + // otherwise non-idempotent commands from the incr log would + // double-apply on top of that pre-existing state. + for db in shards[0].databases.iter_mut() { + db.clear(); + } + let loaded = moon::persistence::aof_manifest::replay_multi_part( + &mut shards[0].databases, + manifest, + &DispatchReplayEngine, + ) + .with_context(|| "multi-part AOF replay failed")?; + info!( + "AOF multi-part loaded (seq {}): {} entries", + manifest.seq, loaded + ); + + // Retire legacy appendonly.aof so future boots don't double- + // replay it via restore_from_persistence's fallback path. + // Rename (not delete) so an operator can recover if something + // went wrong. + let legacy = base_dir.join("appendonly.aof"); + if legacy.exists() { + let retired = base_dir.join("appendonly.aof.legacy"); + if let Err(e) = std::fs::rename(&legacy, &retired) { + tracing::warn!( + "Failed to retire legacy AOF {}: {}", + legacy.display(), + e + ); + } else { + info!( + "Retired legacy AOF {} → {}", + legacy.display(), + retired.display() + ); + } + } + } else { + tracing::warn!( + "Multi-part AOF skipped in multi-shard mode (not yet supported)" + ); + } + } else { + // No manifest present — first boot after upgrade from legacy + // single-file AOF (v2 recovery already loaded it above) or + // fresh install. + // + // If restore_from_persistence loaded any state (from WAL or + // legacy appendonly.aof), we MUST capture it as the seq 1 + // base RDB. Otherwise on the next boot the multi-part replay + // path would clear the databases and lose the legacy state. + // Only shard 0 is relevant in single-shard mode (the only + // mode the multi-part path currently supports). + let has_state = + num_shards == 1 && shards[0].databases.iter().any(|db| db.len() > 0); + if has_state { + let rdb_bytes = moon::persistence::rdb::save_to_bytes(&shards[0].databases) + .with_context(|| "failed to serialize legacy state for AOF base")?; + AofManifest::initialize_with_base(&base_dir, &rdb_bytes) + .with_context(|| "failed to initialize AOF manifest with base")?; + info!( + "First-upgrade: captured legacy state as AOF base seq 1 ({} bytes)", + rdb_bytes.len() + ); + // Retire legacy appendonly.aof — its contents are now in + // the base RDB, and leaving it would cause v2 recovery on + // the next boot to double-replay it. + let legacy = base_dir.join("appendonly.aof"); + if legacy.exists() { + let retired = base_dir.join("appendonly.aof.legacy"); + if let Err(e) = std::fs::rename(&legacy, &retired) { + tracing::warn!( + "Failed to retire legacy AOF {}: {}", + legacy.display(), + e + ); + } + } + } else { + AofManifest::initialize(&base_dir) + .with_context(|| "failed to initialize AOF manifest")?; + } + } + } + } + // Extract databases from all shards and wrap in ShardDatabases let all_dbs: Vec> = shards .iter_mut() diff --git a/src/persistence/aof.rs b/src/persistence/aof.rs index b0161ca7..840f1542 100644 --- a/src/persistence/aof.rs +++ b/src/persistence/aof.rs @@ -61,6 +61,8 @@ pub enum AofMessage { Append(Bytes), /// Trigger a full AOF rewrite (compaction) using current database state. Rewrite(SharedDatabases), + /// Trigger AOF rewrite in sharded mode (all shards' databases). + RewriteSharded(Arc), /// Shut down the AOF writer task gracefully. Shutdown, } @@ -107,38 +109,160 @@ pub async fn aof_writer_task( #[cfg(feature = "runtime-tokio")] interval.tick().await; // consume first tick - // Monoio fallback: AOF writer uses sync I/O in a simple recv loop. + // Monoio path: multi-part AOF (base RDB + incremental RESP) with sync I/O. + // + // On startup, if appendonlydir/ exists with a manifest, open the current + // incr file for appending. Otherwise start fresh with seq 1. + // On BGREWRITEAOF: snapshot → write new base RDB → create new incr → advance manifest. #[cfg(feature = "runtime-monoio")] { + use crate::persistence::aof_manifest::AofManifest; use std::io::Write; + + // Resolve the persistence base directory from aof_path's parent. + let base_dir = aof_path.parent().unwrap_or(Path::new(".")).to_path_buf(); + + // Load manifest — do NOT create one here if it doesn't exist. + // main.rs recovery runs concurrently and must finish before a manifest + // is created, to avoid racing against legacy single-file AOF detection. + // main.rs will create the manifest after recovery completes. + // + // A corrupt manifest is fatal — exit the writer so the server startup + // notices and fails loud rather than silently overwriting. + // + // Bounded wait: check the cancellation token each iteration and enforce + // a hard timeout so the writer doesn't spin forever if main.rs fails to + // create the manifest (e.g. disk full, permission error). + let manifest_wait_start = Instant::now(); + const MANIFEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); + let mut manifest = loop { + if cancel.is_cancelled() { + info!("AOF writer: cancelled while waiting for manifest"); + return; + } + if manifest_wait_start.elapsed() > MANIFEST_TIMEOUT { + error!( + "AOF writer: manifest not found at {} after {:?}. Writer exiting; check recovery logs.", + base_dir.display(), + MANIFEST_TIMEOUT, + ); + return; + } + match AofManifest::load(&base_dir) { + Ok(Some(m)) => break m, + Ok(None) => { + // main.rs recovery hasn't created the manifest yet — wait. + std::thread::sleep(std::time::Duration::from_millis(50)); + } + Err(e) => { + error!( + "AOF manifest corrupt at {}: {}. Writer exiting; persistence disabled.", + base_dir.display(), + e + ); + return; + } + } + }; + + // Open the current incremental file for appending + let incr_path = manifest.incr_path(); let mut file = match std::fs::OpenOptions::new() .create(true) .append(true) - .open(&aof_path) + .open(&incr_path) { Ok(f) => f, Err(e) => { - error!("Failed to open AOF file {}: {}", aof_path.display(), e); + error!( + "Failed to open AOF incr file {}: {}", + incr_path.display(), + e + ); return; } }; + info!( + "AOF writer: seq {}, incr={}", + manifest.seq, + incr_path.display() + ); + + let mut last_fsync = Instant::now(); + + let mut write_error = false; + loop { - // Use blocking recv since monoio doesn't support tokio::select! match rx.recv() { Ok(AofMessage::Append(data)) => { - let _ = file.write_all(&data); - if fsync == FsyncPolicy::Always { - let _ = file.flush(); - let _ = std::io::Write::flush(&mut file); + if write_error { + continue; // Drop appends after persistent I/O failure + } + if let Err(e) = file.write_all(&data) { + error!( + "AOF write failed (seq {}): {}. Persistence degraded.", + manifest.seq, e + ); + write_error = true; + continue; + } + match fsync { + FsyncPolicy::Always => { + if let Err(e) = file.flush().and_then(|_| file.sync_data()) { + error!("AOF sync failed (seq {}, always): {}", manifest.seq, e); + write_error = true; + } + } + FsyncPolicy::EverySec => { + if last_fsync.elapsed() >= std::time::Duration::from_secs(1) { + if let Err(e) = file.flush().and_then(|_| file.sync_data()) { + error!( + "AOF sync failed (seq {}, everysec): {}", + manifest.seq, e + ); + // Non-fatal for everysec: retry next interval + } else { + last_fsync = Instant::now(); + } + } + } + FsyncPolicy::No => {} } } Ok(AofMessage::Shutdown) | Err(_) => { - let _ = file.flush(); - info!("AOF writer shutting down (monoio)"); + if !write_error { + if let Err(e) = file.flush().and_then(|_| file.sync_data()) { + error!("AOF final sync failed (seq {}): {}", manifest.seq, e); + } + } + info!("AOF writer shutting down (monoio, seq {})", manifest.seq); break; } - Ok(AofMessage::Rewrite(_db)) => { - // AOF rewrite under monoio: not yet implemented + Ok(AofMessage::Rewrite(db)) => { + if !write_error { + if let Err(e) = file.flush().and_then(|_| file.sync_data()) { + error!("AOF pre-rewrite sync failed (seq {}): {}", manifest.seq, e); + } + } + match do_rewrite_single(&db, &mut manifest, &mut file, &rx) { + Ok(()) => { + write_error = false; // Reset on successful rewrite + } + Err(e) => error!("AOF rewrite failed (seq {}): {}", manifest.seq, e), + } + } + Ok(AofMessage::RewriteSharded(shard_dbs)) => { + if !write_error { + if let Err(e) = file.flush().and_then(|_| file.sync_data()) { + error!("AOF pre-rewrite sync failed (seq {}): {}", manifest.seq, e); + } + } + match do_rewrite_sharded(&shard_dbs, &mut manifest, &mut file, &rx) { + Ok(()) => { + write_error = false; + } + Err(e) => error!("AOF rewrite failed (seq {}): {}", manifest.seq, e), + } } } } @@ -190,6 +314,19 @@ pub async fn aof_writer_task( } } } + Ok(AofMessage::RewriteSharded(shard_dbs)) => { + let _ = writer.flush().await; + let _ = writer.get_ref().sync_data().await; + if let Err(e) = rewrite_aof_sharded_sync(&shard_dbs, &aof_path) { + error!("AOF rewrite (sharded) failed: {}", e); + } + let reopen_result: Result = tokio::fs::OpenOptions::new() + .create(true).append(true).open(&aof_path).await; + match reopen_result { + Ok(f) => writer = tokio::io::BufWriter::new(f), + Err(e) => { error!("Failed to reopen AOF after rewrite: {}", e); return; } + } + } Ok(AofMessage::Shutdown) | Err(_) => { let _ = writer.flush().await; let _ = writer.get_ref().sync_data().await; @@ -233,8 +370,35 @@ pub fn replay_aof( return Ok(0); } - let total_len = data.len(); - let mut buf = BytesMut::from(&data[..]); + // Detect RDB preamble: if the file starts with "MOON" magic, load the binary + // RDB section first, then replay any RESP commands appended after it. + let (rdb_keys, resp_start) = if data.starts_with(b"MOON") { + match crate::persistence::rdb::load_from_bytes(databases, &data) { + Ok((keys, consumed)) => { + info!( + "AOF RDB preamble loaded: {} keys ({} bytes)", + keys, consumed + ); + (keys, consumed) + } + Err(e) => { + // Data starts with MOON magic — it IS RDB format. + // Falling back to RESP would parse garbage. Propagate the error. + return Err(e); + } + } + } else { + (0, 0) + }; + + // If the entire file was RDB (no RESP tail), we're done + if resp_start >= data.len() { + return Ok(rdb_keys); + } + + let resp_data = &data[resp_start..]; + let total_len = resp_data.len(); + let mut buf = BytesMut::from(resp_data); let config = ParseConfig::default(); let mut selected_db: usize = 0; let mut count: usize = 0; @@ -314,12 +478,13 @@ pub fn replay_aof( ); } - Ok(count) + Ok(rdb_keys + count) } /// Generate synthetic RESP commands from the current database state for AOF rewriting. /// /// Produces commands for all 5 data types plus PEXPIRE for keys with TTL. +#[allow(dead_code)] // Retained for RESP-only AOF rewrite fallback and testing pub fn generate_rewrite_commands(databases: &[Database]) -> BytesMut { let mut buf = BytesMut::new(); let now_ms = current_time_ms(); @@ -518,12 +683,11 @@ pub fn generate_rewrite_commands(databases: &[Database]) -> BytesMut { buf } -/// Rewrite the AOF file with synthetic commands from current database state. +/// Snapshot databases and generate compacted AOF commands. /// -/// Writes to a temporary file first, then atomically renames for crash safety. -#[cfg(feature = "runtime-tokio")] -pub async fn rewrite_aof(db: SharedDatabases, aof_path: &Path) -> Result<(), MoonError> { - // Clone database state: lock each db individually with read lock +/// Shared by both the async (tokio) and sync (monoio) rewrite paths. +#[allow(dead_code)] +fn snapshot_and_generate(db: &SharedDatabases) -> BytesMut { let snapshot: Vec<(Vec<(CompactKey, Entry)>, u32)> = db .iter() .map(|lock| { @@ -538,7 +702,6 @@ pub async fn rewrite_aof(db: SharedDatabases, aof_path: &Path) -> Result<(), Moo }) .collect(); - // Reconstruct temporary Database objects for generate_rewrite_commands let mut temp_dbs: Vec = Vec::with_capacity(snapshot.len()); for (entries, _base_ts) in &snapshot { let mut db = Database::new(); @@ -548,11 +711,315 @@ pub async fn rewrite_aof(db: SharedDatabases, aof_path: &Path) -> Result<(), Moo temp_dbs.push(db); } - let commands = generate_rewrite_commands(&temp_dbs); + generate_rewrite_commands(&temp_dbs) +} + +/// Drain any queued `AofMessage::Append` messages to the current incr file. +/// +/// Called during rewrite to catch in-flight appends that handlers sent before +/// the writer thread could enter the rewrite routine. Messages of other variants +/// are dropped silently (duplicate rewrites while a rewrite is in progress) or +/// returned via the flag for Shutdown (caller is responsible for honoring it +/// after the rewrite completes). +#[cfg(feature = "runtime-monoio")] +#[derive(Default)] +struct DrainOutcome { + drained: usize, + shutdown_requested: bool, +} + +#[cfg(feature = "runtime-monoio")] +fn drain_pending_appends( + rx: &channel::MpscReceiver, + file: &mut std::fs::File, +) -> Result { + use std::io::Write; + let mut outcome = DrainOutcome::default(); + while let Ok(msg) = rx.try_recv() { + match msg { + AofMessage::Append(data) => { + file.write_all(&data).map_err(|e| AofError::Io { + path: PathBuf::from(""), + source: e, + })?; + outcome.drained += 1; + } + AofMessage::Shutdown => { + outcome.shutdown_requested = true; + } + AofMessage::Rewrite(_) | AofMessage::RewriteSharded(_) => { + // Already rewriting — drop redundant request. + } + } + } + Ok(outcome) +} + +/// Multi-part rewrite: snapshot single-shard databases → RDB base → advance manifest. +/// +/// Correctness ordering (prevents double-apply of non-idempotent commands like +/// INCR/LPUSH/SADD after rewrite): +/// +/// 1. Drain any queued appends into the OLD incr file and fsync. +/// 2. Acquire write locks on all databases in the shard. This blocks handlers +/// from applying new writes or queueing new appends for the locked dbs. +/// 3. Drain the channel once more — catches appends for writes that the +/// handler completed between step 1 and step 2. +/// 4. Snapshot every database under the write locks. Because no handler can +/// mutate the dbs while we hold the locks, the snapshot is atomic with +/// respect to the post-drain channel state. +/// 5. Release the write locks. New handler writes from here on queue in the +/// channel and will be processed into the NEW incr file after rotation. +/// 6. Write the new base RDB, advance the manifest, reopen the file handle. +/// +/// Invariant: any write captured in the new base is NOT in the new incr file +/// (handlers were blocked between drain and snapshot), and any write NOT in +/// the new base IS in the new incr file (queued after lock release). +#[cfg(feature = "runtime-monoio")] +fn do_rewrite_single( + db: &SharedDatabases, + manifest: &mut crate::persistence::aof_manifest::AofManifest, + file: &mut std::fs::File, + rx: &channel::MpscReceiver, +) -> Result<(), MoonError> { + // Phase 1: drain pre-rewrite queued appends into old incr, fsync. + let pre_drain = drain_pending_appends(rx, file)?; + file.sync_data().map_err(|e| AofError::Io { + path: manifest.incr_path(), + source: e, + })?; + + // Phase 2: acquire write locks on every database in the shard. + // Order is consistent (index-ascending) so concurrent callers would + // serialize without deadlock — but in practice only this thread + // acquires multi-db locks. + let guards: Vec<_> = db.iter().map(|lock| lock.write()).collect(); + + // Phase 3: drain any appends the handlers sent between phase 1 and phase 2. + let mid_drain = drain_pending_appends(rx, file)?; + file.sync_data().map_err(|e| AofError::Io { + path: manifest.incr_path(), + source: e, + })?; + + // Phase 4: snapshot under the write locks. No mutation is possible. + let now_ms = current_time_ms(); + let snapshot: Vec<( + Vec<( + crate::storage::compact_key::CompactKey, + crate::storage::entry::Entry, + )>, + u32, + )> = guards + .iter() + .map(|guard| { + let base_ts = guard.base_timestamp(); + let entries: Vec<_> = guard + .data() + .iter() + .filter(|(_, v)| !v.is_expired_at(base_ts, now_ms)) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + (entries, base_ts) + }) + .collect(); + + // Phase 5: release locks. Handlers resume; new appends queue in the channel + // and will be processed into the new incr after step 6. + drop(guards); + + // Phase 6: write new base, advance manifest, reopen. + let rdb_bytes = crate::persistence::rdb::save_snapshot_to_bytes(&snapshot)?; + let new_incr = manifest.advance(&rdb_bytes)?; + + *file = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&new_incr) + .map_err(|e| AofError::Io { + path: new_incr, + source: e, + })?; + + info!( + "AOF rewrite complete (single): drained {}+{} pre-snapshot appends, seq={}", + pre_drain.drained, mid_drain.drained, manifest.seq + ); + if pre_drain.shutdown_requested || mid_drain.shutdown_requested { + // Caller doesn't currently observe this; logging is the escape hatch. + warn!("AOF writer: shutdown requested during rewrite (will honor on next recv)"); + } + Ok(()) +} + +/// Multi-part rewrite: snapshot all shards → merged RDB base → advance manifest. +/// +/// See [`do_rewrite_single`] for the ordering rationale. The multi-shard variant +/// holds write locks on every (shard, db) pair simultaneously for the duration +/// of the snapshot. This creates a brief global write pause, but it is the only +/// way to guarantee a torn-free snapshot without per-message sequence numbers. +#[cfg(feature = "runtime-monoio")] +fn do_rewrite_sharded( + shard_dbs: &crate::shard::shared_databases::ShardDatabases, + manifest: &mut crate::persistence::aof_manifest::AofManifest, + file: &mut std::fs::File, + rx: &channel::MpscReceiver, +) -> Result<(), MoonError> { + // Phase 1: drain pre-rewrite queued appends into old incr. + let pre_drain = drain_pending_appends(rx, file)?; + file.sync_data().map_err(|e| AofError::Io { + path: manifest.incr_path(), + source: e, + })?; + + // Phase 2: acquire write locks on ALL (shard, db) pairs simultaneously. + // Lock order is (shard_idx, db_idx) ascending — must match anywhere else + // that acquires multiple locks to prevent deadlock (currently no other + // call site does, but the ordering discipline is documented for future + // maintainers). + let all_shards = shard_dbs.all_shard_dbs(); + let mut guards: Vec> = Vec::with_capacity(all_shards.len()); + for shard_locks in all_shards { + let mut shard_guards = Vec::with_capacity(shard_locks.len()); + for lock in shard_locks { + shard_guards.push(lock.write()); + } + guards.push(shard_guards); + } + + // Phase 3: drain appends completed between phase 1 and phase 2. + let mid_drain = drain_pending_appends(rx, file)?; + file.sync_data().map_err(|e| AofError::Io { + path: manifest.incr_path(), + source: e, + })?; + + // Phase 4: snapshot under locks. + let db_count = shard_dbs.db_count(); + let mut merged: Vec<( + Vec<( + crate::storage::compact_key::CompactKey, + crate::storage::entry::Entry, + )>, + u32, + )> = (0..db_count).map(|_| (Vec::new(), 0u32)).collect(); + let now_ms = current_time_ms(); + for shard_guards in &guards { + for (db_idx, guard) in shard_guards.iter().enumerate() { + let base_ts = guard.base_timestamp(); + if merged[db_idx].0.is_empty() { + merged[db_idx].1 = base_ts; + } + for (key, entry) in guard.data().iter() { + if !entry.is_expired_at(base_ts, now_ms) { + merged[db_idx].0.push((key.clone(), entry.clone())); + } + } + } + } + + // Phase 5: release locks before the expensive disk write. + drop(guards); + + // Phase 6: write new base, advance manifest, reopen. + let rdb_bytes = crate::persistence::rdb::save_snapshot_to_bytes(&merged)?; + let new_incr = manifest.advance(&rdb_bytes)?; + + *file = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&new_incr) + .map_err(|e| AofError::Io { + path: new_incr, + source: e, + })?; + + info!( + "AOF rewrite complete (sharded): drained {}+{} pre-snapshot appends, seq={}", + pre_drain.drained, mid_drain.drained, manifest.seq + ); + if pre_drain.shutdown_requested || mid_drain.shutdown_requested { + warn!("AOF writer: shutdown requested during rewrite (will honor on next recv)"); + } + Ok(()) +} + +/// Rewrite the AOF file with RDB preamble (binary base + empty RESP incremental). +/// +/// Uses the same strategy as Redis 7+ `aof-use-rdb-preamble yes`: +/// the rewritten AOF starts with a full RDB snapshot (compact binary), +/// and new writes are appended as RESP after it. On startup, the loader +/// detects the RDB magic and reads the binary preamble, then switches +/// to RESP parsing for any incremental commands appended after. +#[allow(dead_code)] // Retained for legacy single-file and tokio path +fn rewrite_aof_sync(db: &SharedDatabases, aof_path: &Path) -> Result<(), MoonError> { + // Snapshot under read locks, build temp Database objects for RDB serialization + let snapshot: Vec = db + .iter() + .map(|lock| { + let guard = lock.read(); + let mut temp = Database::new(); + let now_ms = current_time_ms(); + for (k, v) in guard.data().iter() { + if !v.is_expired_at(guard.base_timestamp(), now_ms) { + temp.set(k.to_bytes(), v.clone()); + } + } + temp + }) + .collect(); + + let rdb_bytes = crate::persistence::rdb::save_to_bytes(&snapshot)?; + + let tmp_path = aof_path.with_extension("aof.tmp"); + std::fs::write(&tmp_path, &rdb_bytes).map_err(|e| AofError::Io { + path: tmp_path.clone(), + source: e, + })?; + std::fs::rename(&tmp_path, aof_path).map_err(|e| AofError::RewriteFailed { + detail: format!( + "rename {} -> {}: {}", + tmp_path.display(), + aof_path.display(), + e + ), + })?; + + info!( + "AOF rewrite complete (RDB preamble): {} bytes", + rdb_bytes.len() + ); + Ok(()) +} + +/// Rewrite the AOF in sharded mode with RDB preamble. +/// +/// Merges all shards' databases into a single RDB snapshot, writes it as +/// the AOF base file. New incremental writes are appended as RESP after. +#[allow(dead_code)] +fn rewrite_aof_sharded_sync( + shard_dbs: &crate::shard::shared_databases::ShardDatabases, + aof_path: &Path, +) -> Result<(), MoonError> { + let db_count = shard_dbs.db_count(); + let now_ms = current_time_ms(); + let mut merged_dbs: Vec = (0..db_count).map(|_| Database::new()).collect(); + + for shard_locks in shard_dbs.all_shard_dbs() { + for (db_idx, lock) in shard_locks.iter().enumerate() { + let guard = lock.read(); + for (key, entry) in guard.data().iter() { + if !entry.is_expired_at(guard.base_timestamp(), now_ms) { + merged_dbs[db_idx].set(key.to_bytes(), entry.clone()); + } + } + } + } + + let rdb_bytes = crate::persistence::rdb::save_to_bytes(&merged_dbs)?; - // Write to temp file, then atomic rename let tmp_path = aof_path.with_extension("aof.tmp"); - std::fs::write(&tmp_path, &commands).map_err(|e| AofError::Io { + std::fs::write(&tmp_path, &rdb_bytes).map_err(|e| AofError::Io { path: tmp_path.clone(), source: e, })?; @@ -565,10 +1032,30 @@ pub async fn rewrite_aof(db: SharedDatabases, aof_path: &Path) -> Result<(), Moo ), })?; - info!("AOF rewrite complete: {} bytes", commands.len()); + info!( + "AOF rewrite (sharded, RDB preamble) complete: {} bytes", + rdb_bytes.len() + ); Ok(()) } +/// Reopen AOF file in append mode after atomic rewrite replaced it. +#[allow(dead_code)] +fn reopen_aof_sync(aof_path: &Path) -> Result { + std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(aof_path) +} + +/// Rewrite the AOF file (tokio async wrapper). +/// +/// Delegates to `rewrite_aof_sync` — the actual I/O is synchronous (temp write + rename). +#[cfg(feature = "runtime-tokio")] +pub async fn rewrite_aof(db: SharedDatabases, aof_path: &Path) -> Result<(), MoonError> { + rewrite_aof_sync(&db, aof_path) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/persistence/aof_manifest.rs b/src/persistence/aof_manifest.rs new file mode 100644 index 00000000..7f884e9e --- /dev/null +++ b/src/persistence/aof_manifest.rs @@ -0,0 +1,491 @@ +//! Multi-part AOF manifest: tracks base (RDB) and incremental (RESP) files. +//! +//! Implements the same directory-based AOF format as Redis 7+: +//! ```text +//! appendonlydir/ +//! moon.aof.1.base.rdb # RDB snapshot base +//! moon.aof.1.incr.aof # Incremental RESP since base +//! moon.aof.manifest # This file +//! ``` +//! +//! The manifest is a simple text file listing the active base and incremental +//! files with their sequence numbers. On BGREWRITEAOF, the sequence increments, +//! a new base + incr pair is created, and old files are deleted. + +use std::io::Write; +use std::path::{Path, PathBuf}; + +use tracing::{error, info, warn}; + +const MANIFEST_NAME: &str = "moon.aof.manifest"; +const AOF_DIR_NAME: &str = "appendonlydir"; + +/// Active AOF file set tracked by the manifest. +#[derive(Debug, Clone)] +pub struct AofManifest { + /// Base directory (parent of `appendonlydir/`) + pub dir: PathBuf, + /// Current sequence number (incremented on each rewrite) + pub seq: u64, +} + +impl AofManifest { + /// Path to the `appendonlydir/` directory. + pub fn aof_dir(&self) -> PathBuf { + self.dir.join(AOF_DIR_NAME) + } + + /// Path to the manifest file. + pub fn manifest_path(&self) -> PathBuf { + self.aof_dir().join(MANIFEST_NAME) + } + + /// Path to the base RDB file for the current sequence. + pub fn base_path(&self) -> PathBuf { + self.aof_dir() + .join(format!("moon.aof.{}.base.rdb", self.seq)) + } + + /// Path to the incremental RESP file for the current sequence. + pub fn incr_path(&self) -> PathBuf { + self.aof_dir() + .join(format!("moon.aof.{}.incr.aof", self.seq)) + } + + /// Path to the base RDB file for a given sequence. + pub fn base_path_seq(&self, seq: u64) -> PathBuf { + self.aof_dir().join(format!("moon.aof.{}.base.rdb", seq)) + } + + /// Path to the incremental RESP file for a given sequence. + pub fn incr_path_seq(&self, seq: u64) -> PathBuf { + self.aof_dir().join(format!("moon.aof.{}.incr.aof", seq)) + } + + /// Create the `appendonlydir/` and write the initial manifest. + /// + /// Prefer [`Self::initialize_with_base`] when the in-memory databases + /// already contain state (e.g. first upgrade from legacy single-file AOF + /// or per-shard WAL) — otherwise subsequent boots cannot reconstruct that + /// state because there is no base RDB for `replay_multi_part` to load. + pub fn initialize(dir: &Path) -> std::io::Result { + let manifest = Self { + dir: dir.to_path_buf(), + seq: 1, + }; + std::fs::create_dir_all(manifest.aof_dir())?; + manifest.write_manifest()?; + Ok(manifest) + } + + /// Create the `appendonlydir/` and write an initial manifest with a base RDB + /// capturing the current in-memory state. + /// + /// Used on first upgrade from legacy persistence formats: after + /// `restore_from_persistence` has loaded state from the per-shard WAL or + /// `appendonly.aof`, this call materializes that state as the seq 1 base + /// RDB. Without a base, on the next boot the multi-part replay path would + /// clear the databases and then fail (missing base with non-empty incr) + /// or silently restart from empty state. + pub fn initialize_with_base(dir: &Path, rdb_bytes: &[u8]) -> std::io::Result { + let manifest = Self { + dir: dir.to_path_buf(), + seq: 1, + }; + std::fs::create_dir_all(manifest.aof_dir())?; + + // Write base RDB atomically: tmp file + fsync + rename. + let base_path = manifest.base_path(); + let tmp_path = base_path.with_extension("rdb.tmp"); + { + let mut f = std::fs::File::create(&tmp_path)?; + f.write_all(rdb_bytes)?; + f.sync_data()?; + } + std::fs::rename(&tmp_path, &base_path)?; + + // Create empty incr file so the writer has something to append to. + std::fs::File::create(manifest.incr_path())?; + + manifest.write_manifest()?; + Ok(manifest) + } + + /// Load manifest from disk. + /// + /// Returns: + /// - `Ok(None)` — manifest file does not exist (fresh install or legacy single-file AOF) + /// - `Ok(Some(manifest))` — manifest loaded successfully + /// - `Err(_)` — manifest file exists but is unreadable or corrupt. + /// Callers MUST treat this as fatal: overwriting a corrupt manifest with a + /// fresh one silently destroys the reference to the real base RDB and loses data. + pub fn load(dir: &Path) -> std::io::Result> { + let aof_dir = dir.join(AOF_DIR_NAME); + let manifest_path = aof_dir.join(MANIFEST_NAME); + + if !manifest_path.exists() { + return Ok(None); + } + + let content = std::fs::read_to_string(&manifest_path)?; + + let mut seq = 0u64; + let mut has_base_record = false; + let mut has_incr_record = false; + for line in content.lines() { + let line = line.trim(); + if let Some(val) = line.strip_prefix("seq ") { + if let Ok(n) = val.parse::() { + seq = n; + } + } else if line.starts_with("base ") { + has_base_record = true; + } else if line.starts_with("incr ") { + has_incr_record = true; + } + } + + if seq == 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "AOF manifest at {} has no valid sequence number", + manifest_path.display() + ), + )); + } + + // A valid manifest must have all three records (seq, base, incr). + // A truncated manifest with only "seq N" but no base/incr lines could + // trigger orphan cleanup that deletes the real base RDB referenced by + // the previous valid manifest. Require all records before proceeding. + if !has_base_record || !has_incr_record { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "AOF manifest at {} is truncated: seq={} base={} incr={}", + manifest_path.display(), + seq, + has_base_record, + has_incr_record, + ), + )); + } + + let manifest = Self { + dir: dir.to_path_buf(), + seq, + }; + + // Best-effort orphan cleanup: delete stray base/incr files from aborted + // rewrites. A crash between advance() steps 1-3 leaves a new base RDB on + // disk that the active manifest never references. Without this sweep, + // repeated crashes during rewrite can fill the disk with zombie files. + // + // Safe to call here: we verified the manifest has all three records + // (seq, base, incr), so cleanup_orphans won't delete the active files. + manifest.cleanup_orphans(); + + Ok(Some(manifest)) + } + + /// Delete any base/incr files in `appendonlydir/` that do not match the + /// current sequence. Best-effort — logs but does not propagate errors. + fn cleanup_orphans(&self) { + let aof_dir = self.aof_dir(); + let entries = match std::fs::read_dir(&aof_dir) { + Ok(e) => e, + Err(_) => return, + }; + let current_base = format!("moon.aof.{}.base.rdb", self.seq); + let current_incr = format!("moon.aof.{}.incr.aof", self.seq); + for entry in entries.flatten() { + let name = entry.file_name(); + let name_str = match name.to_str() { + Some(s) => s, + None => continue, + }; + // Keep manifest, current base, current incr. Delete any other moon.aof.*. + if name_str == MANIFEST_NAME || name_str == current_base || name_str == current_incr { + continue; + } + let is_moon_aof = name_str.starts_with("moon.aof.") + && (name_str.ends_with(".base.rdb") + || name_str.ends_with(".incr.aof") + || name_str.ends_with(".rdb.tmp") + || name_str.ends_with(".tmp")); + if !is_moon_aof { + continue; + } + let path = entry.path(); + match std::fs::remove_file(&path) { + Ok(()) => info!("AOF orphan cleanup: removed {}", path.display()), + Err(e) => warn!( + "AOF orphan cleanup: failed to remove {}: {}", + path.display(), + e + ), + } + } + } + + /// Write the manifest file atomically (write tmp + rename). + pub fn write_manifest(&self) -> std::io::Result<()> { + let manifest_path = self.manifest_path(); + let tmp_path = manifest_path.with_extension("tmp"); + + let content = format!( + "seq {}\nbase moon.aof.{}.base.rdb\nincr moon.aof.{}.incr.aof\n", + self.seq, self.seq, self.seq + ); + + let mut f = std::fs::File::create(&tmp_path)?; + f.write_all(content.as_bytes())?; + f.sync_data()?; + std::fs::rename(&tmp_path, &manifest_path)?; + Ok(()) + } + + /// Advance to the next sequence: write new base RDB, create new incr file, + /// update manifest, delete old files. + /// + /// Returns the path to the new incremental file (caller should switch writing to it). + pub fn advance(&mut self, rdb_bytes: &[u8]) -> Result { + let old_seq = self.seq; + let new_seq = old_seq + 1; + + let aof_dir = self.aof_dir(); + std::fs::create_dir_all(&aof_dir).map_err(|e| crate::error::AofError::Io { + path: aof_dir.clone(), + source: e, + })?; + + // 1. Write new base RDB (atomic: tmp + fsync + rename). + // Must fsync the data BEFORE renaming — a rename without prior fsync + // can publish a file whose contents aren't durable, so a crash leaves + // the manifest pointing at an empty/partial base RDB. + let new_base = self.base_path_seq(new_seq); + let tmp_base = new_base.with_extension("rdb.tmp"); + { + let mut f = + std::fs::File::create(&tmp_base).map_err(|e| crate::error::AofError::Io { + path: tmp_base.clone(), + source: e, + })?; + f.write_all(rdb_bytes) + .map_err(|e| crate::error::AofError::Io { + path: tmp_base.clone(), + source: e, + })?; + f.sync_data().map_err(|e| crate::error::AofError::Io { + path: tmp_base.clone(), + source: e, + })?; + } + std::fs::rename(&tmp_base, &new_base).map_err(|e| { + crate::error::AofError::RewriteFailed { + detail: format!("rename base: {}", e), + } + })?; + + // 2. Create empty new incremental file + let new_incr = self.incr_path_seq(new_seq); + std::fs::File::create(&new_incr).map_err(|e| crate::error::AofError::Io { + path: new_incr.clone(), + source: e, + })?; + + // 3. Update manifest (atomic) + self.seq = new_seq; + self.write_manifest() + .map_err(|e| crate::error::AofError::Io { + path: self.manifest_path(), + source: e, + })?; + + // 4. Delete old files (best-effort) + let old_base = self.base_path_seq(old_seq); + let old_incr = self.incr_path_seq(old_seq); + if old_base.exists() { + if let Err(e) = std::fs::remove_file(&old_base) { + warn!("Failed to delete old base {}: {}", old_base.display(), e); + } + } + if old_incr.exists() { + if let Err(e) = std::fs::remove_file(&old_incr) { + warn!("Failed to delete old incr {}: {}", old_incr.display(), e); + } + } + + info!( + "AOF advanced to seq {}: base={} bytes, incr={}", + new_seq, + rdb_bytes.len(), + new_incr.display() + ); + + Ok(new_incr) + } +} + +/// Replay multi-part AOF: load base RDB then replay incremental RESP. +/// +/// Returns total keys/commands loaded. +pub fn replay_multi_part( + databases: &mut [crate::storage::Database], + manifest: &AofManifest, + engine: &dyn crate::persistence::replay::CommandReplayEngine, +) -> Result { + let mut total = 0usize; + + // Load base RDB + let base_path = manifest.base_path(); + if base_path.exists() { + match crate::persistence::rdb::load(databases, &base_path) { + Ok(n) => { + info!( + "AOF base RDB loaded: {} keys from {}", + n, + base_path.display() + ); + total += n; + } + Err(e) => { + // Base RDB is corrupt or unreadable — applying incremental + // deltas on top of missing/corrupt base gives wrong results. + error!("AOF base RDB load failed: {}", e); + return Err(e); + } + } + } else { + // Missing base is tolerable only when the incr log is also empty + // (fresh manifest from initialize(), or first boot after legacy + // upgrade). If there's incremental content but no base, replaying + // deltas (DEL, EXPIRE, HINCRBY, …) on an empty database produces + // incorrect state — fail loudly rather than silently corrupt. + let incr_path = manifest.incr_path(); + let incr_len = std::fs::metadata(&incr_path).map(|m| m.len()).unwrap_or(0); + if incr_len > 0 { + return Err(crate::error::MoonError::from( + crate::error::AofError::RewriteFailed { + detail: format!( + "AOF base RDB missing at {} but incr {} is {} bytes; refusing to replay incr against empty state", + base_path.display(), + incr_path.display(), + incr_len, + ), + }, + )); + } + warn!( + "AOF base RDB not found: {} (incr empty, treating as fresh init)", + base_path.display() + ); + } + + // Replay incremental RESP + let incr_path = manifest.incr_path(); + if incr_path.exists() { + let data = std::fs::read(&incr_path)?; + if !data.is_empty() { + // Pure RESP — use replay_aof_resp (no RDB preamble detection needed) + let count = replay_incr_resp(databases, &data, engine)?; + info!( + "AOF incr replayed: {} commands from {}", + count, + incr_path.display() + ); + total += count; + } + } + + Ok(total) +} + +/// Replay pure RESP commands from a byte slice. +/// +/// **Corruption handling:** On mid-stream parse errors this returns an error +/// rather than silently resyncing to the next `*` byte. Silent resync in a +/// multi-part AOF is dangerous: an undetected run of dropped commands leaves +/// the database in an inconsistent state that cannot be reconstructed. +/// Truncated tails (parser returns `Ok(None)` with bytes remaining) are +/// logged and treated as the legitimate end of the incremental log, matching +/// `replay_aof` semantics for crash-time tail truncation. +fn replay_incr_resp( + databases: &mut [crate::storage::Database], + data: &[u8], + engine: &dyn crate::persistence::replay::CommandReplayEngine, +) -> Result { + use crate::protocol::{Frame, ParseConfig, parse}; + use bytes::BytesMut; + + let total_len = data.len(); + let mut buf = BytesMut::from(data); + let config = ParseConfig::default(); + let mut selected_db: usize = 0; + let mut count: usize = 0; + + loop { + if buf.is_empty() { + break; + } + match parse::parse(&mut buf, &config) { + Ok(Some(frame)) => { + let (cmd, cmd_args) = match &frame { + Frame::Array(arr) if !arr.is_empty() => { + let name = match &arr[0] { + Frame::BulkString(s) => s.as_ref(), + Frame::SimpleString(s) => s.as_ref(), + other => { + return Err(crate::error::MoonError::from( + crate::error::AofError::RewriteFailed { + detail: format!( + "AOF incr command at offset {} has non-string name frame: {:?}", + total_len - buf.len(), + std::mem::discriminant(other) + ), + }, + )); + } + }; + (name as &[u8], &arr[1..]) + } + other => { + return Err(crate::error::MoonError::from( + crate::error::AofError::RewriteFailed { + detail: format!( + "AOF incr non-array frame at offset {}: {:?}", + total_len - buf.len(), + std::mem::discriminant(other) + ), + }, + )); + } + }; + engine.replay_command(databases, cmd, cmd_args, &mut selected_db); + count += 1; + } + Ok(None) => { + if !buf.is_empty() { + let offset = total_len - buf.len(); + warn!( + "AOF incr truncated tail: {} bytes at offset {} (treating as crash-time EOF)", + buf.len(), + offset + ); + } + break; + } + Err(e) => { + let offset = total_len - buf.len(); + return Err(crate::error::MoonError::from( + crate::error::AofError::RewriteFailed { + detail: format!("AOF incr parse error at offset {}: {:?}", offset, e), + }, + )); + } + } + } + + Ok(count) +} diff --git a/src/persistence/mod.rs b/src/persistence/mod.rs index ded689b1..5c51d4a5 100644 --- a/src/persistence/mod.rs +++ b/src/persistence/mod.rs @@ -1,4 +1,5 @@ pub mod aof; +pub mod aof_manifest; pub mod auto_save; pub mod checkpoint; pub mod clog; diff --git a/src/persistence/rdb.rs b/src/persistence/rdb.rs index a14ec0d6..544845e1 100644 --- a/src/persistence/rdb.rs +++ b/src/persistence/rdb.rs @@ -47,7 +47,11 @@ const EOF_MARKER: u8 = 0xFF; /// Uses atomic write (write to .tmp, then rename) for crash safety. /// Expired keys are skipped. Empty databases are skipped. /// Footer contains CRC32 checksum of all preceding bytes. -pub fn save(databases: &[Database], path: &Path) -> Result<(), MoonError> { +/// Serialize all databases to RDB format in memory. +/// +/// Returns the complete RDB byte stream (header + entries + footer + CRC32). +/// Used by both `save()` (file) and AOF RDB-preamble rewrite. +pub fn save_to_bytes(databases: &[Database]) -> Result, MoonError> { let mut buf = Vec::new(); // Header @@ -60,7 +64,6 @@ pub fn save(databases: &[Database], path: &Path) -> Result<(), MoonError> { for (db_idx, db) in databases.iter().enumerate() { let base_ts = db.base_timestamp(); let data = db.data(); - // Collect non-expired entries let live: Vec<_> = data .iter() .filter(|(_, entry)| !entry.is_expired_at(base_ts, now_ms)) @@ -86,6 +89,12 @@ pub fn save(databases: &[Database], path: &Path) -> Result<(), MoonError> { let checksum = hasher.finalize(); buf.write_all(&checksum.to_le_bytes())?; + Ok(buf) +} + +pub fn save(databases: &[Database], path: &Path) -> Result<(), MoonError> { + let buf = save_to_bytes(databases)?; + // Atomic write: write to tmp, then rename let tmp_path = path.with_extension("rdb.tmp"); std::fs::write(&tmp_path, &buf).map_err(|e| RdbError::Io { @@ -154,6 +163,46 @@ pub fn save_from_snapshot( Ok(()) } +/// Serialize snapshot data (with correct base_ts per database) to RDB bytes in memory. +/// +/// Unlike `save_to_bytes(&[Database])` which reads base_ts from each Database, +/// this takes explicit (entries, base_ts) tuples — critical for AOF rewrite where +/// entries are cloned into temporary storage and the original base_ts must be preserved. +pub fn save_snapshot_to_bytes( + snapshot: &[(Vec<(CompactKey, Entry)>, u32)], +) -> Result, MoonError> { + let mut buf = Vec::new(); + + buf.write_all(RDB_MAGIC)?; + buf.write_all(&[RDB_VERSION])?; + + let now_ms = current_time_ms(); + + for (db_idx, (entries, base_ts)) in snapshot.iter().enumerate() { + let live: Vec<_> = entries + .iter() + .filter(|(_, e)| !e.is_expired_at(*base_ts, now_ms)) + .collect(); + if live.is_empty() { + continue; + } + + buf.write_all(&[DB_SELECTOR])?; + buf.write_all(&[db_idx as u8])?; + + for (key, entry) in live { + write_entry(&mut buf, key.as_bytes(), entry, *base_ts)?; + } + } + + buf.write_all(&[EOF_MARKER])?; + let mut hasher = Hasher::new(); + hasher.update(&buf); + buf.write_all(&hasher.finalize().to_le_bytes())?; + + Ok(buf) +} + /// Load an RDB file and populate databases. Returns total keys loaded. /// /// On any error (missing file, corrupt data, bad checksum), returns Err. @@ -175,25 +224,28 @@ pub fn load(databases: &mut [Database], path: &Path) -> Result .into()); } + // Wrap in Bytes for zero-copy slicing (shared refcount, no copy) + let shared_buf = Bytes::from(data); + // Verify CRC32: all bytes except last 4 vs last 4 bytes - let (payload, checksum_bytes) = data.split_at(data.len() - 4); + let payload_len = shared_buf.len() - 4; let stored_checksum = u32::from_le_bytes([ - checksum_bytes[0], - checksum_bytes[1], - checksum_bytes[2], - checksum_bytes[3], + shared_buf[payload_len], + shared_buf[payload_len + 1], + shared_buf[payload_len + 2], + shared_buf[payload_len + 3], ]); let mut hasher = Hasher::new(); - hasher.update(payload); + hasher.update(&shared_buf[..payload_len]); let computed_checksum = hasher.finalize(); if stored_checksum != computed_checksum { return Err(RdbError::ChecksumMismatch.into()); } - let mut cursor = Cursor::new(payload); + let mut cursor = Cursor::new(&shared_buf[..payload_len] as &[u8]); // Verify magic - let mut magic = [0u8; 4]; // "MOON" is 4 bytes + let mut magic = [0u8; 4]; cursor.read_exact(&mut magic).map_err(|e| RdbError::Io { path: path.to_path_buf(), source: e, @@ -218,18 +270,33 @@ pub fn load(databases: &mut [Database], path: &Path) -> Result .into()); } + // Cache timestamps once (Fix #4: avoid syscall per entry) let now_ms = current_time_ms(); + let now_secs = (now_ms / 1000) as u32; + + // Load into temporary databases so old keys not present in the RDB + // snapshot are discarded. An RDB file is a full point-in-time snapshot + // and must replace state, not merge into it. Loading into temps also + // provides atomicity: on failure, the live databases are untouched. + let db_count = databases.len(); + let mut temp_dbs: Vec = (0..db_count).map(|_| Database::new()).collect(); + + // First pass: count entries per database for pre-sizing + let entry_counts = count_entries_per_db(&cursor, db_count); + for (db_idx, &count) in entry_counts.iter().enumerate() { + if count > 0 && db_idx < db_count { + temp_dbs[db_idx].reserve(count); + } + } let mut total_keys = 0usize; - let mut skipped_entries = 0usize; let mut current_db: usize = 0; loop { let mut tag = [0u8; 1]; if cursor.read_exact(&mut tag).is_err() { - // Truncated tail: no more data to read, treat as implicit EOF tracing::warn!( - "RDB load: truncated tail after {} keys (no EOF marker), treating as end of file", + "RDB load: truncated tail after {} keys (no EOF marker)", total_keys ); break; @@ -244,63 +311,599 @@ pub fn load(databases: &mut [Database], path: &Path) -> Result source: e, })?; current_db = db_idx[0] as usize; - if current_db >= databases.len() { + if current_db >= db_count { return Err(RdbError::Corrupted { detail: format!( "RDB references database {} but only {} configured", - current_db, - databases.len() + current_db, db_count ), } .into()); } } - type_tag => { - // Mid-stream corruption recovery: log+skip on entry parse failure - match read_entry(&mut cursor, type_tag) { - Ok((key, entry)) => { - // Skip entries whose TTL is already in the past - if entry.has_expiry() && entry.is_expired_at(current_secs(), now_ms) { - continue; - } - if current_db < databases.len() { - databases[current_db].set(key, entry); - total_keys += 1; - } + type_tag => match read_entry_zero_copy(&mut cursor, type_tag, now_secs) { + Ok((key, entry)) => { + if entry.has_expiry() && entry.is_expired_at(now_secs, now_ms) { + continue; } - Err(e) => { - let offset = cursor.position(); - tracing::warn!( - "RDB load: skipping corrupted entry at offset {}: {}", - offset, - e - ); - skipped_entries += 1; - // Cannot reliably skip to next entry in a variable-length - // format without framing, so break out of the loop. - // Entries loaded so far are valid (checksum passed). - tracing::warn!( - "RDB load: stopping mid-stream recovery after {} skipped entries; \ - {} keys loaded successfully", - skipped_entries, + if current_db < db_count { + temp_dbs[current_db].insert_for_load(key, entry); + total_keys += 1; + } + } + Err(e) => { + // Do NOT swap partially-loaded temp_dbs into live databases. + // A corrupted-but-checksummed RDB must not commit partial state. + return Err(RdbError::Corrupted { + detail: format!( + "RDB load: corrupted entry at offset {}: {}. {} keys loaded before failure.", + cursor.position(), + e, total_keys + ), + } + .into()); + } + }, + } + } + + // Recalculate memory on temp databases, then swap into live ones. + // Only reached if all entries parsed successfully — no partial state. + for (live, mut temp) in databases.iter_mut().zip(temp_dbs.into_iter()) { + temp.recalculate_memory(); + *live = temp; + } + + Ok(total_keys) +} + +/// Fast first-pass: count entries per database without parsing values. +/// Scans type tags and skips over entry payloads to count keys per db_idx. +fn count_entries_per_db(cursor: &Cursor<&[u8]>, db_count: usize) -> Vec { + let mut counts = vec![0usize; db_count]; + let data = cursor.get_ref(); + let mut pos = cursor.position() as usize; + let mut current_db = 0usize; + + while pos < data.len() { + let tag = data[pos]; + pos += 1; + + match tag { + EOF_MARKER => break, + DB_SELECTOR => { + if pos < data.len() { + current_db = data[pos] as usize; + pos += 1; + } else { + break; + } + } + TYPE_STRING | TYPE_HASH | TYPE_LIST | TYPE_SET | TYPE_SORTED_SET | TYPE_STREAM => { + if current_db < db_count { + counts[current_db] += 1; + } + // Skip over the entry payload without parsing + if let Some(new_pos) = skip_entry(data, pos, tag) { + pos = new_pos; + } else { + break; + } + } + _ => break, + } + } + + counts +} + +/// Skip over an RDB entry's bytes without allocating or parsing values. +/// Returns the new position after the entry, or None if data is truncated. +fn skip_entry(data: &[u8], mut pos: usize, type_tag: u8) -> Option { + // Skip key + pos = skip_bytes_field(data, pos)?; + // Skip TTL (8 bytes) + pos = pos.checked_add(8)?; + if pos > data.len() { + return None; + } + + match type_tag { + TYPE_STRING => { + pos = skip_bytes_field(data, pos)?; + } + TYPE_HASH => { + let count = read_u32_raw(data, pos)?; + pos += 4; + for _ in 0..count { + pos = skip_bytes_field(data, pos)?; // field + pos = skip_bytes_field(data, pos)?; // value + } + } + TYPE_LIST | TYPE_SET => { + let count = read_u32_raw(data, pos)?; + pos += 4; + for _ in 0..count { + pos = skip_bytes_field(data, pos)?; + } + } + TYPE_SORTED_SET => { + let count = read_u32_raw(data, pos)?; + pos += 4; + for _ in 0..count { + pos = skip_bytes_field(data, pos)?; // member + pos = pos.checked_add(8)?; // f64 score + if pos > data.len() { + return None; + } + } + } + TYPE_STREAM => { + // entry_count(8) + last_id(16) + pos = pos.checked_add(24)?; + if pos > data.len() { + return None; + } + let entry_count = + u64::from_le_bytes(data[pos - 24..pos - 16].try_into().ok()?) as usize; + for _ in 0..entry_count { + pos = pos.checked_add(16)?; // StreamId (ms + seq) + if pos > data.len() { + return None; + } + let field_count = read_u32_raw(data, pos)?; + pos += 4; + for _ in 0..field_count { + pos = skip_bytes_field(data, pos)?; + pos = skip_bytes_field(data, pos)?; + } + } + // Consumer groups + let group_count = read_u32_raw(data, pos)?; + pos += 4; + for _ in 0..group_count { + pos = skip_bytes_field(data, pos)?; // group name + pos = pos.checked_add(16)?; // last_delivered_id + if pos > data.len() { + return None; + } + let pel_count = read_u32_raw(data, pos)?; + pos += 4; + for _ in 0..pel_count { + pos = pos.checked_add(16)?; // StreamId + if pos > data.len() { + return None; + } + pos = skip_bytes_field(data, pos)?; // consumer name + pos = pos.checked_add(16)?; // delivery_time + delivery_count + if pos > data.len() { + return None; + } + } + let consumer_count = read_u32_raw(data, pos)?; + pos += 4; + for _ in 0..consumer_count { + pos = skip_bytes_field(data, pos)?; // consumer name + pos = pos.checked_add(8)?; // seen_time + if pos > data.len() { + return None; + } + let pending_count = read_u32_raw(data, pos)?; + pos += 4; + for _ in 0..pending_count { + pos = pos.checked_add(16)?; // StreamId + if pos > data.len() { + return None; + } + } + } + } + } + _ => return None, + } + + Some(pos) +} + +/// Read u32 LE from raw bytes without cursor overhead. +#[inline] +fn read_u32_raw(data: &[u8], pos: usize) -> Option { + if pos + 4 > data.len() { + return None; + } + Some(u32::from_le_bytes(data[pos..pos + 4].try_into().ok()?) as usize) +} + +/// Skip a length-prefixed bytes field (4-byte LE length + payload). +#[inline] +fn skip_bytes_field(data: &[u8], pos: usize) -> Option { + let len = read_u32_raw(data, pos)?; + let new_pos = pos.checked_add(4)?.checked_add(len)?; + if new_pos > data.len() { + None + } else { + Some(new_pos) + } +} + +/// Variant of read_entry using cached timestamps to avoid per-entry syscalls. +/// +/// Earlier revisions threaded a `shared_buf: &Bytes` through this path for +/// zero-copy slicing via `read_bytes_zero_copy`, but that helper was never +/// wired up — `read_bytes` currently always heap-allocates. The parameter +/// and the caller-side `Bytes::copy_from_slice(data)` that fed it have been +/// removed; restoring true zero-copy should add it back as part of a single +/// landed change, not as vestigial plumbing. +fn read_entry_zero_copy( + cursor: &mut Cursor<&[u8]>, + type_tag: u8, + cached_secs: u32, +) -> Result<(Bytes, Entry), MoonError> { + let key = read_bytes(cursor)?; + + let mut ttl_buf = [0u8; 8]; + cursor.read_exact(&mut ttl_buf)?; + let ttl_ms = i64::from_le_bytes(ttl_buf); + let expires_at_ms = if ttl_ms > 0 { ttl_ms as u64 } else { 0 }; + + let value = match type_tag { + TYPE_STRING => { + // Fast path: build CompactValue directly from Vec, skipping RedisValue intermediate. + // This avoids: Vec → Bytes → RedisValue::String → from_redis_value → heap_string_vec + // and instead does: Vec → CompactValue directly (one Box alloc, zero copy). + let vec = read_bytes_vec(cursor)?; + let cv = if vec.len() <= 12 { + crate::storage::compact_value::CompactValue::from_redis_value(RedisValue::String( + Bytes::from(vec), + )) + } else { + crate::storage::compact_value::CompactValue::heap_string_vec_direct(vec) + }; + let mut entry = Entry::new_string(Bytes::new()); + entry.value = cv; + if expires_at_ms > 0 { + entry.set_expires_at_ms(cached_secs, expires_at_ms); + } + entry.set_last_access(cached_secs); + entry.set_access_counter(5); + return Ok((key, entry)); + } + TYPE_HASH => { + let count = read_u32(cursor)? as usize; + validate_count(cursor, count, 8, "hash")?; + let mut map = HashMap::with_capacity(count); + for _ in 0..count { + let field = read_bytes(cursor)?; + let val = read_bytes(cursor)?; + map.insert(field, val); + } + RedisValue::Hash(map) + } + TYPE_LIST => { + let count = read_u32(cursor)? as usize; + validate_count(cursor, count, 4, "list")?; + let mut list = VecDeque::with_capacity(count); + for _ in 0..count { + list.push_back(read_bytes(cursor)?); + } + RedisValue::List(list) + } + TYPE_SET => { + let count = read_u32(cursor)? as usize; + validate_count(cursor, count, 4, "set")?; + let mut set = HashSet::with_capacity(count); + for _ in 0..count { + set.insert(read_bytes(cursor)?); + } + RedisValue::Set(set) + } + TYPE_SORTED_SET => { + let count = read_u32(cursor)? as usize; + validate_count(cursor, count, 12, "sorted_set")?; + let mut members = HashMap::with_capacity(count); + let mut tree = BPTree::new(); + for _ in 0..count { + let member = read_bytes(cursor)?; + let mut score_buf = [0u8; 8]; + cursor.read_exact(&mut score_buf)?; + let score = f64::from_le_bytes(score_buf); + members.insert(member.clone(), score); + tree.insert(OrderedFloat(score), member); + } + RedisValue::SortedSetBPTree { tree, members } + } + TYPE_STREAM => { + // Stream parsing: reuse read_bytes (not zero-copy for this rare type) + let mut entry_count_buf = [0u8; 8]; + cursor.read_exact(&mut entry_count_buf)?; + let entry_count = u64::from_le_bytes(entry_count_buf) as usize; + let mut last_id_ms_buf = [0u8; 8]; + let mut last_id_seq_buf = [0u8; 8]; + cursor.read_exact(&mut last_id_ms_buf)?; + cursor.read_exact(&mut last_id_seq_buf)?; + let last_id = StreamId { + ms: u64::from_le_bytes(last_id_ms_buf), + seq: u64::from_le_bytes(last_id_seq_buf), + }; + let mut stream = StreamData::new(); + stream.last_id = last_id; + validate_count(cursor, entry_count, 20, "stream_entries")?; + for _ in 0..entry_count { + let mut ms_buf = [0u8; 8]; + let mut seq_buf = [0u8; 8]; + cursor.read_exact(&mut ms_buf)?; + cursor.read_exact(&mut seq_buf)?; + let id = StreamId { + ms: u64::from_le_bytes(ms_buf), + seq: u64::from_le_bytes(seq_buf), + }; + let field_count = read_u32(cursor)? as usize; + validate_count(cursor, field_count, 8, "stream_fields")?; + let mut fields = Vec::with_capacity(field_count); + for _ in 0..field_count { + fields.push((read_bytes(cursor)?, read_bytes(cursor)?)); + } + stream.entries.insert(id, fields); + stream.length += 1; + } + let group_count = read_u32(cursor)? as usize; + for _ in 0..group_count { + let group_name = read_bytes(cursor)?; + let mut gld_ms = [0u8; 8]; + let mut gld_seq = [0u8; 8]; + cursor.read_exact(&mut gld_ms)?; + cursor.read_exact(&mut gld_seq)?; + let last_delivered_id = StreamId { + ms: u64::from_le_bytes(gld_ms), + seq: u64::from_le_bytes(gld_seq), + }; + let pel_count = read_u32(cursor)? as usize; + let mut pel = BTreeMap::new(); + for _ in 0..pel_count { + let mut pid_ms = [0u8; 8]; + let mut pid_seq = [0u8; 8]; + cursor.read_exact(&mut pid_ms)?; + cursor.read_exact(&mut pid_seq)?; + let pid = StreamId { + ms: u64::from_le_bytes(pid_ms), + seq: u64::from_le_bytes(pid_seq), + }; + let consumer_name = read_bytes(cursor)?; + let mut dt_buf = [0u8; 8]; + let mut dc_buf = [0u8; 8]; + cursor.read_exact(&mut dt_buf)?; + cursor.read_exact(&mut dc_buf)?; + pel.insert( + pid, + crate::storage::stream::PendingEntry { + consumer: consumer_name, + delivery_time: u64::from_le_bytes(dt_buf), + delivery_count: u64::from_le_bytes(dc_buf), + }, + ); + } + let consumer_count = read_u32(cursor)? as usize; + let mut consumers = HashMap::new(); + for _ in 0..consumer_count { + let cname = read_bytes(cursor)?; + let mut st_buf = [0u8; 8]; + cursor.read_exact(&mut st_buf)?; + let seen_time = u64::from_le_bytes(st_buf); + let pending_count = read_u32(cursor)? as usize; + let mut pending = BTreeMap::new(); + for _ in 0..pending_count { + let mut cid_ms = [0u8; 8]; + let mut cid_seq = [0u8; 8]; + cursor.read_exact(&mut cid_ms)?; + cursor.read_exact(&mut cid_seq)?; + pending.insert( + StreamId { + ms: u64::from_le_bytes(cid_ms), + seq: u64::from_le_bytes(cid_seq), + }, + (), ); - break; } + consumers.insert( + cname.clone(), + crate::storage::stream::Consumer { + name: cname, + pending, + seen_time, + }, + ); } + stream.groups.insert( + group_name, + crate::storage::stream::ConsumerGroup { + last_delivered_id, + pel, + consumers, + }, + ); } + RedisValue::Stream(Box::new(stream)) } + _ => return Err(RdbError::UnsupportedType { type_tag }.into()), + }; + + let mut entry = Entry::new_string(Bytes::new()); + entry.value = crate::storage::compact_value::CompactValue::from_redis_value(value); + if expires_at_ms > 0 { + entry.set_expires_at_ms(cached_secs, expires_at_ms); } + entry.set_last_access(cached_secs); + entry.set_access_counter(5); - if skipped_entries > 0 { - tracing::warn!( - "RDB load completed with {} entries skipped due to corruption, {} keys loaded", - skipped_entries, - total_keys - ); + Ok((key, entry)) +} + +/// Load an RDB snapshot from a byte slice (for AOF RDB-preamble format). +/// +/// Returns `(keys_loaded, bytes_consumed)`. The caller can use `bytes_consumed` +/// to find the start of any RESP commands appended after the RDB preamble. +pub fn load_from_bytes( + databases: &mut [Database], + data: &[u8], +) -> Result<(usize, usize), MoonError> { + if data.len() < RDB_MAGIC.len() + 1 + 1 + 4 { + return Err(RdbError::Corrupted { + detail: "RDB preamble too small".into(), + } + .into()); } - Ok(total_keys) + // Find EOF_MARKER to determine RDB section length. + // The RDB section is: header + entries + EOF_MARKER(1) + CRC32(4). + // We scan for EOF_MARKER (0xFF) — the first one after the header that's + // immediately followed by a valid CRC32 of the preceding bytes. + // + // Single-pass: maintain a running CRC hasher updated byte-by-byte. + // When we hit a candidate EOF_MARKER at position i (i >= 5), clone + // the hasher (which includes data[0..i]), finalize with the EOF byte, + // and compare against the stored CRC at data[i+1..i+5]. This avoids + // re-hashing the entire prefix for each candidate (O(n) vs O(n²)). + let mut rdb_end = None; + let mut running_hasher = Hasher::new(); + // Feed bytes 0..5 (header) into the running hasher + if data.len() > 5 { + running_hasher.update(&data[..5]); + } + for i in 5..data.len().saturating_sub(4) { + if data[i] == EOF_MARKER { + // Clone running hasher (covers data[0..i]), then finalize with EOF byte + let mut candidate = running_hasher.clone(); + candidate.update(&[EOF_MARKER]); + if let Some(checksum_bytes) = data.get(i + 1..i + 5) { + let stored = u32::from_le_bytes([ + checksum_bytes[0], + checksum_bytes[1], + checksum_bytes[2], + checksum_bytes[3], + ]); + if candidate.finalize() == stored { + rdb_end = Some(i + 5); // past CRC32 + break; + } + } + } + // Feed this byte into the running hasher for the next iteration + running_hasher.update(&data[i..i + 1]); + } + + let rdb_len = rdb_end.ok_or_else(|| { + MoonError::from(RdbError::Corrupted { + detail: "RDB preamble: no valid EOF+CRC found".into(), + }) + })?; + + // Load using the same logic as `load`, but from the byte slice + let payload = &data[..rdb_len - 4]; // exclude CRC32 + let mut cursor = Cursor::new(payload); + + // Skip magic + version + let mut magic = [0u8; 4]; + cursor.read_exact(&mut magic).map_err(|e| RdbError::Io { + path: std::path::PathBuf::from(""), + source: e, + })?; + if &magic != RDB_MAGIC { + return Err(RdbError::Corrupted { + detail: "invalid RDB magic in AOF preamble".into(), + } + .into()); + } + let mut version = [0u8; 1]; + cursor.read_exact(&mut version).map_err(|e| RdbError::Io { + path: std::path::PathBuf::from(""), + source: e, + })?; + if version[0] != RDB_VERSION { + return Err(RdbError::UnsupportedVersion { + version: version[0] as u32, + } + .into()); + } + + let now_ms = current_time_ms(); + let now_secs = (now_ms / 1000) as u32; + let mut total_keys = 0usize; + let mut current_db: usize = 0; + + // Load into temporary databases so that: + // (a) If the load fails partway, original state is untouched. + // (b) Old keys not present in the RDB snapshot don't survive — an RDB + // preamble is a full point-in-time snapshot and must replace state, + // not merge into it. + let db_count = databases.len(); + let mut temp_dbs: Vec = (0..db_count).map(|_| Database::new()).collect(); + + // Pre-size DashTables on the temporary databases + let entry_counts = count_entries_per_db(&cursor, db_count); + for (db_idx, &count) in entry_counts.iter().enumerate() { + if count > 0 && db_idx < db_count { + temp_dbs[db_idx].reserve(count); + } + } + + loop { + let mut tag = [0u8; 1]; + if cursor.read_exact(&mut tag).is_err() { + break; + } + match tag[0] { + EOF_MARKER => break, + DB_SELECTOR => { + let mut db_idx = [0u8; 1]; + cursor.read_exact(&mut db_idx).map_err(|e| RdbError::Io { + path: std::path::PathBuf::from(""), + source: e, + })?; + current_db = db_idx[0] as usize; + if current_db >= db_count { + return Err(RdbError::Corrupted { + detail: format!( + "RDB preamble references database {} but only {} configured", + current_db, db_count + ), + } + .into()); + } + } + type_tag => match read_entry_zero_copy(&mut cursor, type_tag, now_secs) { + Ok((key, entry)) => { + if entry.has_expiry() && entry.is_expired_at(now_secs, now_ms) { + continue; + } + if current_db < db_count { + temp_dbs[current_db].insert_for_load(key, entry); + total_keys += 1; + } + } + Err(e) => { + return Err(RdbError::Corrupted { + detail: format!( + "RDB preamble: corrupted entry at offset {}: {}. {} keys loaded before failure.", + cursor.position(), + e, + total_keys + ), + } + .into()); + } + }, + } + } + + // Recalculate memory on temp databases, then swap into the live ones. + // Only reached if all entries parsed successfully — no partial state. + for (live, mut temp) in databases.iter_mut().zip(temp_dbs.into_iter()) { + temp.recalculate_memory(); + *live = temp; + } + + Ok((total_keys, rdb_len)) } /// Distribute keys from loaded databases to the correct per-shard databases. @@ -756,7 +1359,8 @@ fn validate_count( pub(crate) fn read_bytes(cursor: &mut Cursor<&[u8]>) -> Result { let len = read_u32(cursor)? as usize; - let remaining = cursor.get_ref().len() - cursor.position() as usize; + let pos = cursor.position() as usize; + let remaining = cursor.get_ref().len() - pos; if len > remaining { return Err(RdbError::Corrupted { detail: format!( @@ -766,9 +1370,29 @@ pub(crate) fn read_bytes(cursor: &mut Cursor<&[u8]>) -> Result } .into()); } - let mut data = vec![0u8; len]; - cursor.read_exact(&mut data)?; - Ok(Bytes::from(data)) + let slice = &cursor.get_ref()[pos..pos + len]; + cursor.set_position((pos + len) as u64); + Ok(Bytes::copy_from_slice(slice)) +} + +/// Read bytes as owned Vec — avoids Bytes intermediate for RDB load path. +/// Single allocation directly to the right size, no refcount overhead. +pub(crate) fn read_bytes_vec(cursor: &mut Cursor<&[u8]>) -> Result, MoonError> { + let len = read_u32(cursor)? as usize; + let pos = cursor.position() as usize; + let remaining = cursor.get_ref().len() - pos; + if len > remaining { + return Err(RdbError::Corrupted { + detail: format!( + "read_bytes_vec: length {} exceeds remaining {}", + len, remaining + ), + } + .into()); + } + let slice = &cursor.get_ref()[pos..pos + len]; + cursor.set_position((pos + len) as u64); + Ok(slice.to_vec()) } pub(crate) fn read_u32(cursor: &mut Cursor<&[u8]>) -> Result { diff --git a/src/server/conn/handler_monoio.rs b/src/server/conn/handler_monoio.rs index a97856d1..5186830c 100644 --- a/src/server/conn/handler_monoio.rs +++ b/src/server/conn/handler_monoio.rs @@ -1204,6 +1204,18 @@ pub async fn handle_connection_sharded_monoio< responses.push(crate::command::persistence::handle_lastsave()); continue; } + // BGREWRITEAOF -- multi-part AOF rewrite + if cmd.eq_ignore_ascii_case(b"BGREWRITEAOF") { + if let Some(ref tx) = aof_tx { + responses.push(crate::command::persistence::bgrewriteaof_start_sharded( + tx, + shard_databases.clone(), + )); + } else { + responses.push(Frame::Error(Bytes::from_static(b"ERR AOF is not enabled"))); + } + continue; + } // === ACL permission check (NOPERM gate) === // Exempt commands (AUTH, HELLO, QUIT, ACL) already handled above. diff --git a/src/server/conn/handler_sharded.rs b/src/server/conn/handler_sharded.rs index 404643f8..6be205ca 100644 --- a/src/server/conn/handler_sharded.rs +++ b/src/server/conn/handler_sharded.rs @@ -1214,6 +1214,13 @@ pub async fn handle_connection_sharded_inner< continue; } + // --- MULTI queue mode --- + if in_multi { + command_queue.push(frame); + responses.push(Frame::SimpleString(Bytes::from_static(b"QUEUED"))); + continue; + } + // --- BGSAVE --- if cmd.eq_ignore_ascii_case(b"BGSAVE") { responses.push(crate::command::persistence::bgsave_start_sharded(&snapshot_trigger_tx, num_shards)); @@ -1227,11 +1234,17 @@ pub async fn handle_connection_sharded_inner< responses.push(crate::command::persistence::handle_lastsave()); continue; } - - // --- MULTI queue mode --- - if in_multi { - command_queue.push(frame); - responses.push(Frame::SimpleString(Bytes::from_static(b"QUEUED"))); + if cmd.eq_ignore_ascii_case(b"BGREWRITEAOF") { + if let Some(ref tx) = aof_tx { + responses.push(crate::command::persistence::bgrewriteaof_start_sharded( + tx, + shard_databases.clone(), + )); + } else { + responses.push(Frame::Error(Bytes::from_static( + b"ERR AOF is not enabled", + ))); + } continue; } diff --git a/src/shard/shared_databases.rs b/src/shard/shared_databases.rs index 663ee418..b299ad27 100644 --- a/src/shard/shared_databases.rs +++ b/src/shard/shared_databases.rs @@ -138,6 +138,13 @@ impl ShardDatabases { self.num_shards } + /// Return a reference to all databases across all shards (for AOF rewrite). + /// Callers iterate shards × dbs and acquire read locks individually. + #[inline] + pub fn all_shard_dbs(&self) -> &[Vec>] { + &self.shards + } + /// Number of databases per shard (typically 16). #[inline] pub fn db_count(&self) -> usize { diff --git a/src/storage/compact_value.rs b/src/storage/compact_value.rs index d39a88cf..e830b348 100644 --- a/src/storage/compact_value.rs +++ b/src/storage/compact_value.rs @@ -42,7 +42,7 @@ const HEAP_TAG_MASK: usize = 0x7; /// Thin wrapper for heap-allocated strings. /// At 24 bytes (Vec), this is smaller than RedisValue::String(Bytes) (~40 bytes) -/// and avoids the enum discriminant overhead. +/// and avoids the enum discriminant + refcount overhead. struct HeapString(Vec); /// Borrowed view of a CompactValue, for zero-copy read access. @@ -139,16 +139,13 @@ impl CompactValue { /// `RedisValue` enum wrapper (~40B savings per heap string). /// Collections are still stored as `Box`. pub fn from_redis_value(value: RedisValue) -> Self { - match &value { - RedisValue::String(s) if s.len() <= SSO_MAX_LEN => { - return Self::inline_string(s); - } - _ => {} - } - - // String heap path: store as Box<[u8]> directly (no RedisValue wrapper) - if let RedisValue::String(s) = &value { - return Self::heap_string(s); + // String fast path: inline SSO or zero-copy owned Bytes + if let RedisValue::String(s) = value { + return if s.len() <= SSO_MAX_LEN { + Self::inline_string(&s) + } else { + Self::heap_string_owned(s) + }; } // Collection heap path: store as Box @@ -183,10 +180,29 @@ impl CompactValue { } } - /// Create a heap-allocated string CompactValue from byte data. - /// Stores as `Box` — eliminates RedisValue enum wrapper. - /// HeapString is 24 bytes (Vec) vs RedisValue::String(Bytes) at ~40 bytes. + /// Create a heap-allocated string CompactValue from a byte slice (copies data). pub fn heap_string(data: &[u8]) -> Self { + Self::heap_string_vec(data.to_vec()) + } + + /// Create from owned Bytes (converts to Vec via Bytes::into for zero-copy + /// when Bytes has unique ownership, or copies when shared). + pub fn heap_string_owned(data: Bytes) -> Self { + // Bytes::into::> is zero-copy when refcount == 1, copies otherwise + Self::heap_string_vec(data.into()) + } + + /// Create from an owned Vec directly — no copy, no refcount. + /// This is the fastest path: one Box allocation for the HeapString wrapper. + /// Public for RDB loader fast path. + pub fn heap_string_vec_direct(data: Vec) -> Self { + if data.len() <= SSO_MAX_LEN { + return Self::inline_string(&data); + } + Self::heap_string_vec(data) + } + + fn heap_string_vec(data: Vec) -> Self { debug_assert!(data.len() > SSO_MAX_LEN); let str_len = data.len(); @@ -194,7 +210,7 @@ impl CompactValue { let copy_len = str_len.min(4); prefix[..copy_len].copy_from_slice(&data[..copy_len]); - let hs = Box::new(HeapString(data.to_vec())); + let hs = Box::new(HeapString(data)); let raw_ptr = Box::into_raw(hs) as usize; debug_assert!( raw_ptr & HEAP_TAG_MASK == 0, @@ -322,6 +338,7 @@ impl CompactValue { /// Get a mutable reference to the heap string bytes. /// Returns None for non-string types and inline values. + /// Note: returns `Option<&mut Vec>` for the underlying byte buffer. pub fn as_bytes_mut(&mut self) -> Option<&mut Vec> { if self.is_inline() { None diff --git a/src/storage/db.rs b/src/storage/db.rs index 27b5176a..0871cd71 100644 --- a/src/storage/db.rs +++ b/src/storage/db.rs @@ -434,6 +434,56 @@ impl Database { self.data.insert(CompactKey::from(key), entry); } + /// Clear all entries and reset memory accounting. + /// + /// Used during multi-part AOF recovery to wipe any state populated by + /// earlier recovery phases (per-shard WAL replay, legacy appendonly.aof) + /// before loading the authoritative base RDB + incr log. Without this, + /// non-idempotent commands from pre-existing state would be double-applied. + pub fn clear(&mut self) { + self.data = DashTable::new(); + self.used_memory = 0; + } + + /// Bulk-load insert: skip duplicate check, version tracking, and per-key memory accounting. + /// + /// Used exclusively during RDB/AOF restore where keys are guaranteed unique and + /// we recalculate `used_memory` once after the entire load completes. + #[inline] + pub fn insert_for_load(&mut self, key: Bytes, entry: Entry) { + self.data.insert(CompactKey::from(key), entry); + } + + /// Recalculate `used_memory` by scanning all entries. Call once after bulk load. + pub fn recalculate_memory(&mut self) { + let mut total = 0usize; + for (key, entry) in self.data.iter() { + total += entry_overhead(key.as_bytes(), entry); + } + self.used_memory = total; + } + + /// Pre-size the internal hash table for an expected key count. + /// + /// WARNING: This REPLACES the internal hash table with a fresh one sized for + /// `additional` entries. It MUST be called on an empty `Database` (typically + /// immediately after `Database::new()` during RDB/AOF bulk load). Calling it + /// on a populated database silently discards all entries. + /// + /// Named `reserve` rather than `reset_with_capacity` to match the plan + /// nomenclature, but the debug assertion guards the misuse case. + pub fn reserve(&mut self, additional: usize) { + debug_assert!( + self.data.is_empty(), + "Database::reserve() must only be called on an empty database (bulk-load pre-sizing); called with {} existing entries", + self.data.len() + ); + if additional > self.data.len() { + let new_table = DashTable::with_capacity(additional); + self.data = new_table; + } + } + /// Remove a key and return its entry. No expiry check needed (DEL removes regardless). pub fn remove(&mut self, key: &[u8]) -> Option { if let Some(entry) = self.data.remove(key) {