Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/command/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,21 @@ pub fn bgrewriteaof_start(aof_tx: &channel::MpscSender<AofMessage>, db: SharedDa
}
}

/// Start BGREWRITEAOF in sharded mode using ShardDatabases.
pub fn bgrewriteaof_start_sharded(
aof_tx: &channel::MpscSender<AofMessage>,
shard_databases: std::sync::Arc<crate::shard::shared_databases::ShardDatabases>,
) -> Frame {
match aof_tx.try_send(AofMessage::RewriteSharded(shard_databases)) {
Ok(()) => Frame::SimpleString(Bytes::from_static(
b"Background append only file rewriting started",
)),
Err(_) => Frame::Error(Bytes::from_static(
b"ERR Background AOF rewrite failed to start",
)),
}
}

/// SAVE command: synchronous save to disk. Blocks until complete.
///
/// Clones all entries under read locks (same as BGSAVE), then serializes
Expand Down
109 changes: 109 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,115 @@ fn main() -> anyhow::Result<()> {
})
.collect();

// Multi-part AOF replay layered on top of v2/v3 recovery.
// Priority: if appendonlydir/ manifest exists → load multi-part (skip legacy v2 fallback).
// Otherwise v2 already handled legacy appendonly.aof during restore_from_persistence.
//
// A corrupt manifest is FATAL: overwriting it silently destroys the reference
// to the real base RDB and loses all persisted data.
if config.appendonly == "yes" {
if let Some(ref dir) = persistence_dir {
use anyhow::Context;
use moon::persistence::aof_manifest::AofManifest;
use moon::persistence::replay::DispatchReplayEngine;
let base_dir = std::path::PathBuf::from(dir);
let manifest_opt = AofManifest::load(&base_dir).with_context(|| {
format!(
"AOF manifest at {}/appendonlydir/ is corrupt; refusing to start to avoid data loss. Inspect manually before deleting.",
base_dir.display()
)
})?;
if let Some(ref manifest) = manifest_opt {
if num_shards == 1 {
// Multi-part AOF is authoritative. Wipe any state that earlier
// recovery phases (per-shard WAL replay, legacy appendonly.aof
// fallback inside restore_from_persistence) may have loaded —
// otherwise non-idempotent commands from the incr log would
// double-apply on top of that pre-existing state.
for db in shards[0].databases.iter_mut() {
db.clear();
}
let loaded = moon::persistence::aof_manifest::replay_multi_part(
&mut shards[0].databases,
manifest,
&DispatchReplayEngine,
)
.with_context(|| "multi-part AOF replay failed")?;
info!(
"AOF multi-part loaded (seq {}): {} entries",
manifest.seq, loaded
);

// Retire legacy appendonly.aof so future boots don't double-
// replay it via restore_from_persistence's fallback path.
// Rename (not delete) so an operator can recover if something
// went wrong.
let legacy = base_dir.join("appendonly.aof");
if legacy.exists() {
let retired = base_dir.join("appendonly.aof.legacy");
if let Err(e) = std::fs::rename(&legacy, &retired) {
tracing::warn!(
"Failed to retire legacy AOF {}: {}",
legacy.display(),
e
);
} else {
info!(
"Retired legacy AOF {} → {}",
legacy.display(),
retired.display()
);
}
}
} else {
tracing::warn!(
"Multi-part AOF skipped in multi-shard mode (not yet supported)"
);
}
} else {
// No manifest present — first boot after upgrade from legacy
// single-file AOF (v2 recovery already loaded it above) or
// fresh install.
//
// If restore_from_persistence loaded any state (from WAL or
// legacy appendonly.aof), we MUST capture it as the seq 1
// base RDB. Otherwise on the next boot the multi-part replay
// path would clear the databases and lose the legacy state.
// Only shard 0 is relevant in single-shard mode (the only
// mode the multi-part path currently supports).
let has_state =
num_shards == 1 && shards[0].databases.iter().any(|db| db.len() > 0);
if has_state {
let rdb_bytes = moon::persistence::rdb::save_to_bytes(&shards[0].databases)
.with_context(|| "failed to serialize legacy state for AOF base")?;
AofManifest::initialize_with_base(&base_dir, &rdb_bytes)
.with_context(|| "failed to initialize AOF manifest with base")?;
info!(
"First-upgrade: captured legacy state as AOF base seq 1 ({} bytes)",
rdb_bytes.len()
);
// Retire legacy appendonly.aof — its contents are now in
// the base RDB, and leaving it would cause v2 recovery on
// the next boot to double-replay it.
let legacy = base_dir.join("appendonly.aof");
if legacy.exists() {
let retired = base_dir.join("appendonly.aof.legacy");
if let Err(e) = std::fs::rename(&legacy, &retired) {
tracing::warn!(
"Failed to retire legacy AOF {}: {}",
legacy.display(),
e
);
}
}
} else {
AofManifest::initialize(&base_dir)
.with_context(|| "failed to initialize AOF manifest")?;
}
}
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Extract databases from all shards and wrap in ShardDatabases
let all_dbs: Vec<Vec<moon::storage::Database>> = shards
.iter_mut()
Expand Down
Loading
Loading