Skip to content

Commit 72b928f

Browse files
committed
feat(message_bus): async fire-and-forget transport for VSR consensus
The previous cache/connection.rs was a single-stream blob that held a RefCell<TcpStream> across .await, awaited kernel write completion in the send path, and serialized fan-out through one shared lock. Under VSR pipelining a slow peer stalled sends to every other peer in the same dispatch round, killing parallel quorum collection. Reentrant sends to the same peer would also panic on BorrowMutError. Rebuilt the crate around a per-connection writer task that drains a bounded mpsc and submits batches via a single write_vectored_all syscall, with an independent read half handled by a dedicated reader task. send_to_* is now sync try_send under the async fn shell: zero awaits, returns SendError::Backpressure on full so VSR can recover via WAL retransmission or view-change timeouts. The lifecycle module owns a root Shutdown / ShutdownToken plus a ConnectionRegistry<K> that tracks the per-peer Sender and both task handles for graceful drain. The directional rule (lower id dials, higher id accepts) eliminates the dialed- both-ways race without a tiebreaker. Message::into_frozen() removes the per-send memcpy. Socket split and lifecycle plumbing (C1, C2, Cx2) Swapped the dup(2)-based socket_split module for compio's native TcpStream::into_split(), deleting ~50 lines of unsafe, three dup_stream copies, and an entire module file. Reader and writer tasks now take OwnedReadHalf / OwnedWriteHalf directly. The writer task, on write_vectored_all error, removes its own registry entry so a stale Sender cannot accept further sends; on graceful shutdown path it defers removal to the root drain so DrainOutcome.clean counts both halves. The reader self-remove path went through a new ConnectionRegistry::close_peer helper that orders "close sender, await writer, drop reader handle" to prevent a mid-writev cancellation landing a truncated frame. All listeners + the outbound connector apply TCP SO_KEEPALIVE via socket2 (TCP_KEEPIDLE=10s, TCP_KEEPINTVL=5s, TCP_KEEPCNT=3) so a silently dead peer is detected within ~25s, well inside the VSR view-change window. Shutdown loop-drain (C3, N5) track_background is back to an unconditional push. shutdown now loop-drains the background-task vec until empty, so a task pushed mid-shutdown (e.g. a reader that observed the token and registered its own cleanup) is still awaited. DrainOutcome gains background_clean / background_force fields to make the bg count observable. Cluster config wedge fixes (C5, C6) replica_id is now Option<u8> on both CurrentNodeConfig and OtherNodeConfig. Startup-time ClusterConfig::validate rejects missing ids, duplicate ids, and ids >= total replica count so a misconfigured cluster cannot wedge into a permanent view change on boot. CurrentNodeConfig gained the TransportPorts field that was already present on OtherNodeConfig; the bus now has a place to read its own tcp_replica bind port from. Per-connection body-byte limiter (C7) A new FrameRateLimiter (token bucket, 32 MiB/s sustained, 256 MiB burst by default) gates the body-read allocation in framing::read_message. A peer claiming a valid 64 MiB body goes through once then has to wait seconds before the next burst; the hard ceiling on MAX_MESSAGE_SIZE stays as-is. Zero-copy framing (W1, W3) framing::read_message was allocating three Vecs and copying the header + body twice to reassemble the final Owned. It is now a single Owned<MESSAGE_ALIGN>::with_capacity(HEADER_SIZE) handed straight to compio's read_exact, grown in-place via reserve_exact after the size field is parsed, then a second read_exact into owned.slice(HEADER_SIZE..total_size). The backing AVec's buffer is reused across both reads: one alloc (with at most one in-place realloc) and zero memcpys of the data. iggy_binary_protocol's Owned<ALIGN> now implements IoBuf / IoBufMut / SetLen so compio can drive the read directly. A compile-time static assert guards the hardcoded offset_of!(GenericHeader, size) == 48 the reader relies on. Router blocking-send fix (C8) core/shard/src/router.rs was calling the blocking crossfire::MTx::send from the compio reactor, which could park the io_uring thread and stall every other connection on the shard when an inbox filled up. Both dispatch and dispatch_request now use try_send and log-drop on Full / Disconnected; consensus recovers via WAL retransmit or view change. Misc (Cx1, W4, W6, W7, W8, W9, W10, N3, N4) - connector.rs's dead is_err branch on insert() is replaced with an expect() documenting the directional-rule invariant. - SendError::Io is removed (no constructor anywhere). - replica_listener doc adds an explicit trusted-network-only warning for the handshake (no mTLS / shared secret). - writer_task MAX_BATCH doc drops the "tunable per-deployment" claim. - Duplicate client id insert is now unreachable!() with a helpful diagnostic. - chain-replication failure paths use structured tracing::warn!(error = ?e, ...) so oncall can grep the SendError variant. - MessageBus trait doc advertises the no-yield property of send_to_* in the production impl. - writer_task's license-header "czpressed" typo fixed. - transports/mod.rs 40-line sketch compressed to a tracking reference for IGGY-112. Deferred with TODO(hubcio) - Chain-replicate-before-journal-append ordering (C4) is flagged at both metadata and partitions sites; an ordering decision issue is needed, not a patch in this PR. - Fan-out deep_copy (W2) in plane_helpers and the three shard send sites is flagged; requires a trait-level change to MessageBus::send_to_* taking Frozen<MESSAGE_ALIGN> so the primary freezes once and fan-out is refcount bumps. Scope Consensus, metadata, partitions, shard, and simulator are updated to the new MessageBus shape; core/server is left untouched as legacy code. MESSAGE_ALIGN promoted to pub in iggy_binary_protocol::consensus is consistent with the existing Message<GenericHeader> leak through the MessageBus trait and is accepted for this PR (N8). Follow-up issues to file: - W2 MessageBus::send_to_*(.., Frozen<MESSAGE_ALIGN>) trait- level fan-out fix (benchmark-driven) - W5 SendError::Backpressure(Message) carrying payload back to the caller (benchmark-driven) - W11 SimOutbox bounded mode (simulator coverage) - N6 ConnectionRegistry<u8> -> Vec<Option<Entry>> micro-opt - N7 drain parallel via FuturesUnordered - C4 VSR chain-replicate-vs-append ordering decision - IGGY-112 Transport trait family + mTLS handshake
1 parent f5350d9 commit 72b928f

42 files changed

Lines changed: 3623 additions & 691 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 61 additions & 55 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

DEPENDENCIES.md

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ capacity_builder: 0.5.0, "MIT",
147147
capacity_builder_macros: 0.3.0, "MIT",
148148
cargo-platform: 0.3.2, "Apache-2.0 OR MIT",
149149
cargo_metadata: 0.23.1, "MIT",
150-
cc: 1.2.60, "Apache-2.0 OR MIT",
150+
cc: 1.2.59, "Apache-2.0 OR MIT",
151151
cesu8: 1.1.0, "Apache-2.0 OR MIT",
152152
cexpr: 0.6.0, "Apache-2.0 OR MIT",
153153
cfg-if: 1.0.4, "Apache-2.0 OR MIT",
@@ -451,7 +451,7 @@ hwlocality-sys: 0.7.0, "MIT",
451451
hybrid-array: 0.4.10, "Apache-2.0 OR MIT",
452452
hyper: 1.9.0, "MIT",
453453
hyper-named-pipe: 0.1.0, "Apache-2.0",
454-
hyper-rustls: 0.27.8, "Apache-2.0 OR ISC OR MIT",
454+
hyper-rustls: 0.27.7, "Apache-2.0 OR ISC OR MIT",
455455
hyper-timeout: 0.5.2, "Apache-2.0 OR MIT",
456456
hyper-util: 0.1.20, "MIT",
457457
hyperlocal: 0.9.1, "MIT",
@@ -532,7 +532,7 @@ jni-sys: 0.4.1, "Apache-2.0 OR MIT",
532532
jni-sys-macros: 0.4.1, "Apache-2.0 OR MIT",
533533
jobserver: 0.1.34, "Apache-2.0 OR MIT",
534534
journal: 0.1.0, "Apache-2.0",
535-
js-sys: 0.3.95, "Apache-2.0 OR MIT",
535+
js-sys: 0.3.94, "Apache-2.0 OR MIT",
536536
jsonwebtoken: 10.3.0, "MIT",
537537
jwalk: 0.8.1, "MIT",
538538
keccak: 0.2.0, "Apache-2.0 OR MIT",
@@ -654,11 +654,11 @@ once_cell: 1.21.4, "Apache-2.0 OR MIT",
654654
once_cell_polyfill: 1.70.2, "Apache-2.0 OR MIT",
655655
opaque-debug: 0.3.1, "Apache-2.0 OR MIT",
656656
opendal: 0.55.0, "Apache-2.0",
657-
openssl: 0.10.77, "Apache-2.0",
657+
openssl: 0.10.76, "Apache-2.0",
658658
openssl-macros: 0.1.1, "Apache-2.0 OR MIT",
659659
openssl-probe: 0.2.1, "Apache-2.0 OR MIT",
660660
openssl-src: 300.6.0+3.6.2, "Apache-2.0 OR MIT",
661-
openssl-sys: 0.9.113, "MIT",
661+
openssl-sys: 0.9.112, "MIT",
662662
opentelemetry: 0.31.0, "Apache-2.0",
663663
opentelemetry-appender-tracing: 0.31.1, "Apache-2.0",
664664
opentelemetry-http: 0.31.0, "Apache-2.0",
@@ -708,7 +708,7 @@ pin-utils: 0.1.0, "Apache-2.0 OR MIT",
708708
pinned: 0.1.0, "Apache-2.0 OR MIT",
709709
pkcs1: 0.7.5, "Apache-2.0 OR MIT",
710710
pkcs8: 0.10.2, "Apache-2.0 OR MIT",
711-
pkg-config: 0.3.33, "Apache-2.0 OR MIT",
711+
pkg-config: 0.3.32, "Apache-2.0 OR MIT",
712712
plain: 0.2.3, "Apache-2.0 OR MIT",
713713
png: 0.17.16, "Apache-2.0 OR MIT",
714714
png: 0.18.1, "Apache-2.0 OR MIT",
@@ -761,13 +761,13 @@ r-efi: 5.3.0, "Apache-2.0 OR LGPL-2.1-or-later OR MIT",
761761
r-efi: 6.0.0, "Apache-2.0 OR LGPL-2.1-or-later OR MIT",
762762
radium: 0.7.0, "MIT",
763763
rand: 0.8.5, "Apache-2.0 OR MIT",
764-
rand: 0.9.4, "Apache-2.0 OR MIT",
764+
rand: 0.9.2, "Apache-2.0 OR MIT",
765765
rand: 0.10.1, "Apache-2.0 OR MIT",
766766
rand_chacha: 0.3.1, "Apache-2.0 OR MIT",
767767
rand_chacha: 0.9.0, "Apache-2.0 OR MIT",
768768
rand_core: 0.6.4, "Apache-2.0 OR MIT",
769769
rand_core: 0.9.5, "Apache-2.0 OR MIT",
770-
rand_core: 0.10.1, "Apache-2.0 OR MIT",
770+
rand_core: 0.10.0, "Apache-2.0 OR MIT",
771771
rand_xoshiro: 0.8.0, "Apache-2.0 OR MIT",
772772
rav1e: 0.8.1, "BSD-2-Clause",
773773
ravif: 0.13.0, "BSD-3-Clause",
@@ -826,7 +826,7 @@ rustls-pemfile: 2.2.0, "Apache-2.0 OR ISC OR MIT",
826826
rustls-pki-types: 1.14.0, "Apache-2.0 OR MIT",
827827
rustls-platform-verifier: 0.6.2, "Apache-2.0 OR MIT",
828828
rustls-platform-verifier-android: 0.1.1, "Apache-2.0 OR MIT",
829-
rustls-webpki: 0.103.11, "ISC",
829+
rustls-webpki: 0.103.10, "ISC",
830830
rustversion: 1.0.22, "Apache-2.0 OR MIT",
831831
rustybuzz: 0.20.1, "MIT",
832832
ryu: 1.0.23, "Apache-2.0 OR BSL-1.0",
@@ -907,7 +907,7 @@ sqlx-macros-core: 0.8.6, "Apache-2.0 OR MIT",
907907
sqlx-mysql: 0.8.6, "Apache-2.0 OR MIT",
908908
sqlx-postgres: 0.8.6, "Apache-2.0 OR MIT",
909909
sqlx-sqlite: 0.8.6, "Apache-2.0 OR MIT",
910-
sse-stream: 0.2.2, "Apache-2.0 OR MIT",
910+
sse-stream: 0.2.1, "Apache-2.0 OR MIT",
911911
stable_deref_trait: 1.2.1, "Apache-2.0 OR MIT",
912912
static-toml: 1.3.0, "MIT",
913913
static_assertions: 1.1.0, "Apache-2.0 OR MIT",
@@ -1063,19 +1063,19 @@ wasi: 0.11.1+wasi-snapshot-preview1, "Apache-2.0 OR Apache-2.0 WITH LLVM-excepti
10631063
wasip2: 1.0.2+wasi-0.2.9, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
10641064
wasip3: 0.4.0+wasi-0.3.0-rc-2026-01-06, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
10651065
wasite: 0.1.0, "Apache-2.0 OR BSL-1.0 OR MIT",
1066-
wasm-bindgen: 0.2.118, "Apache-2.0 OR MIT",
1067-
wasm-bindgen-futures: 0.4.68, "Apache-2.0 OR MIT",
1068-
wasm-bindgen-macro: 0.2.118, "Apache-2.0 OR MIT",
1069-
wasm-bindgen-macro-support: 0.2.118, "Apache-2.0 OR MIT",
1070-
wasm-bindgen-shared: 0.2.118, "Apache-2.0 OR MIT",
1066+
wasm-bindgen: 0.2.117, "Apache-2.0 OR MIT",
1067+
wasm-bindgen-futures: 0.4.67, "Apache-2.0 OR MIT",
1068+
wasm-bindgen-macro: 0.2.117, "Apache-2.0 OR MIT",
1069+
wasm-bindgen-macro-support: 0.2.117, "Apache-2.0 OR MIT",
1070+
wasm-bindgen-shared: 0.2.117, "Apache-2.0 OR MIT",
10711071
wasm-encoder: 0.244.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
10721072
wasm-metadata: 0.244.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
10731073
wasm-streams: 0.4.2, "Apache-2.0 OR MIT",
10741074
wasm-streams: 0.5.0, "Apache-2.0 OR MIT",
10751075
wasm_dep_analyzer: 0.3.0, "MIT",
10761076
wasmparser: 0.244.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
10771077
wasmtimer: 0.4.3, "MIT",
1078-
web-sys: 0.3.95, "Apache-2.0 OR MIT",
1078+
web-sys: 0.3.94, "Apache-2.0 OR MIT",
10791079
web-time: 1.1.0, "Apache-2.0 OR MIT",
10801080
webpki-root-certs: 1.0.6, "CDLA-Permissive-2.0",
10811081
webpki-roots: 0.26.11, "CDLA-Permissive-2.0",

core/binary_protocol/src/consensus/iobuf.rs

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,14 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::mem::MaybeUninit;
1819
use std::ops::{Deref, DerefMut, RangeBounds};
1920
use std::ptr::NonNull;
2021
use std::slice;
2122
use std::sync::atomic::{AtomicUsize, Ordering, fence};
2223

2324
use aligned_vec::{AVec, ConstAlign};
24-
use compio_buf::IoBuf;
25+
use compio_buf::{IoBuf, IoBufMut, ReserveError, ReserveExactError, SetLen};
2526

2627
#[derive(Debug, Clone)]
2728
pub struct Owned<const ALIGN: usize = 4096> {
@@ -91,6 +92,16 @@ impl<const ALIGN: usize> Owned<ALIGN> {
9192
Self { inner }
9293
}
9394

95+
/// Allocate a buffer with `capacity` reserved bytes and `len == 0`.
96+
/// Intended for use with compio's [`IoBufMut`] read path: the caller
97+
/// hands the `Owned` to `read_exact`, which fills it in-place and
98+
/// advances the length via [`SetLen::set_len`].
99+
pub fn with_capacity(capacity: usize) -> Self {
100+
Self {
101+
inner: AVec::with_capacity(ALIGN, capacity),
102+
}
103+
}
104+
94105
pub fn copy_from_slice(data: &[u8]) -> Self {
95106
let mut inner: AVec<u8, ConstAlign<ALIGN>> = AVec::new(ALIGN);
96107
inner.extend_from_slice(data);
@@ -133,6 +144,54 @@ impl<const ALIGN: usize> Owned<ALIGN> {
133144
}
134145
}
135146

147+
// SAFETY contract for the compio IO buf impls below:
148+
// * `as_init` returns the initialized prefix (len bytes).
149+
// * `as_uninit` returns the full allocation up to capacity; compio writes
150+
// bytes into it during reads and THEN calls `SetLen::set_len` to advance
151+
// the initialized region, which is exactly what `AVec::set_len` expects.
152+
// * `buf_capacity` is the AVec capacity, not the initialized length.
153+
impl<const ALIGN: usize> IoBuf for Owned<ALIGN> {
154+
fn as_init(&self) -> &[u8] {
155+
self.inner.as_slice()
156+
}
157+
}
158+
159+
impl<const ALIGN: usize> IoBufMut for Owned<ALIGN> {
160+
fn as_uninit(&mut self) -> &mut [MaybeUninit<u8>] {
161+
let cap = self.inner.capacity();
162+
let ptr = self.inner.as_mut_ptr().cast::<MaybeUninit<u8>>();
163+
unsafe { slice::from_raw_parts_mut(ptr, cap) }
164+
}
165+
166+
fn buf_capacity(&mut self) -> usize {
167+
self.inner.capacity()
168+
}
169+
170+
fn reserve(&mut self, len: usize) -> Result<(), ReserveError> {
171+
// `reserve(additional)` contract: capacity >= current_len + additional
172+
// after the call. `AVec::reserve` matches this contract exactly.
173+
if self.inner.capacity() - self.inner.len() >= len {
174+
return Ok(());
175+
}
176+
self.inner.reserve(len);
177+
Ok(())
178+
}
179+
180+
fn reserve_exact(&mut self, len: usize) -> Result<(), ReserveExactError> {
181+
if self.inner.capacity() - self.inner.len() >= len {
182+
return Ok(());
183+
}
184+
self.inner.reserve_exact(len);
185+
Ok(())
186+
}
187+
}
188+
189+
impl<const ALIGN: usize> SetLen for Owned<ALIGN> {
190+
unsafe fn set_len(&mut self, len: usize) {
191+
unsafe { self.inner.set_len(len) };
192+
}
193+
}
194+
136195
impl<const ALIGN: usize> Prefix<ALIGN> {
137196
pub fn copy_from_slice(src: &[u8]) -> Self {
138197
Self {

core/binary_protocol/src/consensus/message.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::consensus::{
2424
use smallvec::SmallVec;
2525
use std::{marker::PhantomData, mem::size_of};
2626

27-
const MESSAGE_ALIGN: usize = 4096;
27+
pub const MESSAGE_ALIGN: usize = 4096;
2828

2929
pub trait MessageBacking<H>
3030
where

core/binary_protocol/src/consensus/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ pub use header::{
6666
PrepareOkHeader, ReplyHeader, RequestHeader, StartViewChangeHeader, StartViewHeader,
6767
};
6868
pub use message::{
69-
ConsensusMessage, FragmentedBacking, Message, MessageBacking, MessageBag, MutableBacking,
70-
RequestBacking, RequestBackingKind, ResponseBacking, ResponseBackingKind,
69+
ConsensusMessage, FragmentedBacking, MESSAGE_ALIGN, Message, MessageBacking, MessageBag,
70+
MutableBacking, RequestBacking, RequestBackingKind, ResponseBacking, ResponseBackingKind,
7171
};
7272
pub use operation::Operation;

core/configs/src/server_config/cluster.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,28 @@ pub struct NodeConfig {
3636
pub struct CurrentNodeConfig {
3737
pub name: String,
3838
pub ip: String,
39+
/// Explicit numeric replica ID for VSR consensus (0-based).
40+
///
41+
/// Required when `cluster.enabled` is true. `None` is a configuration
42+
/// error caught by [`ClusterConfig::validate`]; a silent default of 0
43+
/// would wedge the cluster into a permanent view change as soon as a
44+
/// second replica joined.
45+
pub replica_id: Option<u8>,
46+
/// Bind ports for the current node. Symmetric with [`OtherNodeConfig`]
47+
/// so the consensus listener can pick a dedicated `tcp_replica` port
48+
/// without borrowing it from the client-facing `tcp.address`.
49+
#[serde(default)]
50+
pub ports: TransportPorts,
3951
}
4052

4153
#[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)]
4254
pub struct OtherNodeConfig {
4355
pub name: String,
4456
pub ip: String,
57+
/// Explicit numeric replica ID for VSR consensus (0-based).
58+
///
59+
/// Required when `cluster.enabled` is true. See [`CurrentNodeConfig::replica_id`].
60+
pub replica_id: Option<u8>,
4561
pub ports: TransportPorts,
4662
}
4763

@@ -51,4 +67,6 @@ pub struct TransportPorts {
5167
pub quic: Option<u16>,
5268
pub http: Option<u16>,
5369
pub websocket: Option<u16>,
70+
/// Dedicated port for replica-to-replica consensus traffic.
71+
pub tcp_replica: Option<u16>,
5472
}

core/configs/src/server_config/defaults.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,8 @@ impl Default for NodeConfig {
583583
current: CurrentNodeConfig {
584584
name: SERVER_CONFIG.cluster.node.current.name.parse().unwrap(),
585585
ip: SERVER_CONFIG.cluster.node.current.ip.parse().unwrap(),
586+
replica_id: Some(SERVER_CONFIG.cluster.node.current.replica_id as u8),
587+
ports: TransportPorts::default(),
586588
},
587589
others: SERVER_CONFIG
588590
.cluster
@@ -592,11 +594,13 @@ impl Default for NodeConfig {
592594
.map(|other| OtherNodeConfig {
593595
name: other.name.parse().unwrap(),
594596
ip: other.ip.parse().unwrap(),
597+
replica_id: Some(other.replica_id as u8),
595598
ports: TransportPorts {
596599
tcp: Some(other.ports.tcp as u16),
597600
quic: Some(other.ports.quic as u16),
598601
http: Some(other.ports.http as u16),
599602
websocket: Some(other.ports.websocket as u16),
603+
tcp_replica: Some(other.ports.tcp_replica as u16),
600604
},
601605
})
602606
.collect(),

0 commit comments

Comments
 (0)