-
Notifications
You must be signed in to change notification settings - Fork 0
feat: client connection security hardening (P0 + P1) #69
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
70 commits
Select commit
Hold shift + click to select a range
e37114b
docs: sync .planning submodule — gap closure phases 106-111
TinDang97 982ac25
feat: wire metric recording helpers into subsystems (Phase 106)
TinDang97 d6a8fed
ci: add integration test job for durability and replication (Phase 107)
TinDang97 9e29b63
ci: harden fuzz and loom CI coverage (Phase 108)
TinDang97 cc00a32
ci: add lettuce and StackExchange.Redis compatibility tests (Phase 109)
TinDang97 458dc16
feat: add otel feature flag and log schema docs (Phase 110)
TinDang97 c9cbbad
docs: sync .planning submodule — gap closure phases 106-110 complete
TinDang97 9a0b798
docs: sync .planning submodule — v0.1.3 milestone complete
TinDang97 3f801f9
feat(112-01): add graph core types and dependencies
TinDang97 f44bbdf
feat(112-01): add MemGraph mutable adjacency list
TinDang97 fe71f80
feat(112-01): add CSR immutable graph segment
TinDang97 15451f0
feat(112-01): add GraphSegmentHolder with ArcSwap lock-free reads
TinDang97 cca5d8d
feat(112-01): add graph compaction with Rabbit Order reordering
TinDang97 1b6e6e5
feat(112-01): add GraphStore per-shard with lazy initialization
TinDang97 2fc1297
feat(112-01): integrate graph module into Shard with feature gate
TinDang97 b9fc9de
docs(112-01): sync .planning submodule -- Graph Storage Foundation co…
TinDang97 c3d6c8e
feat(113-01): extend TransactionManager with graph write intents
TinDang97 5422929
feat(113-01): add graph MVCC visibility checks and txn_id field
TinDang97 6e18a64
feat(113-01): add bounded epoch traversal guard
TinDang97 f9c308f
feat(113-01): add graph WAL serialization as RESP arrays
TinDang97 9796580
feat(113-01): add two-pass graph WAL replay
TinDang97 b0d173b
fix(113-01): fix Arc ownership in graph replay take/put pattern
TinDang97 7e4b63b
docs(113-01): sync .planning submodule -- Graph MVCC and WAL Durabili…
TinDang97 663b0a9
feat(114-01): add GRAPH.* command suite with dispatch registration
TinDang97 ab64282
docs(114-01): sync .planning submodule -- Phase 114 complete
TinDang97 8586c24
feat(115-01): add graph traversal scoring functions
TinDang97 ba62d2c
feat(115-01): add BFS/DFS/Dijkstra traversal engine with segment merg…
TinDang97 3b27bd0
chore(115-01): wire scoring and traversal modules into graph/mod.rs
TinDang97 411cdc6
docs(115-01): sync .planning submodule -- Phase 115 complete
TinDang97 88b644c
feat(116-01): add graph index module with LabelIndex, EdgeTypeIndex, …
TinDang97 8ef1d16
feat(116-01): integrate indexes into CsrSegment and NamedGraph
TinDang97 6e66ffd
docs(116-01): sync .planning submodule -- Phase 116 complete
TinDang97 c0a4942
feat(117-01): add Cypher parser with logos lexer, recursive descent, …
TinDang97 b3e2f06
feat(117-01): wire GRAPH.QUERY, GRAPH.RO_QUERY, GRAPH.EXPLAIN to Cyph…
TinDang97 3967237
test(117-01): add cypher_parse fuzz target for CI pipeline
TinDang97 b4a04b4
docs(117-01): sync .planning submodule — Phase 117 Cypher Parser comp…
TinDang97 1f28b7b
feat(118-01): add hybrid graph+vector query engine
TinDang97 67ba26e
feat(118-01): wire GRAPH.VSEARCH and GRAPH.HYBRID commands
TinDang97 7262e82
docs: sync .planning submodule — Phase 118 hybrid graph+vector querie…
TinDang97 63848b3
feat(119-01): add GraphStats module with incremental degree tracking
TinDang97 312d148
feat(119-01): integrate GraphStats into NamedGraph and command handlers
TinDang97 d4a6915
feat(119-01): add cost-based strategy selection and hub detection to …
TinDang97 78d1dda
docs(119-01): sync .planning submodule — cost-based query planner com…
TinDang97 0109349
feat(120-01): add ShardMessage::GraphTraverse variant and graph_to_sh…
TinDang97 2523294
feat(120-01): cross-shard graph traversal coordinator and handler
TinDang97 946d4c2
docs(120-01): sync .planning submodule — cross-shard graph traversal …
TinDang97 17d4bd3
feat(121-01): graph persistence and recovery with CRC32 validation
TinDang97 0d154c0
docs(121-01): sync .planning submodule -- graph persistence and recov…
TinDang97 23c90e0
feat(122-01): add Criterion benchmarks for graph operations
TinDang97 75bab3f
fix(122-01): prevent OOM in graph benchmarks with iter_custom batching
TinDang97 73035ed
docs(122-01): sync .planning submodule for Phase 122 completion
TinDang97 6092c30
fix: resolve clippy warnings in graph module (io_other_error, is_mult…
TinDang97 d09d116
fix(122.1-01): wire graph WAL, LSN allocation, recovery, and remove d…
TinDang97 6ad624c
feat: add graph engine E2E benchmark script (scripts/bench-graph.sh)
TinDang97 322c924
feat: add maxclients connection limit
TinDang97 32276af
feat: add client idle timeout (--timeout)
TinDang97 08cb9c3
feat: add tcp-keepalive socket option (--tcp-keepalive)
TinDang97 3d8711c
feat: add AUTH failure rate limiting
TinDang97 55e2c9d
fix: resolve monoio migration value-after-move on Linux
TinDang97 95d426c
feat: add CLIENT LIST, CLIENT INFO, CLIENT KILL commands
TinDang97 10d5d53
feat: add CLIENT PAUSE / UNPAUSE commands
TinDang97 20a79d8
feat: add ACL GENPASS command
TinDang97 d095a3b
style: cargo fmt
TinDang97 d2e4dfe
docs: add CHANGELOG entry for client connection security hardening
TinDang97 e954928
fix: resolve 3 pre-existing CI failures
TinDang97 27056ec
fix: address code review findings (4 issues)
TinDang97 e817a4d
fix: address all PR review findings (round 2)
TinDang97 4e79568
fix: close monoio CLIENT ACL bypass and double-count connection metric
TinDang97 da98b76
style: cargo fmt
TinDang97 dc85a27
merge: sync with main (graph engine PR #70)
TinDang97 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| //! Per-IP AUTH failure rate limiting. | ||
| //! | ||
| //! Tracks failed AUTH attempts per client IP and enforces exponential backoff | ||
| //! delays to prevent brute-force attacks. Successful AUTH has zero overhead. | ||
| //! | ||
| //! Design: | ||
| //! - Global `parking_lot::Mutex<HashMap>` (not on hot path — only touched on AUTH) | ||
| //! - Exponential backoff: 100ms * 2^(failures-1), capped at 10s | ||
| //! - Auto-reset after 60s of inactivity per IP | ||
| //! - Periodic cleanup of stale entries via `cleanup_stale()` | ||
|
|
||
| use parking_lot::Mutex; | ||
| use std::collections::HashMap; | ||
| use std::net::IpAddr; | ||
| use std::sync::LazyLock; | ||
| use std::time::Instant; | ||
|
|
||
| /// Base delay for first failure (100ms). | ||
| const BASE_DELAY_MS: u64 = 100; | ||
|
|
||
| /// Maximum delay cap (10 seconds). | ||
| const MAX_DELAY_MS: u64 = 10_000; | ||
|
|
||
| /// Entries older than this are pruned (60 seconds). | ||
| const STALE_THRESHOLD_SECS: u64 = 60; | ||
|
|
||
| /// Maximum entries before forced cleanup. | ||
| const MAX_ENTRIES: usize = 10_000; | ||
|
|
||
| struct FailureRecord { | ||
| count: u32, | ||
| last_failure: Instant, | ||
| } | ||
|
|
||
| static RATE_LIMITER: LazyLock<Mutex<HashMap<IpAddr, FailureRecord>>> = | ||
| LazyLock::new(|| Mutex::new(HashMap::new())); | ||
|
|
||
| /// Record a failed AUTH attempt for the given IP. | ||
| /// Returns the delay in milliseconds that should be applied before | ||
| /// sending the error response. | ||
| pub fn record_failure(ip: IpAddr) -> u64 { | ||
| let mut map = RATE_LIMITER.lock(); | ||
|
|
||
| // Periodic cleanup when map grows too large | ||
| if map.len() >= MAX_ENTRIES { | ||
| let cutoff = Instant::now() - std::time::Duration::from_secs(STALE_THRESHOLD_SECS); | ||
| map.retain(|_, r| r.last_failure > cutoff); | ||
| } | ||
|
|
||
| let now = Instant::now(); | ||
| let record = map.entry(ip).or_insert(FailureRecord { | ||
| count: 0, | ||
| last_failure: now, | ||
| }); | ||
|
|
||
| // Reset if stale (no failures for STALE_THRESHOLD_SECS) | ||
| if now.duration_since(record.last_failure).as_secs() >= STALE_THRESHOLD_SECS { | ||
| record.count = 0; | ||
| } | ||
|
|
||
| record.count = record.count.saturating_add(1); | ||
| record.last_failure = now; | ||
|
|
||
| // Exponential backoff: 100ms * 2^(count-1), capped at 10s | ||
| let delay = BASE_DELAY_MS.saturating_mul( | ||
| 1u64.checked_shl(record.count.saturating_sub(1)) | ||
| .unwrap_or(u64::MAX), | ||
| ); | ||
| delay.min(MAX_DELAY_MS) | ||
| } | ||
|
|
||
| /// Clear failure record on successful AUTH (reset for this IP). | ||
| pub fn record_success(ip: IpAddr) { | ||
| let mut map = RATE_LIMITER.lock(); | ||
| map.remove(&ip); | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use std::net::Ipv4Addr; | ||
|
|
||
| #[test] | ||
| fn test_first_failure_returns_base_delay() { | ||
| let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)); | ||
| // Clean up from any prior test state | ||
| record_success(ip); | ||
| let delay = record_failure(ip); | ||
| assert_eq!(delay, 100); | ||
| // Cleanup | ||
| record_success(ip); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_exponential_backoff() { | ||
| let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 101)); | ||
| record_success(ip); | ||
| assert_eq!(record_failure(ip), 100); // 100 * 2^0 | ||
| assert_eq!(record_failure(ip), 200); // 100 * 2^1 | ||
| assert_eq!(record_failure(ip), 400); // 100 * 2^2 | ||
| assert_eq!(record_failure(ip), 800); // 100 * 2^3 | ||
| assert_eq!(record_failure(ip), 1600); // 100 * 2^4 | ||
| record_success(ip); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_max_delay_cap() { | ||
| let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 102)); | ||
| record_success(ip); | ||
| // 7 failures: 100, 200, 400, 800, 1600, 3200, 6400 | ||
| for _ in 0..7 { | ||
| record_failure(ip); | ||
| } | ||
| // 8th failure should be capped at 10000 | ||
| let delay = record_failure(ip); | ||
| assert!(delay <= MAX_DELAY_MS); | ||
| record_success(ip); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_success_clears_record() { | ||
| let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 103)); | ||
| record_success(ip); | ||
| record_failure(ip); | ||
| record_failure(ip); | ||
| record_success(ip); | ||
| // After success, next failure should be back to base | ||
| let delay = record_failure(ip); | ||
| assert_eq!(delay, 100); | ||
| record_success(ip); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| //! CLIENT PAUSE / UNPAUSE global state. | ||
| //! | ||
| //! When paused, command processing is delayed for all clients until the pause | ||
| //! expires or CLIENT UNPAUSE is called. Supports two modes: | ||
| //! - ALL: pause all commands | ||
| //! - WRITE: pause only write commands (reads still served) | ||
|
|
||
| use parking_lot::RwLock; | ||
| use std::sync::LazyLock; | ||
| use std::time::Instant; | ||
|
|
||
| #[derive(Clone, Copy, PartialEq, Eq)] | ||
| pub enum PauseMode { | ||
| All, | ||
| Write, | ||
| } | ||
|
|
||
| struct PauseState { | ||
| active: bool, | ||
| mode: PauseMode, | ||
| until: Instant, | ||
| } | ||
|
|
||
| static PAUSE: LazyLock<RwLock<PauseState>> = LazyLock::new(|| { | ||
| RwLock::new(PauseState { | ||
| active: false, | ||
| mode: PauseMode::All, | ||
| until: Instant::now(), | ||
| }) | ||
| }); | ||
|
|
||
| /// Activate pause for the given duration and mode. | ||
| pub fn pause(duration_ms: u64, mode: PauseMode) { | ||
| let mut state = PAUSE.write(); | ||
| state.active = true; | ||
| state.mode = mode; | ||
| state.until = Instant::now() + std::time::Duration::from_millis(duration_ms); | ||
| } | ||
|
|
||
| /// Deactivate pause immediately. | ||
| pub fn unpause() { | ||
| let mut state = PAUSE.write(); | ||
| state.active = false; | ||
| } | ||
|
|
||
| /// Check if the server is currently paused. Returns the remaining duration | ||
| /// if paused, or None if not paused. Auto-expires when the deadline passes. | ||
| pub fn check_pause(is_write: bool) -> Option<std::time::Duration> { | ||
| let state = PAUSE.read(); | ||
| if !state.active { | ||
| return None; | ||
| } | ||
| let now = Instant::now(); | ||
| if now >= state.until { | ||
| // Expired — will be cleaned up on next write access | ||
| return None; | ||
| } | ||
| // In WRITE mode, only pause write commands | ||
| if state.mode == PauseMode::Write && !is_write { | ||
| return None; | ||
| } | ||
| Some(state.until - now) | ||
| } | ||
|
|
||
| /// Clean up expired pause state (called periodically from handlers). | ||
| /// Takes a write lock to avoid TOCTOU between expiry check and clear. | ||
| pub fn expire_if_needed() { | ||
| let mut state = PAUSE.write(); | ||
| if state.active && Instant::now() >= state.until { | ||
| state.active = false; | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
|
|
||
| // Tests share global state — run sequentially via a mutex | ||
| static TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(()); | ||
|
|
||
| #[test] | ||
| fn test_pause_and_check() { | ||
| let _lock = TEST_LOCK.lock(); | ||
| unpause(); | ||
| assert!(check_pause(true).is_none()); | ||
|
|
||
| pause(5000, PauseMode::All); | ||
| assert!(check_pause(true).is_some()); | ||
| assert!(check_pause(false).is_some()); | ||
|
|
||
| unpause(); | ||
| assert!(check_pause(true).is_none()); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_write_mode_allows_reads() { | ||
| let _lock = TEST_LOCK.lock(); | ||
| unpause(); | ||
| pause(5000, PauseMode::Write); | ||
| assert!(check_pause(true).is_some()); // write blocked | ||
| assert!(check_pause(false).is_none()); // read allowed | ||
| unpause(); | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.