Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- **monoio handler: `maxclients reached` after first disconnect.** `handle_connection_sharded_monoio` called `record_connection_closed()` unconditionally at its exit, while the caller (`conn_accept.rs`) also called it in the non-migration branch. The `AtomicU64` counter wrapped to `u64::MAX` on the second `fetch_sub`, causing every subsequent `try_accept_connection` to reject against `maxclients`. Removed the handler-level decrement to restore symmetry with the `try_accept_connection` increment owned by the caller. Verified: 10 sequential SETs now succeed; `redis-benchmark SET p=16 c=50` reports real throughput (1.25M+ req/s) instead of rejection errors.

### Performance — PR #43 Recovery (2026-04-12)

Measured on aarch64 Linux (OrbStack moon-dev, 1 shard, 50 clients, `redis-benchmark` 8.0.2):

- **SET p=16: +34.4% avg, +48% peak** (2.34M → 3.15M rps) — primary target hit
- **SET p=1: -2.5% median** (within run-to-run noise)
- **GET p=16: +4.0%** — NEON SIMD bonus on reads

Three optimisation tracks:

- **ACL caching**: per-connection `cached_acl_unrestricted` flag skips RwLock + HashMap SipHash probe on every command for unrestricted users.
- **Inline SET dispatch**: extend `try_inline_dispatch` to handle plain `SET key value` from raw RESP bytes, bypassing Frame construction/drop. Handles eviction + AOF. Zero-copy key/value via `read_buf.split_to(consumed).freeze() + slice()`.
- **NEON SIMD for DashTable**: AArch64 NEON path for `Group::match_h2` (1.39× scalar) and `match_empty_or_deleted` (7.68× scalar). Microbench: `cargo bench --bench simd_probe`.

### Added — Graph Engine Integration (v0.1.4, 2026-04-11)

- **Property graph engine** (`src/graph/`, feature-gated under `graph`): segment-aligned CSR storage with SlotMap generational indices, ArcSwap lock-free reads, Roaring validity bitmaps, and Rabbit Order compaction for cache locality. 8,500+ LOC, 319 tests.
Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,7 @@ required-features = ["graph"]
name = "graph_traversal"
harness = false
required-features = ["graph"]

[[bench]]
name = "simd_probe"
harness = false
6 changes: 6 additions & 0 deletions UNSAFE_POLICY.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ SAFETY comment:
- `slice::from_raw_parts(self.ptr, self.len)` where `self` owns the
allocation and `len` is a struct invariant.
- `is_x86_feature_detected!`-gated SIMD intrinsics.
- AArch64 NEON intrinsics from `core::arch::aarch64` under a
`#[cfg(target_arch = "aarch64")]` gate. NEON is mandatory in ARMv8-A,
so no runtime detection is needed; the `unsafe` wrapper is a Rust
formality. The SAFETY comment must still justify pointer validity,
alignment requirements (note `vld1q_u8` has none, unlike SSE2's
`_mm_load_si128`), and any shift-by-constant assumptions.
- `MmapOptions::new().map(&file)` over a sealed-after-rename file with a
refcount-protected directory handle (see
`vector::persistence::warm_segment::WarmSegmentFiles` for the canonical
Expand Down
150 changes: 150 additions & 0 deletions benches/simd_probe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
//! Micro-benchmark: `Group::match_h2` SIMD paths (SSE2 / NEON) vs scalar fallback.
//!
//! The `Segment::find` hot path in `DashTable` calls `match_h2` for every probe.
//! On aarch64 the scalar fallback was showing up as ~14% of CPU in the profiling
//! that motivated Track B of PR #71. This micro-bench measures the SIMD win on
//! the actual target CPU, decoupled from network benchmark infrastructure.
//!
//! We benchmark three workload shapes per path:
//! - `miss` : 16 FULL bytes, none match the needle
//! - `hit_one` : one match at a random position
//! - `hit_many` : three matches (common for H2 collisions)
//!
//! The scalar implementation is inlined here to provide a direct A/B on the same
//! CPU, compiler, and optimiser settings.

use std::hint::black_box;

use criterion::{Criterion, criterion_group, criterion_main};

use moon::storage::dashtable::simd::{BitMask, Group};

/// Scalar baseline — identical semantics to the SIMD `match_h2` but portable.
/// Inlined here (not re-exported from the library) to keep the comparison
/// honest across platforms.
#[inline]
fn match_h2_scalar(group: &Group, h2: u8) -> BitMask {
let mut mask = 0u16;
for i in 0..16 {
if group.0[i] == h2 {
mask |= 1 << i;
}
}
BitMask(mask)
}

/// Scalar baseline for `match_empty_or_deleted` (bit 7 set).
#[inline]
fn match_empty_or_deleted_scalar(group: &Group) -> BitMask {
let mut mask = 0u16;
for i in 0..16 {
if group.0[i] & 0x80 != 0 {
mask |= 1 << i;
}
}
BitMask(mask)
}

fn bench_match_h2(c: &mut Criterion) {
// ── Workload fixtures ────────────────────────────────────────────────
// `miss`: 16 FULL bytes, none equal the needle (0x2A).
let miss = Group([
0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e,
0x1f,
]);
// `hit_one`: exactly one position matches 0x42 (at index 7).
let hit_one = Group([
0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x42, 0x18, 0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e,
0x1f,
]);
// `hit_many`: three positions match 0x42 (indices 0, 5, 15) — typical H2 collision.
let hit_many = Group([
0x42, 0x11, 0x12, 0x13, 0x14, 0x42, 0x16, 0x17, 0x18, 0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e,
0x42,
]);

// ── SIMD path (NEON on aarch64, SSE2 on x86_64) ──────────────────────
let mut simd_group = c.benchmark_group("simd/match_h2");

simd_group.bench_function("simd_miss", |b| {
b.iter(|| {
// SAFETY: SSE2 is baseline on x86_64; NEON is mandatory on AArch64.
#[cfg(target_arch = "x86_64")]
let r = unsafe { black_box(&miss).match_h2(black_box(0x42)) };
#[cfg(not(target_arch = "x86_64"))]
let r = black_box(&miss).match_h2(black_box(0x42));
Comment thread
coderabbitai[bot] marked this conversation as resolved.
black_box(r)
})
});

simd_group.bench_function("simd_hit_one", |b| {
b.iter(|| {
// SAFETY: SSE2 is baseline on x86_64; NEON is mandatory on AArch64.
#[cfg(target_arch = "x86_64")]
let r = unsafe { black_box(&hit_one).match_h2(black_box(0x42)) };
#[cfg(not(target_arch = "x86_64"))]
let r = black_box(&hit_one).match_h2(black_box(0x42));
black_box(r)
})
});

simd_group.bench_function("simd_hit_many", |b| {
b.iter(|| {
// SAFETY: SSE2 is baseline on x86_64; NEON is mandatory on AArch64.
#[cfg(target_arch = "x86_64")]
let r = unsafe { black_box(&hit_many).match_h2(black_box(0x42)) };
#[cfg(not(target_arch = "x86_64"))]
let r = black_box(&hit_many).match_h2(black_box(0x42));
black_box(r)
})
});

simd_group.finish();

// ── Scalar baseline (same CPU, same compiler flags) ──────────────────
let mut scalar_group = c.benchmark_group("simd/match_h2_scalar");

scalar_group.bench_function("scalar_miss", |b| {
b.iter(|| black_box(match_h2_scalar(black_box(&miss), black_box(0x42))))
});

scalar_group.bench_function("scalar_hit_one", |b| {
b.iter(|| black_box(match_h2_scalar(black_box(&hit_one), black_box(0x42))))
});

scalar_group.bench_function("scalar_hit_many", |b| {
b.iter(|| black_box(match_h2_scalar(black_box(&hit_many), black_box(0x42))))
});

scalar_group.finish();
}

fn bench_match_empty_or_deleted(c: &mut Criterion) {
// Mixed group: 2 FULL (bit 7 clear), 1 DELETED (0x80), rest EMPTY (0xFF).
let mixed = Group([
0x10, 0x80, 0x7f, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff,
]);

let mut g = c.benchmark_group("simd/match_empty_or_deleted");

g.bench_function("simd", |b| {
b.iter(|| {
// SAFETY: SSE2 baseline on x86_64; NEON mandatory on AArch64.
#[cfg(target_arch = "x86_64")]
let r = unsafe { black_box(&mixed).match_empty_or_deleted() };
#[cfg(not(target_arch = "x86_64"))]
let r = black_box(&mixed).match_empty_or_deleted();
black_box(r)
})
});

g.bench_function("scalar", |b| {
b.iter(|| black_box(match_empty_or_deleted_scalar(black_box(&mixed))))
});

g.finish();
}

criterion_group!(benches, bench_match_h2, bench_match_empty_or_deleted);
criterion_main!(benches);
125 changes: 124 additions & 1 deletion src/acl/table.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use crate::config::ServerConfig;
use crate::protocol::Frame;
Expand Down Expand Up @@ -235,22 +237,68 @@ impl AclUser {

pub struct AclTable {
users: HashMap<String, AclUser>,
/// Monotonic version counter bumped on every mutation (set/del/apply).
/// Readers outside the `RwLock<AclTable>` can subscribe to this counter
/// via [`Self::version_handle`] and detect stale per-connection caches
/// without acquiring the lock: when `load(Acquire)` differs from their
/// cached snapshot, the cache must be re-resolved.
///
/// `Arc<AtomicU64>` rather than a plain `AtomicU64` so the handle can
/// be cloned into `ConnectionContext` / `ConnectionState` at connection
/// accept time and survive ACL LOAD (which calls
/// [`Self::replace_with`] to swap the user set while preserving the
/// counter's identity).
version: Arc<AtomicU64>,
}

impl AclTable {
pub fn new() -> Self {
AclTable {
users: HashMap::new(),
version: Arc::new(AtomicU64::new(0)),
}
}

/// Create an empty table with no users (used by ACL LOAD).
pub fn new_empty() -> Self {
AclTable {
users: HashMap::new(),
version: Arc::new(AtomicU64::new(0)),
}
}

/// Return a clone of the version-counter handle for lock-free staleness
/// checks by cached consumers (e.g. `ConnectionState::cached_acl_*`).
#[inline]
pub fn version_handle(&self) -> Arc<AtomicU64> {
Arc::clone(&self.version)
}

/// Read the current version — caller must already hold the RwLock read
/// guard for a consistent snapshot with any user-data read in the same
/// critical section.
#[inline]
pub fn version(&self) -> u64 {
self.version.load(Ordering::Acquire)
}

/// Bump the version counter. Must be called under a write lock on self
/// (i.e. via `&mut self`) so that readers see the underlying user-data
/// change before the version increment is observable.
#[inline]
fn bump_version(&self) {
self.version.fetch_add(1, Ordering::Release);
}

/// Replace the user-set in place while preserving the same `Arc<AtomicU64>`
/// version handle. Used by ACL LOAD so existing per-connection handles
/// (`version_handle()` clones distributed at accept time) remain valid
/// and the counter is strictly monotonic across reloads.
pub fn replace_with(&mut self, new: AclTable) {
self.users = new.users;
self.bump_version();
}

/// Bootstrap from ServerConfig. Loads aclfile if configured, otherwise creates
/// the default user from requirepass (or nopass).
pub fn load_or_default(config: &ServerConfig) -> Self {
Expand All @@ -264,15 +312,23 @@ impl AclTable {
}

pub fn get_user_mut(&mut self, username: &str) -> Option<&mut AclUser> {
// Callers that mutate via this handle must call bump_version() after
// their mutation is complete (or go through set_user / apply_setuser
// which bump automatically). See src/command/acl.rs for call sites.
self.users.get_mut(username)
}

pub fn set_user(&mut self, username: String, user: AclUser) {
self.users.insert(username, user);
self.bump_version();
}

pub fn del_user(&mut self, username: &str) -> bool {
self.users.remove(username).is_some()
let removed = self.users.remove(username).is_some();
if removed {
self.bump_version();
}
removed
}

pub fn list_users(&self) -> Vec<&AclUser> {
Expand All @@ -294,6 +350,7 @@ impl AclTable {
for rule in rules {
apply_rule(user, rule);
}
self.bump_version();
}

/// Authenticate username+password. Returns Some(username) on success, None on failure.
Expand All @@ -312,6 +369,15 @@ impl AclTable {
}
}

/// Return `true` if the named user exists and has no restrictions at all
/// (enabled + AllAllowed + `~*` rw + `&*`). Used by the connection handler
/// to cache the unrestricted flag per-connection, avoiding the RwLock +
/// HashMap probe on every command for the common case (default user).
#[inline]
pub fn is_user_unrestricted(&self, username: &str) -> bool {
self.users.get(username).is_some_and(|u| u.unrestricted())
}

/// Check if the command is allowed for the user.
/// Returns None if allowed, Some(reason) if denied.
pub fn check_command_permission(
Expand Down Expand Up @@ -795,4 +861,61 @@ mod tests {
.is_some()
);
}

#[test]
fn version_bumps_on_set_del_apply() {
// Every mutation entry point must advance the shared version
// counter so cached consumers (per-connection unrestricted flag)
// can detect staleness via the Arc<AtomicU64> handle without
// needing to re-acquire the RwLock.
let mut table = AclTable::new();
let v0 = table.version();
let handle = table.version_handle();

table.apply_setuser("alice", &["on", "nopass", ">pw"]);
let v1 = table.version();
assert!(v1 > v0, "apply_setuser must bump version");
assert_eq!(v1, handle.load(Ordering::Acquire));

table.set_user("bob".to_string(), AclUser::default_deny("bob".to_string()));
let v2 = table.version();
assert!(v2 > v1, "set_user must bump version");

assert!(table.del_user("bob"));
let v3 = table.version();
assert!(v3 > v2, "del_user must bump version");

// del_user returning false (no such user) MUST NOT bump — otherwise
// callers can spam DELUSER to inflate the counter.
assert!(!table.del_user("nonexistent"));
assert_eq!(table.version(), v3, "no-op del_user must not bump");
}

#[test]
fn version_handle_survives_replace_with() {
// ACL LOAD swaps the user set in place via `replace_with` instead of
// `*table = new_table` precisely so cached connection handles remain
// valid. Verify the Arc stays identity-equal and the counter
// advances monotonically across the swap.
let mut table = AclTable::new();
table.apply_setuser("alice", &["on", "nopass"]);
let pre_handle = table.version_handle();
let v_pre = pre_handle.load(Ordering::Acquire);

let mut new_table = AclTable::new_empty();
new_table.set_user("bob".to_string(), AclUser::default_deny("bob".to_string()));
table.replace_with(new_table);

let post_handle = table.version_handle();
assert!(
Arc::ptr_eq(&pre_handle, &post_handle),
"replace_with must preserve the Arc<AtomicU64> identity"
);
assert!(
pre_handle.load(Ordering::Acquire) > v_pre,
"replace_with must bump the shared counter"
);
assert!(table.get_user("bob").is_some());
assert!(table.get_user("alice").is_none());
}
}
7 changes: 6 additions & 1 deletion src/command/acl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,12 @@ pub fn handle_acl(
let Ok(mut table) = acl_table.write() else {
return Frame::Error(Bytes::from_static(b"ERR internal ACL error"));
};
*table = new_table;
// Preserve the Arc<AtomicU64> version handle that
// existing connections hold references to — if we
// replaced via `*table = new_table`, their handles
// would observe the old (now unreachable) counter and
// never invalidate their caches.
table.replace_with(new_table);
Frame::SimpleString(Bytes::from_static(b"OK"))
}
},
Expand Down
Loading
Loading