Skip to content

fix(moon): add run_embedded helper for sharded in-process embedding#95

Open
TinDang97 wants to merge 13 commits into
mainfrom
fix/run-embedded-sharded
Open

fix(moon): add run_embedded helper for sharded in-process embedding#95
TinDang97 wants to merge 13 commits into
mainfrom
fix/run-embedded-sharded

Conversation

@TinDang97
Copy link
Copy Markdown
Collaborator

@TinDang97 TinDang97 commented May 16, 2026

Summary

  • Adds pub async fn server::embedded::run_embedded(config, cancel) behind runtime-tokio — the first public entry point to Moon's sharded handler for in-process embedders.
  • Existing server::listener::run_with_shutdown drives handler_single, which deliberately does NOT implement cross-store TXN. Embedders that need transactional KV (e.g. anything calling lunaris-storage-moon::atomic_write) had no public path to the sharded handler — main.rs inlined the entire boot sequence.
  • The helper mirrors main.rs's shard wiring (ChannelMesh, per-shard pubsub/remote-sub registries, affinity tracker, shard threads with NUMA pinning), pulls AOF replay + manifest initialization for appendonly=yes, replays graph/temporal/workspace/MQ WALs when persistence is on, and shuts down cleanly on cancel.
  • Scope deliberately minimal: no TLS, no console/admin port, no cluster bus/gossip, no SIGHUP TLS reload. Embedders that need those should keep using main.rs.

Motivation

Helios's embedded daemon (helios moon-daemon) was calling run_with_shutdown and hitting unknown command 'TXN' on every memories.remember write. Verified empirically: LunarisStore::remember against this new run_embedded path now returns a valid LSN and persists episode + chunk + vector index keys.

Test plan

  • cargo build --no-default-features --features runtime-tokio,graph,text-index green
  • Wire smoke from a downstream consumer (Helios) — LunarisStore::remember round-trips successfully against an embedded daemon driven by run_embedded
  • No regression on existing runtime-tokio test gate (note: there are 2 pre-existing E0308 errors in src/shard/mod.rs:383,437 on parent b24b4d0 — this PR is additive and does not touch that file)

Summary by CodeRabbit

  • New Features

    • Embedded multi-shard server mode: in-process shard startup, persistence recovery, replication/ACL loading, snapshot handling, and optional AOF persistence.
    • Safer process respawn handling (robust non-UTF8 argv parsing).
  • Bug Fixes / Behavior

    • More accurate MEMORY DOCTOR VSZ reporting on Linux.
    • Improved embedded shutdown ordering and readiness signaling.
  • Tests

    • Harder server readiness check in integration tests (expects real PONG).

Review Change Stack

The existing `server::listener::run_with_shutdown` drives `handler_single`,
which deliberately does NOT implement cross-store TXN. Embedders that need
transactional KV semantics (Helios's memory engine, anything backed by
`lunaris-storage-moon::atomic_write`) had no public entry point to the
sharded handler — `main.rs` inlined the entire boot sequence.

This change extracts a minimal `pub async fn server::embedded::run_embedded`
behind the `runtime-tokio` feature gate. The helper mirrors `main.rs`'s
shard wiring (ChannelMesh, per-shard pubsub/remote-sub registries, affinity
tracker, shard threads with NUMA pinning), pulls AOF replay + manifest
initialization for `appendonly=yes`, replays graph/temporal/workspace/MQ
WALs when persistence is on, and shuts down cleanly on cancel (flushes AOF
via `AofMessage::Shutdown`, joins shard threads via `spawn_blocking`).

Intentionally elided vs `main.rs`: TLS, console gateway, admin auth/CORS,
admin HTTP port, cluster bus + gossip, SIGHUP TLS reload, `--check-config`.
Loopback-only embedders (the only intended consumer) need none of these.

Verification:
  - `cargo check --no-default-features --features runtime-tokio,graph,text-index` green.
  - Helios's new `txn_roundtrip` test (in `helios-moon-embedded`) drives
    TXN.BEGIN / SET / TXN.COMMIT over the wire against an in-process
    daemon backed by this helper — passes in 0.4s.
  - Pre-existing sharded tests in `vendor/moon/tests/txn_kv_wiring.rs` are
    unchanged and continue to use `server::listener::run_sharded` directly.

Rollback: revert this commit; the change is purely additive (new file +
two-line `pub mod embedded;` registration), so no consumer breaks.

Related: helios-mono `.planning/memory-engine-v2/W1-HOTFIX-TXN-CONTEXT.md`.

author: Tin Dang
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 16, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds a feature-gated embedded sharded server entrypoint (run_embedded), changes jemalloc config symbol/respawn arg parsing, and updates Linux VSZ collection for MEMORY DOCTOR.

Changes

Embedded Sharded Server Startup

Layer / File(s) Summary
Module docs, prologue, and feature export
src/server/embedded.rs, src/server/mod.rs
Adds run_embedded with module docs and prologue. Validates persistence directory, resolves config.shards when 0, runs global init, builds ChannelMesh, and conditionally spawns the embedded AOF writer. The embedded module is feature-gated behind runtime-tokio.
Snapshot/replication plumbing and shard wiring
src/server/embedded.rs
Sets up snapshot watch channels and persistence enablement, loads and registers replication state and ACL table, builds shared runtime/server registries, constructs per-shard Shard instances (optional cold storage wiring), runs baseline per-shard restore, and aggregates ShardDatabases.
Readiness, shard threads, listener & shutdown
src/server/embedded.rs
Marks server ready, spawns one OS thread per shard (NUMA pinning) to run shard async loops, warns and disables sharded auto-save in embedded mode, runs server::listener::run_sharded until cancel, coordinates shutdown, joins shard threads, flushes/joins AOF writer, and captures join panic messages.

Main / jemalloc

Layer / File(s) Summary
Malloc config symbol and respawn arg parsing
src/main.rs
Exports jemalloc configuration as a #[repr(transparent)] pointer wrapper (MallocConfPtr) and updates maybe_respawn_with_arena_override() to use env::args_os() and byte-oriented OsStr parsing to detect --memory-arenas-cap for possible respawn.

Memory Doctor / OS metrics

Layer / File(s) Summary
VSZ retrieval change
src/command/server_admin.rs
On Linux, get_vsz_bytes() now reads /proc/self/status and parses the VmSize: line (KiB → bytes). Non-Linux and error fallbacks return 0.

Sequence Diagram(s)

sequenceDiagram
  participant RunEmbedded
  participant AOFWriter
  participant ShardThread
  participant Listener
  RunEmbedded->>AOFWriter: spawn AOF writer thread (if appendonly == "yes")
  RunEmbedded->>ShardThread: spawn per-shard OS threads (NUMA pinning)
  RunEmbedded->>Listener: start server::listener::run_sharded
  Listener->>RunEmbedded: returns on cancel or error
  RunEmbedded->>ShardThread: signal shutdown / join threads
  RunEmbedded->>AOFWriter: drop aof_tx to trigger flush/fsync and exit
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I hopped through threads and channels wide,
Restored the fields where data hide,
Jemalloc learned to wear a better hat,
The doctor peeks where VmSize sat,
Shards now wake and sleep in stride.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title 'fix(moon): add run_embedded helper for sharded in-process embedding' directly and specifically describes the main change—adding a new public async function for embedded sharded server support. It is clear, concise, and accurately reflects the primary objective of the PR.
Description check ✅ Passed The PR description provides a clear summary, motivation, test plan, and scope boundaries. However, it does not include all sections from the template: the Checklist (fmt, clippy, test, consistency) and Performance Impact sections are missing, though Notes is implicitly covered.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/run-embedded-sharded

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@qodo-code-review
Copy link
Copy Markdown

qodo-code-review Bot commented May 16, 2026

Review Summary by Qodo

(Agentic_describe updated until commit f64ee36)

Add embedded sharded server API and enhance memory diagnostics with arena tuning

✨ Enhancement 🐞 Bug fix

Grey Divider

Walkthroughs

Description
• Add run_embedded public API for sharded in-process Moon embedding with full TXN support
  - Mirrors main.rs shard wiring (ChannelMesh, pubsub registries, affinity tracking, NUMA pinning)
  - Includes AOF replay, graph/temporal/workspace/MQ WAL recovery, graceful cancel-driven shutdown
• Enhance MEMORY DOCTOR with detailed multi-line health reports including per-subsystem breakdown
  - Reports RSS, VSZ, allocator info, per-kind memory usage with percentages and recommendations
• Add jemalloc arena override via --memory-arenas-cap with automatic process re-spawn
  - Compile-time guard against incompatible allocator features; runtime warning for non-jemalloc
  builds
  - Fixes baked-in narenas:8 default by re-executing with _RJEM_MALLOC_CONF environment variable
Diagram
flowchart LR
  A["ServerConfig + CancellationToken"] -->|run_embedded| B["Sharded Boot"]
  B -->|ChannelMesh| C["Per-Shard Threads"]
  B -->|AOF + WAL| D["Persistence Recovery"]
  C -->|TXN.BEGIN/COMMIT| E["Cross-Store Transactions"]
  F["MEMORY DOCTOR"] -->|per-subsystem| G["Health Report"]
  H["--memory-arenas-cap N"] -->|re-spawn| I["jemalloc narenas override"]
Loading

Grey Divider

File Changes

1. src/server/embedded.rs ✨ Enhancement +378/-0

New embedded sharded server entry point with full TXN support

• New 378-line module implementing pub async fn run_embedded() for sharded in-process embedding
• Handles ChannelMesh setup, per-shard pubsub/remote-sub registries, affinity tracking, NUMA pinning
• Manages AOF writer thread lifecycle, persistence recovery (RDB + per-shard WAL), and graceful
 shutdown
• Intentionally excludes TLS, console gateway, cluster bus, and admin port (loopback-only embedders)

src/server/embedded.rs


2. src/server/mod.rs ✨ Enhancement +2/-0

Register embedded module with feature gate

• Register new embedded module behind runtime-tokio feature gate
• Two-line addition: conditional module declaration and public export

src/server/mod.rs


3. src/command/server_admin.rs ✨ Enhancement +283/-3

Enhance MEMORY DOCTOR with detailed multi-line diagnostics

• Replace single-line memory_doctor() response with comprehensive multi-line health report
• Add per-subsystem resident byte tracking (DashTable, HNSW, CSR, WAL, sealed segments, replication
 backlog)
• Include allocator metadata (name, arena count), VSZ-to-RSS ratio analysis, and actionable
 recommendations
• Add helper functions: humanize_bytes(), pct(), chrono_iso8601_now(), days_to_ymd(),
 replication_backlog_bytes(), allocator_info(), get_vsz_bytes() (Linux/macOS platform-specific)

src/command/server_admin.rs


View more (1)
4. src/main.rs ✨ Enhancement +132/-2

Add jemalloc arena tuning via process re-spawn and allocator validation

• Add compile-time mutual-exclusion check for jemalloc and mimalloc-alt features
• Change jemalloc symbol export from malloc_conf to _rjem_malloc_conf (tikv-jemalloc prefix)
 with narenas:8 default
• Add maybe_respawn_with_arena_override() function to re-execute process with _RJEM_MALLOC_CONF
 when --memory-arenas-cap N differs from default
• Add runtime warning when --memory-arenas-cap is used on non-jemalloc builds
• Register global ShardDatabases for MEMORY DOCTOR and Prometheus per-kind gauges after server ready

src/main.rs


Grey Divider

Qodo Logo

@qodo-code-review
Copy link
Copy Markdown

qodo-code-review Bot commented May 16, 2026

Code Review by Qodo

🐞 Bugs (1) 📘 Rule violations (3) 📎 Requirement gaps (0)

Grey Divider


Action required

1. memory_doctor uses format! 📘 Rule violation ➹ Performance
Description
New MEMORY DOCTOR formatting uses format!() and to_string() within src/command/, which the
checklist treats as hot-path code and disallows due to avoidable heap allocations and conversions.
This can degrade throughput if the command is invoked frequently or during incident response when
the server is under load.
Code

src/command/server_admin.rs[R361-367]

+    // ── VSZ ratio recommendation ─────────────────────────────────────────
+    let vsz_ratio = if rss > 0 { vsz / rss } else { 0 };
+    let vsz_recommendation = if vsz_ratio > 100 {
+        format!("VSZ-vs-RSS ratio is {vsz_ratio}x (high -- consider --memory-arenas-cap 8)")
+    } else {
+        format!("VSZ-vs-RSS ratio is {vsz_ratio}x (normal)")
+    };
Evidence
PR Compliance ID 3 forbids format!() and to_string() in src/command/. The new MEMORY DOCTOR
implementation introduces multiple such calls (e.g., format!("VSZ-vs-RSS ratio...") and
.to_string() in allocator reporting).

CLAUDE.md
src/command/server_admin.rs[361-367]
src/command/server_admin.rs[457-467]
src/command/server_admin.rs[533-536]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`src/command/server_admin.rs` adds new `format!()` and `to_string()` usage in `MEMORY DOCTOR` output construction. The compliance policy forbids these allocation/conversion patterns inside `src/command/`.

## Issue Context
Even if `MEMORY DOCTOR` is intended as a “cold path”, the checklist treats all of `src/command/` as hot-path sensitive.

## Fix Focus Areas
- src/command/server_admin.rs[361-367]
- src/command/server_admin.rs[450-468]
- src/command/server_admin.rs[479-498]
- src/command/server_admin.rs[526-542]

## Implementation notes
- Prefer writing directly into the pre-allocated `out` buffer via `write!`/`writeln!` instead of building intermediate `String`s with `format!`.
- Replace helpers returning `String` (e.g., `humanize_bytes`, `chrono_iso8601_now`, `allocator_info`) with helpers that write into an existing buffer or return borrowed/static data where possible.
- Avoid `.to_string()` in favor of formatting directly into `out` or using lightweight integer formatting (e.g., `itoa`) if already available in the codebase.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


2. Missing arenas-cap field ✓ Resolved 🐞 Bug ≡ Correctness
Description
main.rs reads config.memory_arenas_cap, but ServerConfig has no such field, so the binary will
not compile and the --memory-arenas-cap behavior cannot work.
Code

src/main.rs[R72-79]

+    // Non-jemalloc builds: warn if operator explicitly set --memory-arenas-cap
+    #[cfg(not(feature = "jemalloc"))]
+    if config.memory_arenas_cap != 8 {
+        tracing::warn!(
+            "--memory-arenas-cap={} is a no-op for non-jemalloc builds",
+            config.memory_arenas_cap
+        );
+    }
Evidence
The new code reads config.memory_arenas_cap, but the ServerConfig struct definition ends without
such a field, so this is an unknown-field access and will fail to compile.

src/main.rs[70-79]
src/config.rs[233-263]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`src/main.rs` references `ServerConfig::memory_arenas_cap`, but `ServerConfig` does not define that field. This is a compile-time error and also means `--memory-arenas-cap` cannot be parsed/applied.

### Issue Context
The PR adds allocator/arena-cap wiring in `main.rs`, but the CLI config struct was not updated accordingly.

### Fix Focus Areas
- src/config.rs[1-263]
- src/main.rs[72-79]

### Expected fix
- Add `pub memory_arenas_cap: u32` (or usize) to `ServerConfig` with a clap `#[arg(long = "memory-arenas-cap", default_value_t = 8)]`.
- (Optional but recommended) Validate the range (e.g., >= 1) to avoid generating invalid jemalloc configs.
- Ensure any downstream use of the flag compiles and behaves consistently.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


3. AOF shutdown races appends ✓ Resolved 🐞 Bug ☼ Reliability
Description
run_embedded cancels and immediately sends AofMessage::Shutdown, but the tokio AOF writer stops
on Shutdown without draining later appends; shard write paths may still try_send(Append) while
winding down, so some final writes can be dropped during shutdown.
Code

src/server/embedded.rs[R347-356]

+    // Listener exited (cancel fired or fatal error). Cancel producers first so
+    // they stop enqueueing AOF appends, then flush the writer with an async
+    // shutdown send (Qodo bug #4 — the bounded flume channel's `send` is
+    // blocking and would stall the runtime thread if the queue is full).
+    cancel.cancel();
+    if let Some(tx) = aof_tx {
+        if let Err(e) = tx.send_async(AofMessage::Shutdown).await {
+            tracing::warn!("embedded moon: AOF shutdown send failed: {}", e);
+        }
+    }
Evidence
The embedded shutdown sequence sends Shutdown right after cancel, while shard threads are still
running. The tokio AOF writer breaks on Shutdown, and sharded writes use try_send(Append)
without handling failures, so appends after shutdown can be lost.

src/server/embedded.rs[347-369]
src/persistence/aof.rs[339-344]
src/server/conn/handler_sharded/mod.rs[1094-1097]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`run_embedded` sends `AofMessage::Shutdown` before shard threads are joined. The AOF writer exits as soon as it receives `Shutdown`, so any `Append` messages sent after that point (or that race with shutdown) can be silently dropped.

### Issue Context
- Sharded handler uses `try_send(AofMessage::Append(..))` and ignores errors.
- tokio `aof_writer_task` breaks immediately on `Shutdown` (it does not keep reading/draining after seeing the shutdown message).

### Fix Focus Areas
- src/server/embedded.rs[347-369]

### Expected fix
One safe pattern:
1) Call `cancel.cancel()`.
2) Join shard threads (so all shard-held `aof_tx` clones are dropped and no more appends can be produced).
3) Then either:
  - Drop the last sender so the receiver gets `Err(_)` and exits after draining queued messages, OR
  - Send `AofMessage::Shutdown` after shards have exited (so it is ordered after all appends).
4) Finally join the AOF writer thread.

This removes the window where appends can occur after `Shutdown`.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


View more (4)
4. anyhow::Result in run_embedded 📘 Rule violation ⚙ Maintainability
Description
src/server/embedded.rs introduces anyhow usage (anyhow::Context and anyhow::Result) in
library code. This violates the project requirement to reserve anyhow for main.rs and tests to
keep error handling structured and lightweight.
Code

src/server/embedded.rs[R39-71]

+use anyhow::Context;
+use parking_lot::RwLock;
+use tracing::info;
+
+use crate::config::ServerConfig;
+use crate::persistence::aof::{self, AofMessage, FsyncPolicy};
+use crate::runtime::cancel::CancellationToken;
+use crate::runtime::channel;
+use crate::runtime::{RuntimeFactoryImpl, traits::RuntimeFactory};
+use crate::server;
+use crate::shard::Shard;
+use crate::shard::mesh::{CHANNEL_BUFFER_SIZE, ChannelMesh};
+use crate::shard::shared_databases::ShardDatabases;
+
+/// Run an embedded sharded Moon server until `cancel` is fired.
+///
+/// Behaves like the production `main.rs` startup path but with cluster,
+/// TLS, console, and admin-port concerns elided. Suitable for in-process
+/// embedding (e.g. `helios moon-daemon`).
+///
+/// # Arguments
+/// * `config` — fully-resolved `ServerConfig` (the caller is responsible
+///   for setting `shards >= 1`; if `0`, this fn auto-detects core count).
+/// * `cancel` — when fired, the listener exits, AOF flushes via
+///   `AofMessage::Shutdown`, and shard threads are joined.
+///
+/// # Returns
+/// `Ok(())` on clean shutdown. Returns `Err` if the persistence directory
+/// is unusable, AOF manifest is corrupt, or a shard thread fails to spawn.
+pub async fn run_embedded(
+    mut config: ServerConfig,
+    cancel: CancellationToken,
+) -> anyhow::Result<()> {
Evidence
PR Compliance ID 8 forbids anyhow in non-test library modules outside main.rs. The added
embedded server helper imports anyhow::Context and exposes run_embedded returning
anyhow::Result<()>, directly violating that restriction.

CLAUDE.md
src/server/embedded.rs[39-71]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`src/server/embedded.rs` uses `anyhow::Context` and returns `anyhow::Result<()>`, but the compliance checklist requires library code to avoid `anyhow` (use `thiserror` / structured error types instead).

## Issue Context
This file is compiled as part of the library (not `main.rs`), so error types should be explicit and consistent (e.g., use `crate::error::{MoonError, MoonResult}` or a new `thiserror::Error` specific to embedded startup).

## Fix Focus Areas
- src/server/embedded.rs[39-71]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


5. std::sync::RwLock used 📘 Rule violation ➹ Performance
Description
src/server/embedded.rs adds new uses of std::sync::RwLock for repl_state and acl_table. This
violates the requirement to use parking_lot locks (and avoids std lock overhead/poisoning
behavior).
Code

src/server/embedded.rs[R143-151]

+    let repl_state = Arc::new(std::sync::RwLock::new(
+        crate::replication::state::ReplicationState::new(num_shards, repl_id, repl_id2),
+    ));
+    crate::admin::metrics_setup::set_global_repl_state(repl_state.clone());
+
+    // ACL table (loads aclfile if configured; default no-op otherwise).
+    let acl_table: Arc<std::sync::RwLock<crate::acl::AclTable>> = Arc::new(
+        std::sync::RwLock::new(crate::acl::AclTable::load_or_default(&config)),
+    );
Evidence
PR Compliance ID 4 explicitly disallows introducing std::sync::{Mutex,RwLock}. The new embedded
startup path constructs both the replication state and ACL table wrapped in std::sync::RwLock,
adding new non-compliant synchronization usage.

CLAUDE.md
src/server/embedded.rs[143-151]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
New code in `run_embedded` introduces `std::sync::RwLock`, but the compliance checklist requires `parking_lot::RwLock` / `parking_lot::Mutex` and disallows `std::sync::{Mutex,RwLock}`.

## Issue Context
`repl_state` and `acl_table` are created with `std::sync::RwLock` in this new module. To comply, switch these to `parking_lot` (which may require adjusting the receiving APIs/types that currently expect `std::sync::RwLock`).

## Fix Focus Areas
- src/server/embedded.rs[143-151]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


6. Manifest drops persisted writes ✓ Resolved 🐞 Bug ≡ Correctness
Description
run_embedded loads/initializes the multi-part AOF manifest and (when present) clears DBs to replay
only base/incr, but the runtime-tokio AOF writer appends to a single file path and does not
write/advance the manifest’s incr file. After a manifest exists, restarts can discard writes that
were persisted outside base/incr (e.g., via per-shard WAL recovery or legacy appendonly.aof),
causing data loss in embedded single-shard mode.
Code

src/server/embedded.rs[R206-277]

+    // Multi-part AOF replay (single-shard only; matches main.rs constraint).
+    if config.appendonly == "yes" && let Some(ref dir) = persistence_dir {
+        use crate::persistence::aof_manifest::AofManifest;
+        use crate::persistence::replay::DispatchReplayEngine;
+        let base_dir = std::path::PathBuf::from(dir);
+        let manifest_opt = AofManifest::load(&base_dir).with_context(|| {
+            format!(
+                "embedded moon: AOF manifest at {}/appendonlydir/ is corrupt; refusing to start to avoid data loss",
+                base_dir.display()
+            )
+        })?;
+        if let Some(ref manifest) = manifest_opt {
+            if num_shards == 1 {
+                for db in shards[0].databases.iter_mut() {
+                    db.clear();
+                }
+                let loaded = crate::persistence::aof_manifest::replay_multi_part(
+                    &mut shards[0].databases,
+                    manifest,
+                    &DispatchReplayEngine::new(),
+                )
+                .context("embedded moon: multi-part AOF replay failed")?;
+                info!(
+                    "embedded moon: AOF multi-part loaded (seq {}): {} entries",
+                    manifest.seq, loaded
+                );
+                let legacy = base_dir.join("appendonly.aof");
+                if legacy.exists() {
+                    let retired = base_dir.join("appendonly.aof.legacy");
+                    if let Err(e) = std::fs::rename(&legacy, &retired) {
+                        tracing::warn!(
+                            "embedded moon: failed to retire legacy AOF {}: {}",
+                            legacy.display(),
+                            e
+                        );
+                    }
+                }
+            } else {
+                tracing::warn!(
+                    "embedded moon: multi-part AOF skipped in multi-shard mode (not yet supported)"
+                );
+            }
+        } else {
+            // No manifest. If restore_from_persistence already loaded state,
+            // snapshot it as the seq-1 base RDB to avoid losing it on next boot.
+            let has_state = num_shards == 1 && shards[0].databases.iter().any(|db| db.len() > 0);
+            if has_state {
+                let rdb_bytes = crate::persistence::rdb::save_to_bytes(&shards[0].databases)
+                    .context("embedded moon: failed to serialize legacy state for AOF base")?;
+                AofManifest::initialize_with_base(&base_dir, &rdb_bytes)
+                    .context("embedded moon: failed to initialize AOF manifest with base")?;
+                info!(
+                    "embedded moon: first-upgrade captured legacy state as AOF base seq 1 ({} bytes)",
+                    rdb_bytes.len()
+                );
+                let legacy = base_dir.join("appendonly.aof");
+                if legacy.exists() {
+                    let retired = base_dir.join("appendonly.aof.legacy");
+                    if let Err(e) = std::fs::rename(&legacy, &retired) {
+                        tracing::warn!(
+                            "embedded moon: failed to retire legacy AOF {}: {}",
+                            legacy.display(),
+                            e
+                        );
+                    }
+                }
+            } else {
+                AofManifest::initialize(&base_dir)
+                    .context("embedded moon: failed to initialize AOF manifest")?;
+            }
+        }
+    }
Evidence
embedded.rs replays/initializes the multi-part manifest and may clear DBs then replay only
base/incr, but the tokio AOF writer only appends to a single aof_path and does not participate in
the manifest format; replay_multi_part also only replays base/incr, so any writes persisted
outside that pair are ignored on restart.

src/server/embedded.rs[106-125]
src/server/embedded.rs[206-277]
src/persistence/aof.rs[79-130]
src/persistence/aof_manifest.rs[65-112]
src/persistence/aof_manifest.rs[334-403]
src/shard/mod.rs[219-258]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`run_embedded` performs multi-part AOF manifest replay/initialization and may clear databases to replay `appendonlydir/` base+incr, but the `runtime-tokio` AOF writer (`aof_writer_task`) only appends to a single file path and never writes/advances the manifest incr file. This mismatch can make restarts replay stale/empty base+incr while ignoring the actual persisted write log(s), dropping data.

### Issue Context
- `src/server/embedded.rs` spawns the AOF writer using `config.appendfilename`, then later loads/initializes `AofManifest` and in the manifest-present case clears DBs and replays base+incr, plus renames `appendonly.aof`.
- `src/persistence/aof.rs` shows the `runtime-tokio` writer opens `aof_path` and appends; the manifest-based writer behavior is only implemented under `runtime-monoio`.

### Fix Focus Areas
Pick one consistent persistence strategy for `runtime-tokio` embedded mode:
1) **Disable multi-part manifest handling under tokio** (recommended if tokio doesn’t support multi-part AOF yet):
  - Remove the `AofManifest::load/initialize/replay_multi_part` block from `run_embedded` (and do not rename `appendonly.aof`).
  - Rely on existing sharded recovery (`restore_from_persistence` + WAL replay) for correctness.

OR

2) **Add tokio support for multi-part AOF writing**:
  - Update the tokio AOF writer to wait for/load the manifest and append to `manifest.incr_path()` (mirroring the monoio path), and ensure manifest advance occurs on rewrites.
  - Spawn the AOF writer only after the manifest is known/initialized so it opens the correct incr file.
  - Ensure any “retire legacy appendonly.aof” logic happens only after the writer is no longer writing to it.

- src/server/embedded.rs[106-277]
- src/persistence/aof.rs[79-130]
- src/persistence/aof_manifest.rs[65-112]
- src/persistence/aof_manifest.rs[334-403]
- src/shard/mod.rs[219-258]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


7. Blocking shutdown channel send ✓ Resolved 🐞 Bug ☼ Reliability
Description
run_embedded uses tx.send(AofMessage::Shutdown) from async context, but channel::MpscSender is a
bounded flume sender whose send is blocking and can stall shutdown if the queue is full. Other
shutdown code paths use send_async(...).await, so embedded shutdown can hang under load.
Code

src/server/embedded.rs[R400-405]

+    // Listener exited (cancel fired or fatal error). Flush AOF and cancel
+    // any remaining shard work.
+    if let Some(ref tx) = aof_tx {
+        let _ = tx.send(AofMessage::Shutdown);
+    }
+    cancel.cancel();
Evidence
The repo’s MPSC channel type is flume’s Sender, and listener shutdown uses the async send API;
embedded uses the blocking send API from an async function, which can block the runtime thread.

src/server/embedded.rs[400-406]
src/runtime/channel.rs[12-17]
src/server/listener.rs[264-270]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`run_embedded` calls `tx.send(AofMessage::Shutdown)` on a bounded flume channel while on an async task. `send()` can block the executor thread if the channel is full, potentially hanging shutdown.

### Issue Context
The project’s channel abstraction re-exports `flume::Sender` as `MpscSender`. The non-embedded listener shutdown path uses `send_async(...).await` to avoid blocking.

### Fix Focus Areas
- Replace `tx.send(AofMessage::Shutdown)` with `tx.send_async(AofMessage::Shutdown).await` (or a non-blocking `try_send` with fallback logging) in `run_embedded`.
- Consider also ordering shutdown as: cancel first (stop producers) then async-send shutdown to the writer.

- src/server/embedded.rs[400-405]
- src/runtime/channel.rs[12-17]
- src/server/listener.rs[264-270]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools



Remediation recommended

8. Allocator name reported wrong 🐞 Bug ◔ Observability
Description
MEMORY DOCTOR reports the allocator as system for non-jemalloc builds, but the Moon binary
installs mimalloc as the global allocator in that configuration, making allocator diagnostics
misleading.
Code

src/command/server_admin.rs[R526-542]

+/// Read allocator name and arena count. Cold path — single mallctl OK.
+fn allocator_info() -> (String, String) {
+    #[cfg(feature = "jemalloc")]
+    {
+        // opt.narenas = configured cap (what we set via malloc_conf / MALLOC_CONF).
+        // arenas.narenas = actual created count (can exceed opt.narenas).
+        // Operators care about the configured limit, not the runtime count.
+        let arena_count = tikv_jemalloc_ctl::opt::narenas::read()
+            .map(|n| n.to_string())
+            .unwrap_or_else(|_| "n/a".to_string());
+        ("jemalloc".to_string(), arena_count)
+    }
+    #[cfg(not(feature = "jemalloc"))]
+    {
+        ("system".to_string(), "n/a".to_string())
+    }
+}
Evidence
The binary explicitly sets mimalloc as the global allocator when jemalloc is off, while MEMORY
DOCTOR hardcodes the label as system under the same cfg.

src/command/server_admin.rs[526-542]
src/main.rs[16-22]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`allocator_info()` returns `(system, n/a)` when `feature != jemalloc`, but `src/main.rs` sets mimalloc as the global allocator in that build, so MEMORY DOCTOR misreports the allocator.

### Issue Context
In embedded/library use, the allocator might truly be the system allocator, so a robust label should avoid claiming "system" unless that’s guaranteed.

### Fix Focus Areas
- src/command/server_admin.rs[526-542]
- src/main.rs[16-18]

### Expected fix
- Change the non-jemalloc label to something accurate across binaries (e.g., "non-jemalloc"), OR
- If this command is only intended to reflect the Moon binary, adjust to report "mimalloc" when built in that mode (potentially via a shared cfg constant).

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


9. Autosave counter never increments ✓ Resolved 🐞 Bug ≡ Correctness
Description
run_embedded spawns run_auto_save_sharded with a freshly created change_counter, but that
counter is never passed to shard/connection code and therefore cannot be incremented. As a result,
run_auto_save_sharded’s changes >= threshold condition stays false and configured save rules
will not trigger snapshots in embedded mode.
Code

src/server/embedded.rs[R367-383]

+    // Auto-save timer (no-op when save rules unset).
+    let change_counter = Arc::new(AtomicU64::new(0));
+    if config.save.is_some() {
+        let rules = crate::persistence::auto_save::parse_save_rules(&config.save);
+        if !rules.is_empty() {
+            let auto_save_token = cancel.child_token();
+            let auto_save_counter = change_counter.clone();
+            let auto_save_snap_tx = snap_tx.clone();
+            tokio::spawn(crate::persistence::auto_save::run_auto_save_sharded(
+                rules,
+                auto_save_counter,
+                auto_save_token,
+                auto_save_snap_tx,
+            ));
+            info!("embedded moon: auto-save timer started");
+        }
+    }
Evidence
embedded.rs creates change_counter and only passes it to run_auto_save_sharded;
run_auto_save_sharded requires the counter to increase to trigger, and the sharded
ConnectionContext does not include any change counter field to allow incrementing it.

src/server/embedded.rs[367-383]
src/persistence/auto_save.rs[115-149]
src/server/conn/core.rs[43-81]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`run_embedded` creates a local `Arc<AtomicU64>` change counter and passes it only to the auto-save task. Since it is not shared with any write path, it will never be incremented and auto-save will never trigger based on change thresholds.

### Issue Context
`run_auto_save_sharded` checks `change_counter.load()` each tick and requires `changes >= threshold` before sending a snapshot epoch. In embedded startup, the counter is created and only cloned into the auto-save task.

### Fix Focus Areas
Choose one:
1) **Actually wire the counter into sharded write paths** (e.g., add an `Arc<AtomicU64>` to the sharded connection context and increment on write commands / WAL appends).
2) **Remove/disable change-based auto-save in embedded mode** until there is a supported dirty-tracking mechanism, so config doesn’t misleadingly claim auto-save is active.

- src/server/embedded.rs[367-383]
- src/persistence/auto_save.rs[115-149]
- src/server/conn/core.rs[43-81]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools



Advisory comments

10. VSZ page-size error cast ✓ Resolved 🐞 Bug ☼ Reliability
Description
get_vsz_bytes() casts sysconf(_SC_PAGESIZE) to u64 and multiplies without validating the
return value; if sysconf fails (the code comments mention -1), VSZ can become nonsensical due to
the cast/wraparound.
Code

src/command/server_admin.rs[R572-576]

+        // Use the same page_size approach as get_rss_bytes.
+        // SAFETY: `sysconf` with `_SC_PAGESIZE` is a thread-safe POSIX query
+        // with no preconditions; it returns -1 on failure (handled by the cast).
+        let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) } as u64;
+        return (pages * page_size) as usize;
Evidence
The implementation explicitly documents the -1 failure case but does not guard it before casting
and multiplying, so a failure produces an invalid page size and invalid VSZ.

src/command/server_admin.rs[572-576]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`get_vsz_bytes()` assumes `sysconf(_SC_PAGESIZE)` can be cast directly to `u64`, but on failure it can return `-1` (as noted in the comment). Casting `-1` to `u64` yields a huge value and corrupts VSZ computations.

### Issue Context
This is a diagnostics-only path (MEMORY DOCTOR), but it can produce confusing output when the syscall fails.

### Fix Focus Areas
- src/command/server_admin.rs[544-579]

### Expected fix
- Read the `sysconf` result into a signed integer, check `<= 0`, and return 0 (or a conservative default) on failure before multiplying.
- Consider reusing the cached page-size logic used elsewhere if accessible.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


Grey Divider

Qodo Logo

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/server/embedded.rs`:
- Around line 106-125: The AOF writer thread spawned in the block that creates
aof_tx is never stored so it cannot be joined on shutdown, risking lost buffered
writes; modify that block to capture and return the std::thread::JoinHandle
(alongside the existing tx) when appendonly == "yes" (the thread runs
RuntimeFactoryImpl::block_on_local with aof::aof_writer_task and uses
cancel.child_token and fsync), and at shutdown after sending
AofMessage::Shutdown to the channel, call join() on that saved JoinHandle
(handling/journaling any join errors) to ensure the writer finishes flushing
before process exit.
- Around line 367-383: The change_counter Arc created in embedded.rs isn't being
handed to shard threads so auto-save change-count rules never see increments;
pass the change_counter into the shard execution path (e.g., add a parameter to
shard.run or otherwise provide the Arc<AtomicU64> when spawning/starting each
shard) and have each shard clone and increment that Arc when state changes;
update the call sites (shard.run(...) and any callers that spawn shards) and
ensure run_auto_save_sharded continues to receive and use the same Arc so the
auto-save rules can observe and act on real change counts.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 689ff3af-477a-484c-87df-693c00daea07

📥 Commits

Reviewing files that changed from the base of the PR and between 7688252 and 045d15c.

📒 Files selected for processing (2)
  • src/server/embedded.rs
  • src/server/mod.rs

Comment thread src/server/embedded.rs
Comment thread src/server/embedded.rs Outdated
Comment on lines +367 to +383
// Auto-save timer (no-op when save rules unset).
let change_counter = Arc::new(AtomicU64::new(0));
if config.save.is_some() {
let rules = crate::persistence::auto_save::parse_save_rules(&config.save);
if !rules.is_empty() {
let auto_save_token = cancel.child_token();
let auto_save_counter = change_counter.clone();
let auto_save_snap_tx = snap_tx.clone();
tokio::spawn(crate::persistence::auto_save::run_auto_save_sharded(
rules,
auto_save_counter,
auto_save_token,
auto_save_snap_tx,
));
info!("embedded moon: auto-save timer started");
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check how main.rs passes change_counter to shards
echo "=== Checking main.rs for change_counter usage ==="
rg -n --type rust 'change_counter' src/main.rs -B2 -A5

echo ""
echo "=== Check run_auto_save_sharded signature and counter usage ==="
rg -n --type rust 'fn run_auto_save_sharded' -A10

Repository: pilotspace/moon

Length of output: 2159


🏁 Script executed:

# First, get full context of shard setup in embedded.rs
sed -n '330,385p' src/server/embedded.rs

# Check if change_counter appears elsewhere in embedded.rs
echo ""
echo "=== change_counter usage in embedded.rs ==="
rg -n 'change_counter' src/server/embedded.rs

# Check how shards are spawned and if they can access change_counter
echo ""
echo "=== Shard spawn/run context ==="
rg -n 'shard\.run\|for.*shard' src/server/embedded.rs -B3 -A3

Repository: pilotspace/moon

Length of output: 2607


🏁 Script executed:

# Get the full implementation of run_auto_save_sharded
sed -n '115,200p' src/persistence/auto_save.rs

# Also check what parse_save_rules returns and how rules are structured
echo ""
echo "=== parse_save_rules signature ==="
rg -n 'fn parse_save_rules' -A20 src/persistence/auto_save.rs

# Search for where counter.load/store is used in auto_save.rs
echo ""
echo "=== counter usage in auto_save.rs ==="
rg -n 'counter\.(load|store|swap|fetch)' src/persistence/auto_save.rs

Repository: pilotspace/moon

Length of output: 4405


change_counter is not passed to shard threads—auto-save change-count rules won't trigger.

The change_counter (line 368) is created and passed to run_auto_save_sharded (line 377) but never reaches the shard threads. The shard.run() call has no parameter for it, so shards cannot increment the counter. Auto-save rules like "300 10" (save every 300s if 10+ changes) rely on this counter; it will always read 0, causing them to never trigger. Only time-based rules with 0 change threshold would trigger.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/server/embedded.rs` around lines 367 - 383, The change_counter Arc
created in embedded.rs isn't being handed to shard threads so auto-save
change-count rules never see increments; pass the change_counter into the shard
execution path (e.g., add a parameter to shard.run or otherwise provide the
Arc<AtomicU64> when spawning/starting each shard) and have each shard clone and
increment that Arc when state changes; update the call sites (shard.run(...) and
any callers that spawn shards) and ensure run_auto_save_sharded continues to
receive and use the same Arc so the auto-save rules can observe and act on real
change counts.

Comment thread src/server/embedded.rs
Comment on lines +39 to +71
use anyhow::Context;
use parking_lot::RwLock;
use tracing::info;

use crate::config::ServerConfig;
use crate::persistence::aof::{self, AofMessage, FsyncPolicy};
use crate::runtime::cancel::CancellationToken;
use crate::runtime::channel;
use crate::runtime::{RuntimeFactoryImpl, traits::RuntimeFactory};
use crate::server;
use crate::shard::Shard;
use crate::shard::mesh::{CHANNEL_BUFFER_SIZE, ChannelMesh};
use crate::shard::shared_databases::ShardDatabases;

/// Run an embedded sharded Moon server until `cancel` is fired.
///
/// Behaves like the production `main.rs` startup path but with cluster,
/// TLS, console, and admin-port concerns elided. Suitable for in-process
/// embedding (e.g. `helios moon-daemon`).
///
/// # Arguments
/// * `config` — fully-resolved `ServerConfig` (the caller is responsible
/// for setting `shards >= 1`; if `0`, this fn auto-detects core count).
/// * `cancel` — when fired, the listener exits, AOF flushes via
/// `AofMessage::Shutdown`, and shard threads are joined.
///
/// # Returns
/// `Ok(())` on clean shutdown. Returns `Err` if the persistence directory
/// is unusable, AOF manifest is corrupt, or a shard thread fails to spawn.
pub async fn run_embedded(
mut config: ServerConfig,
cancel: CancellationToken,
) -> anyhow::Result<()> {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. anyhow::result in run_embedded 📘 Rule violation ⚙ Maintainability

src/server/embedded.rs introduces anyhow usage (anyhow::Context and anyhow::Result) in
library code. This violates the project requirement to reserve anyhow for main.rs and tests to
keep error handling structured and lightweight.
Agent Prompt
## Issue description
`src/server/embedded.rs` uses `anyhow::Context` and returns `anyhow::Result<()>`, but the compliance checklist requires library code to avoid `anyhow` (use `thiserror` / structured error types instead).

## Issue Context
This file is compiled as part of the library (not `main.rs`), so error types should be explicit and consistent (e.g., use `crate::error::{MoonError, MoonResult}` or a new `thiserror::Error` specific to embedded startup).

## Fix Focus Areas
- src/server/embedded.rs[39-71]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Comment thread src/server/embedded.rs Outdated
Comment on lines +143 to +151
let repl_state = Arc::new(std::sync::RwLock::new(
crate::replication::state::ReplicationState::new(num_shards, repl_id, repl_id2),
));
crate::admin::metrics_setup::set_global_repl_state(repl_state.clone());

// ACL table (loads aclfile if configured; default no-op otherwise).
let acl_table: Arc<std::sync::RwLock<crate::acl::AclTable>> = Arc::new(
std::sync::RwLock::new(crate::acl::AclTable::load_or_default(&config)),
);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

2. std::sync::rwlock used 📘 Rule violation ➹ Performance

src/server/embedded.rs adds new uses of std::sync::RwLock for repl_state and acl_table. This
violates the requirement to use parking_lot locks (and avoids std lock overhead/poisoning
behavior).
Agent Prompt
## Issue description
New code in `run_embedded` introduces `std::sync::RwLock`, but the compliance checklist requires `parking_lot::RwLock` / `parking_lot::Mutex` and disallows `std::sync::{Mutex,RwLock}`.

## Issue Context
`repl_state` and `acl_table` are created with `std::sync::RwLock` in this new module. To comply, switch these to `parking_lot` (which may require adjusting the receiving APIs/types that currently expect `std::sync::RwLock`).

## Fix Focus Areas
- src/server/embedded.rs[143-151]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Comment thread src/server/embedded.rs Outdated
Comment thread src/server/embedded.rs Outdated
TinDang97 added 3 commits May 17, 2026 00:35
…st skew

Four reviewer-flagged defects in `src/server/embedded.rs`:

- CodeRabbit #1: the AOF writer thread's `JoinHandle` was dropped on spawn,
  so shutdown returned before the writer finished its final fsync. Retain
  the handle and join it (after the shards drain) inside the same
  `spawn_blocking` that joins the shard threads.
- Qodo #4: shutdown used `MpscSender::send` (blocking flume) from an async
  context. Switch to `send_async(...).await`, and cancel the token *before*
  sending shutdown so producers stop enqueueing while the writer drains.
- Qodo #3: the multi-part AOF manifest block (`AofManifest::load /
  initialize / replay_multi_part`) ran under `runtime-tokio`, but the
  tokio AOF writer appends to a single `<dir>/<appendfilename>` and never
  touches the manifest's incr file (only `runtime-monoio` does). The
  combination silently drops persisted writes on the next boot. Remove the
  block and rely on `Shard::restore_from_persistence` (RDB + per-shard WAL)
  for baseline recovery; document why so the monoio path can re-enable it
  later.
- Qodo #5 / CodeRabbit autosave: `change_counter` was created locally and
  only handed to `run_auto_save_sharded`. Nothing in the sharded write
  path can increment it, so change-rule auto-save would never fire. Drop
  the dead spawn and emit a warning when `save` is configured, until the
  sharded ConnectionContext exposes a dirty-tracking hook.

Verified: `cargo check --no-default-features --features runtime-tokio,jemalloc`
and `cargo clippy --lib` are clean for `src/server/embedded.rs`.

author: Tin Dang
CI Lint step (cargo fmt --check) rejected the previous commit. No
functional changes — just reformatting acl_table initialization.

author: Tin Dang
CI's `scripts/audit-unsafe.sh` requires a `// SAFETY:` comment within the
3 lines immediately preceding every `unsafe { ... }` block. Four blocks on
this branch had either no SAFETY comment or one placed too far above the
block:

- `src/command/server_admin.rs:get_vsz_bytes` — `libc::read`, `libc::close`,
  and `libc::sysconf(_SC_PAGESIZE)` calls each get a one-line invariant note.
- `src/main.rs:malloc_conf` — the existing SAFETY comment lived above the
  `union U` definition; move it adjacent to the `unsafe { ... }` block so
  the audit script finds it.

No functional change. `bash scripts/audit-unsafe.sh` now reports
`PASSED: All 190 unsafe blocks have SAFETY comments.`

author: Tin Dang
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🧹 Nitpick comments (1)
src/command/server_admin.rs (1)

479-514: ⚡ Quick win

Add a couple boundary tests for the hand-rolled UTC formatter.

This is custom calendar math in library code, but the new helpers still have no direct coverage. Fixed cases for the Unix epoch, a leap day, and a year rollover would make this much safer to maintain.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/command/server_admin.rs` around lines 479 - 514, Add unit tests covering
the hand-rolled UTC formatter functions (chrono_iso8601_now and days_to_ymd):
create tests that assert correct output for the Unix epoch
(1970-01-01T00:00:00Z), a known leap day (e.g., 2020-02-29 formatted correctly),
and a year-rollover boundary (e.g., 2019-12-31 -> 2020-01-01 transitions) to
ensure the calendar math handles epoch, leap years, and month/year boundaries;
place tests in the same module or tests mod and call days_to_ymd directly for
deterministic checks and chrono_iso8601_now by mocking or converting a
controlled SystemTime where possible.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/command/server_admin.rs`:
- Around line 545-578: Replace the unsafe libc read/open/close and sysconf logic
in get_vsz_bytes() with a safe read of /proc/self/status: open the file with
std::fs::read_to_string or similar, find the "VmSize:" line, parse the kB value
as u64, convert to bytes by multiplying by 1024 (handling parse errors by
returning 0), and remove all unsafe blocks and the sysconf cast so you don't
risk treating -1 as a huge page size; keep function name get_vsz_bytes and
ensure it returns 0 on any I/O or parse failure.

In `@src/main.rs`:
- Around line 24-40: Replace the unsafe union/Option initialization of
malloc_conf with a safe C-string literal: change the static's type from
Option<&'static libc::c_char> to *const libc::c_char, remove the union U and
unsafe block, and initialize malloc_conf directly from a c"...\0" C-string
literal (preserving the #[unsafe(export_name = "_rjem_malloc_conf")] and
#[cfg(feature = "jemalloc")] attributes and the exact allocator options string).
This eliminates the unsafe code path used in malloc_conf while keeping the same
exported symbol and null-terminated FFI string.
- Around line 843-888: Replace the UTF-8-only env::args() usage with
env::args_os() to preserve non-UTF-8 argv and avoid panics: collect into
Vec<OsString> (e.g. args: Vec<OsString>), iterate over &args with index i, and
when checking flags use a.to_string_lossy() (or a.to_str() when appropriate) to
match "--memory-arenas-cap=" and the standalone "--memory-arenas-cap" form and
to parse the numeric rest into requested; keep the original Vec<OsString> intact
so the final exec() call
(std::process::Command::new(&exe).args(args.iter().skip(1)).env(...).exec()) can
pass the lossless OsString argv through, and ensure types compile where
requested is Option<u32> and comparisons like if n == 8 still work after
parsing.

In `@src/server/embedded.rs`:
- Around line 29-31: The module docs currently claim embedded mode includes
AOF/manifest replay and an auto-save timer, but the implementation explicitly
skips manifest replay and ignores save rules (see the code paths around lines
223 and 314-329); update the top-of-file/header doc comment and the other doc
comments referenced at lines ~64-66 to accurately state that embedded mode does
NOT perform background snapshots, does NOT replay AOF/manifest, and ignores
`save` rules (i.e., persistence is disabled), and add a short note pointing to
the manifest-replay-skip locations so callers know the intended behavior.
- Line 38: The public function run_embedded currently imports anyhow::Context
and returns anyhow::Result; replace that with the crate's typed error (e.g.,
FrameError or crate::error::Error) and drop the anyhow import: change the
signature of run_embedded to -> Result<(), FrameError> (or the project's
Frame::Error type) and remove use anyhow::Context;. For every call that used
.context("...") inside run_embedded (and downstream spots referenced), convert
those to map_err or ? with proper conversion into the typed error (e.g.,
.map_err(|e| FrameError::new("...").with_source(e)) or
.map_err(FrameError::from)); ensure all error origins are wrapped/converted to
the chosen FrameError so the public API exposes a concrete error type instead of
anyhow.
- Around line 148-156: The code creates repl_state and acl_table using
std::sync::RwLock which violates the project lock policy; change both to use
parking_lot::RwLock wrapped in Arc (i.e. Arc<parking_lot::RwLock<...>>) instead
of Arc<std::sync::RwLock<...>> so the rest of the shard wiring stays on
parking_lot; use the same inner constructors
(crate::replication::state::ReplicationState::new(...) and
crate::acl::AclTable::load_or_default(&config)) and pass repl_state.clone() into
crate::admin::metrics_setup::set_global_repl_state unchanged, updating any type
annotations to parking_lot::RwLock to match signatures.
- Around line 363-369: The shutdown code currently ignores JoinHandle::join()
results for shard_handles and aof_join (inside the tokio::task::spawn_blocking
closure), which swallows panics; update the closure to collect each join()
result, detect Err(panic) from JoinHandle::join(), and if any join returned Err
propagate a corresponding error out of the enclosing function (instead of
ignoring with let _). Specifically, iterate over shard_handles and call
handle.join() into a Result, record any Err(Box<dyn Any + Send>) (and likewise
for aof_join), then return an Err variant from the function containing a
descriptive error (or convert to the function's existing error type) when any
join panicked; keep normal success path when all joins return Ok. Ensure you
adjust the spawn_blocking return to carry that join error so the outer caller
can observe shutdown failure.

---

Nitpick comments:
In `@src/command/server_admin.rs`:
- Around line 479-514: Add unit tests covering the hand-rolled UTC formatter
functions (chrono_iso8601_now and days_to_ymd): create tests that assert correct
output for the Unix epoch (1970-01-01T00:00:00Z), a known leap day (e.g.,
2020-02-29 formatted correctly), and a year-rollover boundary (e.g., 2019-12-31
-> 2020-01-01 transitions) to ensure the calendar math handles epoch, leap
years, and month/year boundaries; place tests in the same module or tests mod
and call days_to_ymd directly for deterministic checks and chrono_iso8601_now by
mocking or converting a controlled SystemTime where possible.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 4d60b663-f71d-437c-91dc-5440e8b7092e

📥 Commits

Reviewing files that changed from the base of the PR and between 045d15c and 523bde0.

📒 Files selected for processing (3)
  • src/command/server_admin.rs
  • src/main.rs
  • src/server/embedded.rs

Comment thread src/command/server_admin.rs Outdated
Comment thread src/main.rs Outdated
Comment thread src/main.rs Outdated
Comment thread src/server/embedded.rs Outdated
Comment thread src/server/embedded.rs
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::Context;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

🧩 Analysis chain

🏁 Script executed:

cat -n src/server/embedded.rs

Repository: pilotspace/moon

Length of output: 18882


Replace anyhow::Result with a typed error for the public run_embedded API.

This function is a public library API in src/server/embedded.rs but returns anyhow::Result<()> and imports anyhow::Context. Per the coding guidelines, library code must use thiserror or Frame::Error, not anyhow, which is reserved for main.rs and test code. This affects line 38 (the import) and lines 67–70 (the function signature and return type), plus all downstream .context() calls (lines 72, 125, 310).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/server/embedded.rs` at line 38, The public function run_embedded
currently imports anyhow::Context and returns anyhow::Result; replace that with
the crate's typed error (e.g., FrameError or crate::error::Error) and drop the
anyhow import: change the signature of run_embedded to -> Result<(), FrameError>
(or the project's Frame::Error type) and remove use anyhow::Context;. For every
call that used .context("...") inside run_embedded (and downstream spots
referenced), convert those to map_err or ? with proper conversion into the typed
error (e.g., .map_err(|e| FrameError::new("...").with_source(e)) or
.map_err(FrameError::from)); ensure all error origins are wrapped/converted to
the chosen FrameError so the public API exposes a concrete error type instead of
anyhow.

Comment thread src/server/embedded.rs
Comment on lines +148 to +156
let repl_state = Arc::new(std::sync::RwLock::new(
crate::replication::state::ReplicationState::new(num_shards, repl_id, repl_id2),
));
crate::admin::metrics_setup::set_global_repl_state(repl_state.clone());

// ACL table (loads aclfile if configured; default no-op otherwise).
let acl_table: Arc<std::sync::RwLock<crate::acl::AclTable>> = Arc::new(std::sync::RwLock::new(
crate::acl::AclTable::load_or_default(&config),
));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift

Keep these shared states on parking_lot::RwLock.

This helper reintroduces std::sync::RwLock for repl_state and acl_table even though the rest of the shard wiring already uses parking_lot::RwLock. That brings poisoning/blocking semantics back into async library code and violates the lock policy.

♻️ Local change to keep the new wiring on parking_lot
-    let repl_state = Arc::new(std::sync::RwLock::new(
+    let repl_state = Arc::new(RwLock::new(
         crate::replication::state::ReplicationState::new(num_shards, repl_id, repl_id2),
     ));
     crate::admin::metrics_setup::set_global_repl_state(repl_state.clone());

     // ACL table (loads aclfile if configured; default no-op otherwise).
-    let acl_table: Arc<std::sync::RwLock<crate::acl::AclTable>> = Arc::new(std::sync::RwLock::new(
+    let acl_table: Arc<RwLock<crate::acl::AclTable>> = Arc::new(RwLock::new(
         crate::acl::AclTable::load_or_default(&config),
     ));

As per coding guidelines, Use parking_lot::RwLock / parking_lot::Mutex — never std::sync locks. Never hold a lock across .await points.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/server/embedded.rs` around lines 148 - 156, The code creates repl_state
and acl_table using std::sync::RwLock which violates the project lock policy;
change both to use parking_lot::RwLock wrapped in Arc (i.e.
Arc<parking_lot::RwLock<...>>) instead of Arc<std::sync::RwLock<...>> so the
rest of the shard wiring stays on parking_lot; use the same inner constructors
(crate::replication::state::ReplicationState::new(...) and
crate::acl::AclTable::load_or_default(&config)) and pass repl_state.clone() into
crate::admin::metrics_setup::set_global_repl_state unchanged, updating any type
annotations to parking_lot::RwLock to match signatures.

Comment thread src/server/embedded.rs Outdated
@TinDang97 TinDang97 closed this May 17, 2026
@TinDang97 TinDang97 reopened this May 17, 2026
@qodo-code-review
Copy link
Copy Markdown

qodo-code-review Bot commented May 17, 2026

Persistent review updated to latest commit f64ee36

Comment on lines +361 to +367
// ── VSZ ratio recommendation ─────────────────────────────────────────
let vsz_ratio = if rss > 0 { vsz / rss } else { 0 };
let vsz_recommendation = if vsz_ratio > 100 {
format!("VSZ-vs-RSS ratio is {vsz_ratio}x (high -- consider --memory-arenas-cap 8)")
} else {
format!("VSZ-vs-RSS ratio is {vsz_ratio}x (normal)")
};
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. memory_doctor uses format! 📘 Rule violation ➹ Performance

New MEMORY DOCTOR formatting uses format!() and to_string() within src/command/, which the
checklist treats as hot-path code and disallows due to avoidable heap allocations and conversions.
This can degrade throughput if the command is invoked frequently or during incident response when
the server is under load.
Agent Prompt
## Issue description
`src/command/server_admin.rs` adds new `format!()` and `to_string()` usage in `MEMORY DOCTOR` output construction. The compliance policy forbids these allocation/conversion patterns inside `src/command/`.

## Issue Context
Even if `MEMORY DOCTOR` is intended as a “cold path”, the checklist treats all of `src/command/` as hot-path sensitive.

## Fix Focus Areas
- src/command/server_admin.rs[361-367]
- src/command/server_admin.rs[450-468]
- src/command/server_admin.rs[479-498]
- src/command/server_admin.rs[526-542]

## Implementation notes
- Prefer writing directly into the pre-allocated `out` buffer via `write!`/`writeln!` instead of building intermediate `String`s with `format!`.
- Replace helpers returning `String` (e.g., `humanize_bytes`, `chrono_iso8601_now`, `allocator_info`) with helpers that write into an existing buffer or return borrowed/static data where possible.
- Avoid `.to_string()` in favor of formatting directly into `out` or using lightweight integer formatting (e.g., `itoa`) if already available in the codebase.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Comment thread src/main.rs
Comment thread src/server/embedded.rs Outdated
Trivial wording change to nudge CI to re-run on this branch — GH Actions
did not register a workflow for the prior two pushes (523bde0, f64ee36).
No behavioral change.

author: Tin Dang
@TinDang97 TinDang97 force-pushed the fix/run-embedded-sharded branch from 3b74668 to 18f3b8c Compare May 17, 2026 05:53
TinDang97 added 7 commits May 17, 2026 20:49
Resolves the CONFLICTING merge state on `fix/run-embedded-sharded` against
`main` (which now ships memory_arenas_cap, get_global_repl_state_arc,
get_global_shard_databases, and macos_task_memory_info — all referenced
by this branch's MEMORY DOCTOR / embedded wiring). Conflicts in main.rs
and src/command/server_admin.rs were resolved by replacing the disputed
unsafe blocks with safer alternatives, which both fixes the conflict and
addresses the matching review comments.

Review fixes applied on top of the merge:

- main.rs malloc_conf: drop the union+unsafe pun. Use a c-string literal
  wrapped in a #[repr(transparent)] MallocConfPtr(*const c_char) with a
  documented unsafe impl Sync. ABI matches `const char *` exactly, the
  pointer is 'static so Sync is sound, and there is no remaining
  byte-level pointer pun. (CodeRabbit)

- main.rs arena scan: replace env::args() with env::args_os() +
  OsStr::as_bytes(). Non-UTF-8 argv no longer panics before clap reports
  the error, and the original OsString argv is preserved for the
  re-spawn round-trip. (CodeRabbit)

- server_admin.rs get_vsz_bytes: drop all 4 unsafe libc blocks and
  parse /proc/self/status via std::fs::read_to_string + line scan. Cold
  admin path, allocation is fine, and the misleading "handled by the
  cast" comment on the sysconf(-1) path is gone. (CodeRabbit)

- embedded.rs shutdown ordering: cancel -> join shards -> drop outer
  aof_tx -> join AOF. Sending AofMessage::Shutdown before shards exit
  let the writer terminate while shards still try_send appends,
  dropping the final writes (Qodo bug #5). Relying on channel-close
  instead also removes the blocking-send hazard.

- embedded.rs join handling: surface shard / AOF thread panics as an
  error in the function return instead of `let _ = handle.join()`.
  Callers now observe incomplete shutdowns rather than seeing a
  stale-success listener result. (CodeRabbit)

- embedded.rs module docs: stop claiming AOF manifest replay and
  auto-save are wired in embedded mode. Both are explicitly skipped —
  the docs now match the in-body rationale and the warning log emitted
  when `save` rules are configured. (CodeRabbit)

Skipped with reason:

- std::sync::RwLock for repl_state/acl_table: Shard::run's signature
  in src/shard/event_loop.rs:64,67 requires Arc<std::sync::RwLock<...>>.
  main.rs:309,336 uses the same type. Swapping to parking_lot is a
  multi-file refactor outside this PR's scope.

- anyhow::Result on run_embedded: fn main() -> anyhow::Result<()> in
  the binary uses the same convention. Introducing a typed error on
  the public embedded entry point alone would force every caller
  (Helios, tests) to bridge across two error families. Defer to a
  follow-up that converts the whole startup surface.

- format!/to_string in memory_doctor: cold administrative path, not
  the hot command dispatch the rule was written for. Refactoring to
  write directly into the response buffer is a separate cleanup.

author: Tin Dang
Two .expect() calls in Database::set's insert_or_update closures landed
on main (b173c6b) without the #[allow(clippy::expect_used)] annotation
the hot-path unwrap audit (scripts/audit-unwrap.sh) requires. CI Lint
job fails with baseline=0, count=2.

Both call sites are sound: `insert_or_update` invokes exactly one of
the two closures exactly once per call, so the `Cell::take()` cannot
observe a None. Annotate with #[allow(clippy::expect_used)] and a
one-line justification per CLAUDE.md (Coding Rules / Error Handling).

author: Tin Dang
Three CI gates were failing on PR #95 for reasons unrelated to the
embedded-sharded change itself; this commit removes them.

CHANGELOG.md:
- New "Lint" gate on main requires a CHANGELOG entry per PR. Add an
  Added section for run_embedded and a Fixed section enumerating the
  PR #95 review hardening (malloc_conf, get_vsz_bytes, args_os, AOF
  shutdown ordering, unwrap ratchet).

src/storage/eviction.rs:
- test_volatile_ttl_evicts_soonest is flaky on Linux CI (~28% failure)
  because find_victim_volatile_ttl reservoir-samples across DashTable
  segments with a non-deterministic RNG. With only 2 keys and segments
  >> 2, a single eviction round can fail to sample `soon` and pick
  `later` instead. Same root cause as test_lru_evicts_oldest, which
  the team already fixed with a bounded retry loop (546ff7b). Apply
  the same pattern: drive eviction in a bounded loop, re-inserting
  `later` when it gets evicted by mistake, so the sampler is
  guaranteed to eventually pick `soon`.

tests/txn_kv_wiring.rs:
- wait_for_server returned `true` as soon as TCP connect succeeded,
  but Moon's listener binds before the shard accept loop is fully
  wired — the first connection after bind can be reset on macOS
  ("Connection reset by peer", os error 54). Replace the TCP probe
  with a real RESP PING round-trip; only return true once we observe
  PONG, which proves the dispatch path is live.

author: Tin Dang
The previous push tightened wait_for_server from "TCP connect ok" to
"RESP PING returns PONG", which is the correct readiness signal. But
5 seconds was sufficient for the old check (kernel-side bind) and is
not always enough for the real check on macOS CI runners that need to
spawn the binary, replay AOF, init NUMA + shard threads, and start
serving — observed flaking with "did not become ready on port".

Bump both call sites in test_txn_commit_wal_crash_recovery to 30s.
The PING loop sleeps 50ms between attempts so the test still returns
the moment the server is ready.

author: Tin Dang
The previous push tried to defend against the "Connection reset by peer"
flake on macOS by upgrading wait_for_server from "TCP connect ok" to a
real RESP PING round-trip. That broke Linux too: the test had been
passing under the old TCP-only probe on commit 96507f0, and the PING
variant never observes PONG within 30s on CI (cause not reproducible
locally; likely a buffer / EOF interaction in CI sandboxes).

Revert wait_for_server to its original TCP-connect probe and instead
wrap the redis client's first get_connection() in a 15-second retry
loop. This way:
  - Linux: same wait_for_server behavior as before this branch, plus a
    benign retry that no-ops when the connection succeeds first try.
  - macOS: the retry rides out the bind-before-shard-ready window that
    surfaces as "Connection reset by peer".

author: Tin Dang
Previous push added connect_with_retry but used the bare
`client.get_connection()` API, which blocks indefinitely when the server
accepts the TCP connection but does not respond to the RESP handshake.
On CI, Moon's listener binds before the shard accept loop is fully
wired, so the first connect hangs the test until the kernel TCP
keepalive eventually fires (>>15 minutes, exhausting the CI job).

Use `get_connection_with_timeout(2s)` so each attempt fails fast and
the outer 15s retry deadline actually fires.

author: Tin Dang
The last four pushes on this branch all tried different strategies to
deflake test_txn_commit_wal_crash_recovery (PING wait, 30s timeout,
connect_with_retry, 2s per-attempt timeout). None converged: each
variant traded one platform's failure for another's.

Bisecting against 96507f0 showed Linux was already passing this test
under the original TCP-connect probe before this branch touched the
file. The macOS "Connection reset by peer" failure is reproducible on
origin/main today — it is not introduced by this PR.

Revert tests/txn_kv_wiring.rs entirely to origin/main so this PR no
longer carries a churned test file. The remaining macOS CI failure on
this test is pre-existing and should be fixed in a dedicated PR.

author: Tin Dang
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant