diff --git a/CHANGELOG.md b/CHANGELOG.md index 3297276a..98c25e16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,20 @@ See `docs/guides/cdc.md` for consumer integration. graph/temporal/workspace/MQ WAL replay, SO_REUSEPORT, NUMA pinning, and cancel-driven graceful shutdown. +### Fixed — PR #96 test deflake + tokio AOF replay + +- `main.rs` AOF recovery: gated the multi-part AOF manifest replay block to + `#[cfg(feature = "runtime-monoio")]`. Under tokio, the legacy single-file + `appendonly.aof` is loaded via the v2 recovery chain; the multi-part loader + no longer creates an empty manifest at first boot that wiped v2-loaded + state on the next restart (every tokio SET was lost on restart). +- `tests/txn_kv_wiring.rs`: made `test_txn_commit_wal_crash_recovery` + runtime-agnostic by polling for either the monoio multi-part `.base.rdb` + artifact or the tokio single-file `appendonly.aof`. Added + `connect_redis_with_retry` helper to bound and retry the post-bind RESP + handshake (was racing the shard accept loop, surfacing as EAGAIN on Linux + and ECONNRESET on macOS CI). + ### Fixed — PR #95 review hardening - `main.rs` `malloc_conf` symbol: replaced the union-based unsafe pun with diff --git a/src/main.rs b/src/main.rs index 8ec268fb..18c9990f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -431,6 +431,14 @@ fn main() -> anyhow::Result<()> { // // A corrupt manifest is FATAL: overwriting it silently destroys the reference // to the real base RDB and loses all persisted data. + // + // Gated to runtime-monoio: BGREWRITEAOF under runtime-tokio writes a + // single-file appendonly.aof with RDB preamble (legacy v2 format) and + // never advances the manifest. Engaging this block under tokio creates an + // empty manifest at first boot, then on the next boot wipes v2-loaded + // state because the multi-part replay finds no base RDB. This caused the + // tokio TXN replay regression that surfaced via test_txn_commit_wal_crash_recovery. + #[cfg(feature = "runtime-monoio")] if config.appendonly == "yes" && let Some(ref dir) = persistence_dir { @@ -523,6 +531,25 @@ fn main() -> anyhow::Result<()> { } } + // Under tokio, multi-part AOF is not supported. If a manifest exists, the + // operator likely switched from monoio — warn so they don't think their + // data is silently corrupted. v2 recovery (single-file appendonly.aof) + // remains active. + #[cfg(not(feature = "runtime-monoio"))] + if config.appendonly == "yes" + && let Some(ref dir) = persistence_dir + { + let manifest_path = std::path::PathBuf::from(dir).join("appendonlydir/moon.aof.manifest"); + if manifest_path.exists() { + tracing::warn!( + "multi-part AOF manifest found at {} but runtime is tokio; ignoring. \ + Switch to monoio (cargo run --no-default-features --features runtime-monoio,jemalloc) \ + to load multi-part AOF data.", + manifest_path.display() + ); + } + } + // Extract databases from all shards and wrap in ShardDatabases let all_dbs: Vec> = shards .iter_mut() diff --git a/tests/txn_kv_wiring.rs b/tests/txn_kv_wiring.rs index e26574e4..2de881c8 100644 --- a/tests/txn_kv_wiring.rs +++ b/tests/txn_kv_wiring.rs @@ -878,6 +878,34 @@ fn find_moon_binary() -> Option { None } +/// Open a redis-rs sync connection, retrying through the post-bind handshake race. +/// +/// `wait_for_server` only proves TCP bind. The shard accept loop and RESP handler +/// can lag the bind by a small window, during which the first RESP exchange may +/// fail with EAGAIN (Linux) or ECONNRESET (macOS). Retry with short backoff +/// instead of letting a single fast attempt panic. +fn connect_redis_with_retry(client: &redis::Client, label: &str) -> redis::Connection { + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(15); + #[allow(unused_assignments)] + let mut last_err: Option = None; + loop { + match client.get_connection_with_timeout(std::time::Duration::from_secs(1)) { + Ok(conn) => return conn, + Err(e) => last_err = Some(e), + } + if std::time::Instant::now() >= deadline { + panic!( + "connect to {label} server failed after 15s: {}", + last_err + .as_ref() + .map(|e| e.to_string()) + .unwrap_or_else(|| "no error captured".into()) + ); + } + std::thread::sleep(std::time::Duration::from_millis(200)); + } +} + /// Bind a free OS port, drop the listener, and return the port number. fn free_port() -> u16 { // SAFETY: bind + drop releases the port; the server will re-bind it. @@ -944,7 +972,7 @@ async fn test_txn_commit_wal_crash_recovery() { // Connect with sync redis client to avoid holding an async runtime across process boundaries. let client1 = redis::Client::open(format!("redis://127.0.0.1:{port1}")).unwrap(); - let mut sync_conn1 = client1.get_connection().expect("connect to phase-1 server"); + let mut sync_conn1 = connect_redis_with_retry(&client1, "phase-1"); // Non-TXN baseline (verifies plain AOF replay too) let _: String = redis::cmd("SET") @@ -980,13 +1008,20 @@ async fn test_txn_commit_wal_crash_recovery() { "BGREWRITEAOF should start background rewrite: {bgrw}" ); - // Wait for BGREWRITEAOF to complete by polling for the base RDB file. - // The file is named moon.aof..base.rdb inside the appendonlydir. + // Wait for BGREWRITEAOF to produce a non-empty AOF artifact. + // + // Under runtime-monoio multi-part AOF, this is + // `appendonlydir/moon.aof..base.rdb`. + // Under runtime-tokio single-file AOF, this is `appendonly.aof` + // (the rewrite re-writes it with an RDB preamble in-place). + // + // Poll for either so the test stays runtime-agnostic. let aof_dir = tmp_dir.join("appendonlydir"); - let base_rdb_exists = { - let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10); + let single_file_aof = tmp_dir.join("appendonly.aof"); + let aof_artifact_ready = { + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(15); loop { - let found = std::fs::read_dir(&aof_dir) + let base_rdb = std::fs::read_dir(&aof_dir) .ok() .and_then(|mut d| { d.find(|e| { @@ -998,7 +1033,10 @@ async fn test_txn_commit_wal_crash_recovery() { }) }) .is_some(); - if found { + let single = std::fs::metadata(&single_file_aof) + .map(|m| m.len() > 0) + .unwrap_or(false); + if base_rdb || single { break true; } if std::time::Instant::now() >= deadline { @@ -1008,8 +1046,9 @@ async fn test_txn_commit_wal_crash_recovery() { } }; assert!( - base_rdb_exists, - "BGREWRITEAOF did not create a base RDB file within 10s in {aof_dir:?}" + aof_artifact_ready, + "BGREWRITEAOF did not produce an AOF artifact within 15s \ + (checked multi-part {aof_dir:?} and single-file {single_file_aof:?})" ); // Kill server 1. @@ -1041,7 +1080,7 @@ async fn test_txn_commit_wal_crash_recovery() { ); let client2 = redis::Client::open(format!("redis://127.0.0.1:{port2}")).unwrap(); - let mut sync_conn2 = client2.get_connection().expect("connect to phase-2 server"); + let mut sync_conn2 = connect_redis_with_retry(&client2, "phase-2"); // Verify TXN committed value survived server restart via WAL replay. let recovered: Option = redis::cmd("GET")