Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .planning
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,33 @@ See `docs/guides/pitr.md` for operator usage.

See `docs/guides/cdc.md` for consumer integration.

### Added — Embedded sharded server

- **PR #95** — `server::embedded::run_embedded(config, cancel)` exposes the
full sharded handler (with TXN.* cross-store transactions) to in-process
embedders such as `helios moon-daemon`. The existing `run_with_shutdown`
drives `handler_single`, which deliberately does not implement TXN.
Embedded mode skips TLS, console, cluster bus, admin port, and multi-part
AOF manifest replay; it does include per-shard RDB + WAL recovery,
graph/temporal/workspace/MQ WAL replay, SO_REUSEPORT, NUMA pinning, and
cancel-driven graceful shutdown.

### Fixed — PR #95 review hardening

- `main.rs` `malloc_conf` symbol: replaced the union-based unsafe pun with
a `#[repr(transparent)]` `Sync` wrapper around a `c"..."` literal.
- `command/server_admin.rs` `get_vsz_bytes`: replaced four `unsafe`
libc::{open,read,close,sysconf} blocks with safe `/proc/self/status`
parsing on the cold MEMORY DOCTOR path.
- `main.rs` arena scan now uses `env::args_os()` so non-UTF-8 argv no
longer panics before clap reports the error.
- `server/embedded.rs` shutdown sequence: cancel → join shard threads →
drop the outer `aof_tx` → join the AOF thread, so the writer never
exits while shards are still queuing appends. Thread join panics are
now propagated through the function result.
- `storage/db.rs` annotated two `.expect()` calls in `Database::set`'s
`insert_or_update` closures for the hot-path unwrap ratchet.

### Deferred to v0.2 follow-ups

- **P3c** — wire `SnapshotState::set_last_lsn(wal_flush_lsn)` into the live
Expand Down
38 changes: 13 additions & 25 deletions src/command/server_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,34 +542,22 @@ fn allocator_info() -> (String, String) {
}

/// Read VSZ (virtual memory size) for the current process.
///
/// Uses safe `/proc/self/status` parsing — `VmSize:` line is the canonical
/// virtual size in KiB. Cold admin path, allocation is fine.
///
/// Returns the `VmSize:` value multiplied by 1024 (KiB to bytes). Returns 0
/// when `/proc/self/status` cannot be read, when no `VmSize:` line yields a
/// parseable integer, or when the multiplication would overflow `usize`.
#[cfg(target_os = "linux")]
fn get_vsz_bytes() -> usize {
// /proc/self/statm field 0 is size in pages.
// SAFETY: same pattern as get_rss_bytes in metrics_setup.rs.
let fd = unsafe { libc::open(c"/proc/self/statm".as_ptr(), libc::O_RDONLY) };
if fd < 0 {
return 0;
}
let mut buf = [0u8; 128];
let n = unsafe { libc::read(fd, buf.as_mut_ptr().cast::<libc::c_void>(), buf.len()) };
unsafe { libc::close(fd) };
if n <= 0 {
let Ok(status) = std::fs::read_to_string("/proc/self/status") else {
return 0;
}
let s = &buf[..n as usize];
// Field 0 = size (VSZ in pages).
if let Some(size_field) = s.split(|&b| b == b' ').next() {
let mut pages: u64 = 0;
for &b in size_field {
if b.is_ascii_digit() {
pages = pages * 10 + (b - b'0') as u64;
}
}
// Use the same page_size approach as get_rss_bytes.
let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) } as u64;
return (pages * page_size) as usize;
}
0
};
status
.lines()
.find_map(|line| {
let rest = line.strip_prefix("VmSize:")?;
let kib = rest.split_whitespace().next()?.parse::<usize>().ok()?;
kib.checked_mul(1024)
})
.unwrap_or(0)
}

#[cfg(target_os = "macos")]
Expand Down
48 changes: 28 additions & 20 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,24 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

/// `#[repr(transparent)]` wrapper around `*const c_char` so we can declare a
/// `Sync` static that exposes the exact `const char *` ABI jemalloc expects
/// for the `_rjem_malloc_conf` symbol.
///
/// A bare raw pointer is not `Sync`, and the type of a `static` must be
/// `Sync`, so the pointer cannot be stored in a `static` directly. This
/// newtype carries the manual `Sync` impl, while `#[repr(transparent)]`
/// keeps its layout and ABI identical to the single pointer field it wraps.
/// The field is private, so the pointer can only be set where the wrapper is
/// constructed inside this module.
#[cfg(feature = "jemalloc")]
#[repr(transparent)]
pub struct MallocConfPtr(*const libc::c_char);

// SAFETY: `MallocConfPtr` has no interior mutability and its only field is
// private, so sharing a `&MallocConfPtr` between threads exposes nothing but
// the pointer value itself. The instance declared in this file points at a
// `'static` C-string literal — immutable for the lifetime of the program.
// NOTE(review): jemalloc is expected to read it exactly once during init;
// that is allocator behaviour and cannot be confirmed from this file.
#[cfg(feature = "jemalloc")]
unsafe impl Sync for MallocConfPtr {}

/// jemalloc option string, exported under the symbol name
/// `_rjem_malloc_conf`.
///
/// The string sets `narenas:8`, `background_thread:true`,
/// `metadata_thp:auto`, `dirty_decay_ms:1000`, `muzzy_decay_ms:5000` and
/// `abort_conf:true`. It is NUL-terminated so it can be read as a C string.
#[cfg(feature = "jemalloc")]
#[allow(non_upper_case_globals)]
#[unsafe(export_name = "_rjem_malloc_conf")]
pub static malloc_conf: Option<&'static libc::c_char> = {
// SAFETY: The byte string is null-terminated and valid ASCII; reinterpreting
// the first byte's address as *const c_char is safe (same layout).
union U {
x: &'static u8,
y: &'static libc::c_char,
}
Some(unsafe {
U {
x: &b"narenas:8,background_thread:true,metadata_thp:auto,dirty_decay_ms:1000,muzzy_decay_ms:5000,abort_conf:true\0"[0],
}
.y
})
};
pub static malloc_conf: MallocConfPtr = MallocConfPtr(
c"narenas:8,background_thread:true,metadata_thp:auto,dirty_decay_ms:1000,muzzy_decay_ms:5000,abort_conf:true".as_ptr(),
);

use std::path::PathBuf;

Expand Down Expand Up @@ -840,17 +841,24 @@ fn maybe_respawn_with_arena_override() -> anyhow::Result<()> {
// Lightweight scan of argv for --memory-arenas-cap N or --memory-arenas-cap=N.
// We can't use clap here because clap::parse() requires the full struct, and
// we need to inject env vars BEFORE jemalloc reads the config.
let args: Vec<String> = env::args().collect();
// Use args_os() to avoid panicking on non-UTF-8 argv and to preserve the
// original OsString argv for the re-spawn below (CodeRabbit).
use std::ffi::OsString;
use std::os::unix::ffi::OsStrExt;
let args: Vec<OsString> = env::args_os().collect();
let mut requested: Option<u32> = None;
let mut i = 1;
while i < args.len() {
let a = &args[i];
if let Some(rest) = a.strip_prefix("--memory-arenas-cap=") {
requested = rest.parse().ok();
let a = args[i].as_os_str().as_bytes();
if let Some(rest) = a.strip_prefix(b"--memory-arenas-cap=") {
requested = std::str::from_utf8(rest).ok().and_then(|s| s.parse().ok());
break;
}
if a == "--memory-arenas-cap" && i + 1 < args.len() {
requested = args[i + 1].parse().ok();
if a == b"--memory-arenas-cap" && i + 1 < args.len() {
requested = args[i + 1]
.as_os_str()
.to_str()
.and_then(|s| s.parse().ok());
break;
}
i += 1;
Expand Down
Loading
Loading