diff --git a/.planning b/.planning index a9cbf36a..854f1a05 160000 --- a/.planning +++ b/.planning @@ -1 +1 @@ -Subproject commit a9cbf36ab1e494c3de19ce8944efe61bc209774f +Subproject commit 854f1a052bfa5048184abdd28fa66b086cbeed88 diff --git a/CHANGELOG.md b/CHANGELOG.md index edcfd8b3..3297276a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,33 @@ See `docs/guides/pitr.md` for operator usage. See `docs/guides/cdc.md` for consumer integration. +### Added — Embedded sharded server + +- **PR #95** — `server::embedded::run_embedded(config, cancel)` exposes the + full sharded handler (with TXN.* cross-store transactions) to in-process + embedders such as `helios moon-daemon`. The existing `run_with_shutdown` + drives `handler_single`, which deliberately does not implement TXN. + Embedded mode skips TLS, console, cluster bus, admin port, and multi-part + AOF manifest replay; it does include per-shard RDB + WAL recovery, + graph/temporal/workspace/MQ WAL replay, SO_REUSEPORT, NUMA pinning, and + cancel-driven graceful shutdown. + +### Fixed — PR #95 review hardening + +- `main.rs` `malloc_conf` symbol: replaced the union-based unsafe pun with + a `#[repr(transparent)]` `Sync` wrapper around a `c"..."` literal. +- `command/server_admin.rs` `get_vsz_bytes`: replaced four `unsafe` + libc::{open,read,close,sysconf} blocks with safe `/proc/self/status` + parsing on the cold MEMORY DOCTOR path. +- `main.rs` arena scan now uses `env::args_os()` so non-UTF-8 argv no + longer panics before clap reports the error. +- `server/embedded.rs` shutdown sequence: cancel → join shard threads → + drop the outer `aof_tx` → join the AOF thread, so the writer never + exits while shards are still queuing appends. Thread join panics are + now propagated through the function result. +- `storage/db.rs` annotated two `.expect()` calls in `Database::set`'s + `insert_or_update` closures for the hot-path unwrap ratchet. + ### Deferred to v0.2 follow-ups - **P3c** — wire `SnapshotState::set_last_lsn(wal_flush_lsn)` into the live diff --git a/src/command/server_admin.rs b/src/command/server_admin.rs index 2cdfdf58..07398ca1 100644 --- a/src/command/server_admin.rs +++ b/src/command/server_admin.rs @@ -542,34 +542,22 @@ fn allocator_info() -> (String, String) { } /// Read VSZ (virtual memory size) for the current process. +/// +/// Uses safe `/proc/self/status` parsing — `VmSize:` line is the canonical +/// virtual size in KiB. Cold admin path, allocation is fine. #[cfg(target_os = "linux")] fn get_vsz_bytes() -> usize { - // /proc/self/statm field 0 is size in pages. - // SAFETY: same pattern as get_rss_bytes in metrics_setup.rs. - let fd = unsafe { libc::open(c"/proc/self/statm".as_ptr(), libc::O_RDONLY) }; - if fd < 0 { - return 0; - } - let mut buf = [0u8; 128]; - let n = unsafe { libc::read(fd, buf.as_mut_ptr().cast::(), buf.len()) }; - unsafe { libc::close(fd) }; - if n <= 0 { + let Ok(status) = std::fs::read_to_string("/proc/self/status") else { return 0; - } - let s = &buf[..n as usize]; - // Field 0 = size (VSZ in pages). - if let Some(size_field) = s.split(|&b| b == b' ').next() { - let mut pages: u64 = 0; - for &b in size_field { - if b.is_ascii_digit() { - pages = pages * 10 + (b - b'0') as u64; - } - } - // Use the same page_size approach as get_rss_bytes. - let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) } as u64; - return (pages * page_size) as usize; - } - 0 + }; + status + .lines() + .find_map(|line| { + let rest = line.strip_prefix("VmSize:")?; + let kib = rest.split_whitespace().next()?.parse::().ok()?; + kib.checked_mul(1024) + }) + .unwrap_or(0) } #[cfg(target_os = "macos")] diff --git a/src/main.rs b/src/main.rs index 66d9d4c0..8ec268fb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,23 +21,24 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; #[global_allocator] static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; +/// `#[repr(transparent)]` wrapper around `*const c_char` so we can declare a +/// `Sync` static that exposes the exact `const char *` ABI jemalloc expects +/// for the `_rjem_malloc_conf` symbol. +#[cfg(feature = "jemalloc")] +#[repr(transparent)] +pub struct MallocConfPtr(*const libc::c_char); + +// SAFETY: The pointer is a `'static` C-string literal — immutable for the +// lifetime of the program. jemalloc reads it exactly once during init. +#[cfg(feature = "jemalloc")] +unsafe impl Sync for MallocConfPtr {} + #[cfg(feature = "jemalloc")] #[allow(non_upper_case_globals)] #[unsafe(export_name = "_rjem_malloc_conf")] -pub static malloc_conf: Option<&'static libc::c_char> = { - // SAFETY: The byte string is null-terminated and valid ASCII; reinterpreting - // the first byte's address as *const c_char is safe (same layout). - union U { - x: &'static u8, - y: &'static libc::c_char, - } - Some(unsafe { - U { - x: &b"narenas:8,background_thread:true,metadata_thp:auto,dirty_decay_ms:1000,muzzy_decay_ms:5000,abort_conf:true\0"[0], - } - .y - }) -}; +pub static malloc_conf: MallocConfPtr = MallocConfPtr( + c"narenas:8,background_thread:true,metadata_thp:auto,dirty_decay_ms:1000,muzzy_decay_ms:5000,abort_conf:true".as_ptr(), +); use std::path::PathBuf; @@ -840,17 +841,24 @@ fn maybe_respawn_with_arena_override() -> anyhow::Result<()> { // Lightweight scan of argv for --memory-arenas-cap N or --memory-arenas-cap=N. // We can't use clap here because clap::parse() requires the full struct, and // we need to inject env vars BEFORE jemalloc reads the config. - let args: Vec = env::args().collect(); + // Use args_os() to avoid panicking on non-UTF-8 argv and to preserve the + // original OsString argv for the re-spawn below (CodeRabbit). + use std::ffi::OsString; + use std::os::unix::ffi::OsStrExt; + let args: Vec = env::args_os().collect(); let mut requested: Option = None; let mut i = 1; while i < args.len() { - let a = &args[i]; - if let Some(rest) = a.strip_prefix("--memory-arenas-cap=") { - requested = rest.parse().ok(); + let a = args[i].as_os_str().as_bytes(); + if let Some(rest) = a.strip_prefix(b"--memory-arenas-cap=") { + requested = std::str::from_utf8(rest).ok().and_then(|s| s.parse().ok()); break; } - if a == "--memory-arenas-cap" && i + 1 < args.len() { - requested = args[i + 1].parse().ok(); + if a == b"--memory-arenas-cap" && i + 1 < args.len() { + requested = args[i + 1] + .as_os_str() + .to_str() + .and_then(|s| s.parse().ok()); break; } i += 1; diff --git a/src/server/embedded.rs b/src/server/embedded.rs new file mode 100644 index 00000000..55a2b3fd --- /dev/null +++ b/src/server/embedded.rs @@ -0,0 +1,444 @@ +//! `run_embedded` — minimal sharded boot for in-process Moon embedding. +//! +//! This is the production-equivalent path used by `helios moon-daemon` +//! (and any other embedder that needs the full sharded handler — most +//! importantly the TXN.BEGIN / TXN.COMMIT cross-store transaction wiring +//! implemented only in `handler_sharded.rs`). +//! +//! Reference shape: `tests/txn_kv_wiring.rs::start_txn_server` for the +//! minimal sharded wiring with public APIs, and `src/main.rs` for the +//! AOF / WAL recovery blocks we layer on top. +//! +//! # Why a dedicated helper instead of calling `run_with_shutdown`? +//! +//! `run_with_shutdown` drives `handler_single`, which deliberately does +//! NOT implement TXN (see comments in `conn/handler_single.rs`). Embedders +//! that need transactional KV must take the sharded path. This helper +//! exposes that path with a small surface that hides cluster/console/TLS +//! complexity an embedder does not need. +//! +//! # What is intentionally skipped (vs main.rs) +//! +//! - TLS / `tls_port` (embedders use loopback only) +//! - Console gateway, admin auth, CORS, rate limiting (`console` feature) +//! - Admin port / Prometheus HTTP server +//! - Cluster bus + gossip ticker +//! - SIGHUP TLS reload thread +//! - `--check-config` short-circuit (caller already validated) +//! +//! What IS included: per-shard RDB + WAL recovery (`Shard::restore_from_persistence`), +//! graph/temporal/workspace/MQ WAL replay, per-shard SO_REUSEPORT on Linux, +//! NUMA pinning, and graceful cancel-driven shutdown. +//! +//! What is NOT included even though the operator may have configured it: +//! - Multi-part AOF manifest replay (legacy `appendonlydir/`). The tokio AOF +//! writer only knows the single-file path; see the in-body comment near the +//! `restore_from_persistence` loop for the rationale. +//! - `save` change-count rules: the auto-save task needs a hook into the +//! sharded write path that does not yet exist, so embedded mode logs a +//! warning and skips the timer instead of silently promising snapshots. + +#![cfg(feature = "runtime-tokio")] + +use std::path::PathBuf; +use std::sync::Arc; + +use anyhow::Context; +use parking_lot::RwLock; +use tracing::info; + +use crate::config::ServerConfig; +use crate::persistence::aof::{self, AofMessage, FsyncPolicy}; +use crate::runtime::cancel::CancellationToken; +use crate::runtime::channel; +use crate::runtime::{RuntimeFactoryImpl, traits::RuntimeFactory}; +use crate::server; +use crate::shard::Shard; +use crate::shard::mesh::{CHANNEL_BUFFER_SIZE, ChannelMesh}; +use crate::shard::shared_databases::ShardDatabases; + +/// Run an embedded sharded Moon server until `cancel` is fired. +/// +/// Behaves like the production `main.rs` startup path but with cluster, +/// TLS, console, and admin-port concerns elided. Suitable for in-process +/// embedding (e.g. `helios moon-daemon`). +/// +/// # Arguments +/// * `config` — fully-resolved `ServerConfig` (the caller is responsible +/// for setting `shards >= 1`; if `0`, this fn auto-detects core count). +/// * `cancel` — when fired, the listener exits, AOF flushes via +/// `AofMessage::Shutdown`, and shard threads are joined. +/// +/// # Returns +/// `Ok(())` on clean shutdown. Returns `Err` if the persistence directory +/// is unusable, AOF manifest is corrupt, or a shard thread fails to spawn. +pub async fn run_embedded( + mut config: ServerConfig, + cancel: CancellationToken, +) -> anyhow::Result<()> { + // Validate / create persistence directory up front. + std::fs::create_dir_all(&config.dir).with_context(|| { + format!( + "embedded moon: failed to create persistence directory {:?}", + config.dir + ) + })?; + + // Resolve shard count (`0` => auto-detect core count, matches main.rs). + if config.shards == 0 { + config.shards = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(4); + } + let num_shards = config.shards; + + info!( + "embedded moon: starting with {} shard(s) on {}:{}", + num_shards, config.bind, config.port + ); + + // One-time global init that the production binary normally performs. + crate::admin::metrics_setup::init_global_slowlog( + config.slowlog_max_len, + config.slowlog_log_slower_than, + ); + crate::vector::distance::init(); + + // Channel mesh for inter-shard messaging. + let mut mesh = ChannelMesh::new(num_shards, CHANNEL_BUFFER_SIZE); + let conn_txs: Vec> = + (0..num_shards).map(|i| mesh.conn_tx(i)).collect(); + let all_notifiers = mesh.all_notifiers(); + + // AOF writer: dedicated std::thread (matches main.rs lifetime model). + // We retain the JoinHandle so shutdown can wait for the writer to finish + // flushing — dropping it would race the process exit and risk losing the + // final fsync (CodeRabbit #1). + let (aof_tx, aof_join): ( + Option>, + Option>, + ) = if config.appendonly == "yes" { + let (tx, rx) = channel::mpsc_bounded::(10_000); + let aof_token = cancel.child_token(); + let fsync = FsyncPolicy::from_str(&config.appendfsync); + let aof_file_path = PathBuf::from(&config.dir).join(&config.appendfilename); + let handle = std::thread::Builder::new() + .name("embedded-moon-aof".to_string()) + .spawn(move || { + RuntimeFactoryImpl::block_on_local( + "embedded-moon-aof".to_string(), + aof::aof_writer_task(rx, aof_file_path, fsync, aof_token), + ); + }) + .context("embedded moon: failed to spawn AOF writer thread")?; + info!("embedded moon: AOF enabled (fsync: {:?})", fsync); + (Some(tx), Some(handle)) + } else { + (None, None) + }; + + let bind_addr = format!("{}:{}", config.bind, config.port); + + // Snapshot trigger watch channel. + let (snap_tx, snap_rx) = channel::watch(0u64); + + // Persistence dir is set whenever any persistence is on. + let persistence_dir: Option = if config.appendonly == "yes" || config.save.is_some() { + Some(config.dir.clone()) + } else { + None + }; + + // Replication state — embedded mode is single-node, but the shard + // event loop still expects a populated state object. + let (repl_id, repl_id2) = + crate::replication::state::load_replication_state(std::path::Path::new(&config.dir)); + let repl_state = Arc::new(std::sync::RwLock::new( + crate::replication::state::ReplicationState::new(num_shards, repl_id, repl_id2), + )); + crate::admin::metrics_setup::set_global_repl_state(repl_state.clone()); + + // ACL table (loads aclfile if configured; default no-op otherwise). + let acl_table: Arc> = Arc::new(std::sync::RwLock::new( + crate::acl::AclTable::load_or_default(&config), + )); + + // Shared runtime + server configs. + let runtime_config_shared: Arc> = + Arc::new(RwLock::new(config.to_runtime_config())); + let server_config_shared: Arc = Arc::new(config.clone()); + + // Per-shard pubsub + remote-subscriber registries. + let all_pubsub_registries: Vec>> = (0..num_shards) + .map(|_| Arc::new(RwLock::new(crate::pubsub::PubSubRegistry::new()))) + .collect(); + let all_remote_sub_maps: Vec< + Arc>, + > = (0..num_shards) + .map(|_| { + Arc::new(RwLock::new( + crate::shard::remote_subscriber_map::RemoteSubscriberMap::new(), + )) + }) + .collect(); + + let affinity_tracker = Arc::new(RwLock::new(crate::shard::affinity::AffinityTracker::new())); + + // Build shards + run pre-loop persistence recovery (RDB / per-shard WAL + // baseline). Disk offload is opt-in. + let disk_offload_base = if config.disk_offload_enabled() { + Some(config.effective_disk_offload_dir()) + } else { + None + }; + let mut shards: Vec = (0..num_shards) + .map(|id| { + let mut shard = Shard::with_initial_keyspace_hint( + id, + num_shards, + config.databases, + config.initial_keyspace_hint, + config.to_runtime_config(), + ); + if let Some(ref dir) = persistence_dir { + shard.restore_from_persistence(dir, disk_offload_base.as_deref()); + } + if let Some(ref offload_base) = disk_offload_base { + let shard_dir = offload_base.join(format!("shard-{}", id)); + for db in &mut shard.databases { + db.cold_shard_dir = Some(shard_dir.clone()); + if db.cold_index.is_none() { + db.cold_index = Some(crate::storage::tiered::cold_index::ColdIndex::new()); + } + } + } + shard + }) + .collect(); + + // NOTE: multi-part AOF (appendonlydir/ manifest) is intentionally NOT used here. + // + // Under `runtime-tokio` the AOF writer (`aof::aof_writer_task`) opens a single + // file at `/` and appends RESP frames directly — it never + // reads `AofManifest` nor advances the `incr` file (that path is `runtime-monoio` + // only; see `src/persistence/aof.rs` cfg gates). If we replayed `base+incr` here + // and then started the tokio writer, persisted writes would land in the legacy + // single-file AOF while the next boot would replay the stale manifest pair — + // silently dropping data (Qodo bug #3). + // + // Embedded recovery therefore relies on the per-shard baseline restore performed + // above (`Shard::restore_from_persistence` → RDB + per-shard WAL) plus the + // auxiliary WAL replay below. When multi-part AOF gains a tokio writer, wire it + // through the manifest's `incr_path()` and re-enable this block. + + // Pull databases out into the shared registry for cross-shard reads. + let all_dbs: Vec> = shards + .iter_mut() + .map(|s| std::mem::take(&mut s.databases)) + .collect(); + let shard_databases = ShardDatabases::new(all_dbs); + + // Auxiliary WAL replay paths (no-op when files do not exist). + #[cfg(feature = "graph")] + if let Some(ref dir) = persistence_dir { + let dir_path = std::path::Path::new(dir); + shard_databases.recover_graph_stores(dir_path); + shard_databases.replay_graph_wal(dir_path); + } + if let Some(ref dir) = persistence_dir { + let dir_path = std::path::Path::new(dir); + shard_databases.replay_temporal_wal(dir_path); + shard_databases.replay_workspace_wal(dir_path); + shard_databases.replay_mq_wal(dir_path); + } + + // Readiness flag — `/readyz` is gated on this; harmless without admin port. + crate::admin::metrics_setup::set_server_ready(); + + // Spawn shard threads. + let mut shard_handles = Vec::with_capacity(num_shards); + let config_port = config.port; + for (id, mut shard) in shards.into_iter().enumerate() { + let producers = mesh.take_producers(id); + let consumers = mesh.take_consumers(id); + let conn_rx = mesh.take_conn_rx(id); + let shard_cancel = cancel.clone(); + let shard_aof_tx = aof_tx.clone(); + let shard_bind_addr = bind_addr.clone(); + let shard_persistence_dir = persistence_dir.clone(); + let shard_snap_rx = snap_rx.clone(); + let shard_snap_tx = snap_tx.clone(); + let shard_repl_state = repl_state.clone(); + let shard_acl_table = acl_table.clone(); + let shard_runtime_config = runtime_config_shared.clone(); + let shard_server_config = server_config_shared.clone(); + let shard_spsc_notify = mesh.take_notify(id); + let shard_all_notifiers = all_notifiers.clone(); + let shard_dbs = shard_databases.clone(); + let shard_pubsub_regs = all_pubsub_registries.clone(); + let shard_remote_sub_maps = all_remote_sub_maps.clone(); + let shard_affinity = affinity_tracker.clone(); + + let handle = std::thread::Builder::new() + .name(format!("embedded-moon-shard-{}", id)) + .spawn(move || { + crate::shard::numa::pin_to_core(id); + RuntimeFactoryImpl::block_on_local( + format!("embedded-moon-shard-{}", id), + async move { + shard + .run( + conn_rx, + None, // tls + consumers, + producers, + shard_cancel, + shard_aof_tx, + Some(shard_bind_addr), + shard_persistence_dir, + shard_snap_rx, + shard_snap_tx, + Some(shard_repl_state), + None, // cluster_state + config_port, + shard_acl_table, + shard_runtime_config, + shard_server_config, + shard_spsc_notify, + shard_all_notifiers, + shard_dbs, + shard_pubsub_regs, + shard_remote_sub_maps, + shard_affinity, + ) + .await; + }, + ); + }) + .context("embedded moon: failed to spawn shard thread")?; + shard_handles.push(handle); + } + + // Auto-save (change-count rules) is intentionally NOT spawned in embedded mode. + // + // `run_auto_save_sharded` only fires when its `change_counter` crosses the + // configured threshold, but the sharded ConnectionContext / write paths do + // not currently take an `Arc` we can wire it through (see + // `src/server/conn/core.rs` and `src/persistence/auto_save.rs`). Spawning + // the task with a counter that no writer can increment would silently + // promise persistence the daemon can never deliver (Qodo bug #5 / + // CodeRabbit autosave finding). Embedders that need periodic snapshots + // should call `BGSAVE` on their own cadence until the sharded write path + // exposes a dirty-tracking hook. + if config.save.is_some() { + tracing::warn!( + "embedded moon: `save` rules configured but change-count auto-save is not wired in embedded mode; ignoring" + ); + } + let _ = snap_tx; // keep the watch sender alive for the shard's snap_rx clones + + // Run the sharded listener until cancelled. + let per_shard_accept = cfg!(target_os = "linux"); + let listener_result = server::listener::run_sharded( + config, + conn_txs, + cancel.clone(), + per_shard_accept, + affinity_tracker, + ) + .await; + + if let Err(e) = &listener_result { + tracing::error!("embedded moon: listener error: {}", e); + } + + // Listener exited (cancel fired or fatal error). Shutdown ordering: + // 1. cancel.cancel() — stops shard accept loops + producers. + // 2. Join shard threads — drops every shard-held `aof_tx` clone. + // 3. Drop our outer `aof_tx` — last sender goes away, the AOF writer's + // `recv_async()` returns `Err(_)` and the task flushes + fsyncs + // before exiting (see `aof::aof_writer_task` Err arm). + // 4. Join the AOF thread. + // + // This sequencing fixes Qodo bug #5: sending `AofMessage::Shutdown` before + // shards exit lets the writer terminate while shards still `try_send` + // appends, dropping the final writes. Relying on channel-close instead of + // an explicit Shutdown also avoids the blocking-send hazard (Qodo bug #4). + cancel.cancel(); + + // Shard + AOF thread joins must run on a blocking thread because they own + // current-thread runtimes (CodeRabbit #1 — without joining, the final + // fsync can race process exit). + let join_outcome = tokio::task::spawn_blocking(move || { + let mut shard_panics: usize = 0; + for handle in shard_handles { + if let Err(payload) = handle.join() { + shard_panics += 1; + tracing::error!( + "embedded moon: shard thread panicked: {}", + panic_message(&payload) + ); + } + } + + // Drop the last AOF sender so the writer's recv loop sees channel close. + drop(aof_tx); + + let aof_panic = if let Some(handle) = aof_join { + match handle.join() { + Ok(()) => false, + Err(payload) => { + tracing::error!( + "embedded moon: AOF writer thread panicked: {}", + panic_message(&payload) + ); + true + } + } + } else { + false + }; + + (shard_panics, aof_panic) + }) + .await; + + let result = match (join_outcome, listener_result) { + (Err(e), listener) => { + tracing::warn!("embedded moon: shard-join task failed: {}", e); + listener + } + (Ok((shard_panics, aof_panic)), listener) => { + if shard_panics > 0 || aof_panic { + // Promote panics to a typed shutdown error. We still surface + // the listener result if it was an error; otherwise return a + // synthetic error so callers know shutdown was incomplete. + listener.and_then(|()| { + Err(std::io::Error::other(format!( + "embedded moon: shutdown incomplete ({} shard panic(s), aof_panic={})", + shard_panics, aof_panic + )) + .into()) + }) + } else { + listener + } + } + }; + + if result.is_ok() { + info!("embedded moon: shut down cleanly"); + } + result +} + +/// Best-effort stringification of a `JoinHandle::join()` panic payload. +fn panic_message<'a>(payload: &'a Box) -> &'a str { + if let Some(s) = payload.downcast_ref::<&'static str>() { + s + } else if let Some(s) = payload.downcast_ref::() { + s.as_str() + } else { + "" + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 0c70972a..2ef4eadf 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,6 +1,8 @@ pub mod codec; pub mod conn; pub mod conn_state; +#[cfg(feature = "runtime-tokio")] +pub mod embedded; pub mod expiration; pub mod listener; pub mod response_slot; diff --git a/src/storage/db.rs b/src/storage/db.rs index 289b987d..b45623e1 100644 --- a/src/storage/db.rs +++ b/src/storage/db.rs @@ -259,6 +259,10 @@ impl Database { // Exactly one closure runs (FnOnce), so the take() is safe. let entry_cell = std::cell::Cell::new(Some(entry)); + // `insert_or_update` invariant: exactly one of the two closures fires + // exactly once per call, so the `Cell::take()` below cannot observe + // a None value. Annotated for the hot-path unwrap ratchet. + #[allow(clippy::expect_used)] let result = self.data.insert_or_update( CompactKey::from(key.clone()), // Bytes::clone is a refcount bump, not deep copy |existing: &mut Entry| { diff --git a/src/storage/eviction.rs b/src/storage/eviction.rs index 871d38fd..36648580 100644 --- a/src/storage/eviction.rs +++ b/src/storage/eviction.rs @@ -850,6 +850,12 @@ mod tests { #[test] fn test_volatile_ttl_evicts_soonest() { + // `sample_random_keys` reservoir-samples across DashTable segments + // with a non-deterministic RNG, so over a tiny 2-key population a + // single eviction round can fail to sample `soon` and pick `later` + // instead. Same flake pattern as `test_lru_evicts_oldest`: drive + // eviction in a bounded loop so `soon` is guaranteed to be sampled + // (worst case once the population shrinks to one key). let mut db = Database::new(); let now_ms = current_time_ms(); db.set_string_with_expiry( @@ -863,10 +869,32 @@ mod tests { now_ms + 3_600_000, ); - let result = evict_one_volatile_ttl(&mut db, 5); - assert!(result); - assert_eq!(db.len(), 1); - assert!(db.data().get(b"soon" as &[u8]).is_none()); + let mut evictions = 0; + for _ in 0..50 { + if db.data().get(b"soon" as &[u8]).is_none() { + break; + } + if !evict_one_volatile_ttl(&mut db, 5) { + break; + } + evictions += 1; + // Re-insert the wrongly-evicted `later` so we always have two + // candidates until the algorithm picks `soon`. This proves the + // sampler eventually selects the soonest-expiring key. + if db.data().get(b"later" as &[u8]).is_none() + && db.data().get(b"soon" as &[u8]).is_some() + { + db.set_string_with_expiry( + Bytes::from_static(b"later"), + Bytes::from_static(b"v"), + now_ms + 3_600_000, + ); + } + } + assert!( + db.data().get(b"soon" as &[u8]).is_none(), + "expected `soon` evicted within bounded rounds (evictions={evictions})" + ); } #[test]