Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ See `docs/guides/cdc.md` for consumer integration.
graph/temporal/workspace/MQ WAL replay, SO_REUSEPORT, NUMA pinning, and
cancel-driven graceful shutdown.

### Fixed — PR #96 test deflake + tokio AOF replay

- `main.rs` AOF recovery: gated the multi-part AOF manifest replay block to
`#[cfg(feature = "runtime-monoio")]`. Under tokio, the legacy single-file
`appendonly.aof` is loaded via the v2 recovery chain. Previously the multi-part
loader also ran under tokio and created an empty manifest at first boot, which
wiped v2-loaded state on the next restart (every tokio SET was lost on restart).
- `tests/txn_kv_wiring.rs`: made `test_txn_commit_wal_crash_recovery`
runtime-agnostic by polling for either the monoio multi-part `.base.rdb`
artifact or the tokio single-file `appendonly.aof`. Added
`connect_redis_with_retry` helper to bound and retry the post-bind RESP
handshake (was racing the shard accept loop, surfacing as EAGAIN on Linux
and ECONNRESET on macOS CI).

### Fixed — PR #95 review hardening

- `main.rs` `malloc_conf` symbol: replaced the union-based unsafe pun with
Expand Down
27 changes: 27 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,14 @@ fn main() -> anyhow::Result<()> {
//
// A corrupt manifest is FATAL: overwriting it silently destroys the reference
// to the real base RDB and loses all persisted data.
//
// Gated to runtime-monoio: BGREWRITEAOF under runtime-tokio writes a
// single-file appendonly.aof with RDB preamble (legacy v2 format) and
// never advances the manifest. Engaging this block under tokio creates an
// empty manifest at first boot, then on the next boot wipes v2-loaded
// state because the multi-part replay finds no base RDB. This caused the
// tokio TXN replay regression that surfaced via test_txn_commit_wal_crash_recovery.
#[cfg(feature = "runtime-monoio")]
if config.appendonly == "yes"
&& let Some(ref dir) = persistence_dir
{
Expand Down Expand Up @@ -523,6 +531,25 @@ fn main() -> anyhow::Result<()> {
}
}

// Under tokio, multi-part AOF is not supported. If a manifest exists, the
// operator likely switched from monoio — warn so they don't think their
// data is silently corrupted. v2 recovery (single-file appendonly.aof)
// remains active.
#[cfg(not(feature = "runtime-monoio"))]
if config.appendonly == "yes"
&& let Some(ref dir) = persistence_dir
{
let manifest_path = std::path::PathBuf::from(dir).join("appendonlydir/moon.aof.manifest");
if manifest_path.exists() {
tracing::warn!(
"multi-part AOF manifest found at {} but runtime is tokio; ignoring. \
Switch to monoio (cargo run --no-default-features --features runtime-monoio,jemalloc) \
to load multi-part AOF data.",
manifest_path.display()
);
}
}

// Extract databases from all shards and wrap in ShardDatabases
let all_dbs: Vec<Vec<moon::storage::Database>> = shards
.iter_mut()
Expand Down
59 changes: 49 additions & 10 deletions tests/txn_kv_wiring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,34 @@ fn find_moon_binary() -> Option<std::path::PathBuf> {
None
}

/// Open a redis-rs sync connection, retrying through the post-bind handshake race.
///
/// `wait_for_server` only proves TCP bind. The shard accept loop and RESP handler
/// can lag the bind by a small window, during which the first RESP exchange may
/// fail with EAGAIN (Linux) or ECONNRESET (macOS). Retry with short backoff
/// instead of letting a single fast attempt panic.
///
/// Each attempt is bounded to 1s, failed attempts are followed by a 200ms pause,
/// and the overall retry budget is 15s. `label` only appears in the panic
/// message to identify which server (e.g. "phase-1") could not be reached.
///
/// # Panics
///
/// Panics with the most recent connection error if an attempt fails once the
/// 15s deadline has passed.
fn connect_redis_with_retry(client: &redis::Client, label: &str) -> redis::Connection {
    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(15);
    loop {
        match client.get_connection_with_timeout(std::time::Duration::from_secs(1)) {
            Ok(conn) => return conn,
            // The deadline is only checked after a failed attempt, so at least
            // one attempt is always made and the error reported is always the
            // one from the final attempt.
            Err(e) if std::time::Instant::now() >= deadline => {
                panic!("connect to {label} server failed after 15s: {e}");
            }
            // Still within budget: back off briefly and try again.
            Err(_) => std::thread::sleep(std::time::Duration::from_millis(200)),
        }
    }
}

/// Bind a free OS port, drop the listener, and return the port number.
fn free_port() -> u16 {
// SAFETY: bind + drop releases the port; the server will re-bind it.
Expand Down Expand Up @@ -944,7 +972,7 @@ async fn test_txn_commit_wal_crash_recovery() {

// Connect with sync redis client to avoid holding an async runtime across process boundaries.
let client1 = redis::Client::open(format!("redis://127.0.0.1:{port1}")).unwrap();
let mut sync_conn1 = client1.get_connection().expect("connect to phase-1 server");
let mut sync_conn1 = connect_redis_with_retry(&client1, "phase-1");

// Non-TXN baseline (verifies plain AOF replay too)
let _: String = redis::cmd("SET")
Expand Down Expand Up @@ -980,13 +1008,20 @@ async fn test_txn_commit_wal_crash_recovery() {
"BGREWRITEAOF should start background rewrite: {bgrw}"
);

// Wait for BGREWRITEAOF to complete by polling for the base RDB file.
// The file is named moon.aof.<seq>.base.rdb inside the appendonlydir.
// Wait for BGREWRITEAOF to produce a non-empty AOF artifact.
//
// Under runtime-monoio multi-part AOF, this is
// `appendonlydir/moon.aof.<seq>.base.rdb`.
// Under runtime-tokio single-file AOF, this is `appendonly.aof`
// (the rewrite re-writes it with an RDB preamble in-place).
//
// Poll for either so the test stays runtime-agnostic.
let aof_dir = tmp_dir.join("appendonlydir");
let base_rdb_exists = {
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
let single_file_aof = tmp_dir.join("appendonly.aof");
let aof_artifact_ready = {
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(15);
loop {
let found = std::fs::read_dir(&aof_dir)
let base_rdb = std::fs::read_dir(&aof_dir)
.ok()
.and_then(|mut d| {
d.find(|e| {
Expand All @@ -998,7 +1033,10 @@ async fn test_txn_commit_wal_crash_recovery() {
})
})
.is_some();
if found {
let single = std::fs::metadata(&single_file_aof)
.map(|m| m.len() > 0)
.unwrap_or(false);
if base_rdb || single {
break true;
}
if std::time::Instant::now() >= deadline {
Expand All @@ -1008,8 +1046,9 @@ async fn test_txn_commit_wal_crash_recovery() {
}
};
assert!(
base_rdb_exists,
"BGREWRITEAOF did not create a base RDB file within 10s in {aof_dir:?}"
aof_artifact_ready,
"BGREWRITEAOF did not produce an AOF artifact within 15s \
(checked multi-part {aof_dir:?} and single-file {single_file_aof:?})"
);

// Kill server 1.
Expand Down Expand Up @@ -1041,7 +1080,7 @@ async fn test_txn_commit_wal_crash_recovery() {
);

let client2 = redis::Client::open(format!("redis://127.0.0.1:{port2}")).unwrap();
let mut sync_conn2 = client2.get_connection().expect("connect to phase-2 server");
let mut sync_conn2 = connect_redis_with_retry(&client2, "phase-2");

// Verify TXN committed value survived server restart via WAL replay.
let recovered: Option<String> = redis::cmd("GET")
Expand Down
Loading