
feat(message_bus): implement async fire-and-forget transport for VSR consensus#3134

Open
hubcio wants to merge 1 commit into master from bus

Conversation


@hubcio hubcio commented Apr 15, 2026

The previous cache/connection.rs was a single-stream blob that
held a RefCell<TcpStream> across .await, awaited kernel write
completion in the send path, and serialized fan-out through one
shared lock. Under VSR pipelining a slow peer stalled sends to
every other peer in the same dispatch round, killing parallel
quorum collection. Reentrant sends to the same peer would also
panic on BorrowMutError.

Rebuilt the crate around a per-connection writer task that
drains a bounded async-channel mpsc and submits batches via a
single write_vectored_all syscall, with the dup(2)-split read
half handled by an independent reader task. send_to_* is now
sync try_send under the async fn shell: zero awaits, returns
SendError::Backpressure on full so VSR can recover via WAL
retransmission or view-change timeouts.

The lifecycle module owns a root Shutdown / ShutdownToken plus
a ConnectionRegistry that tracks the per-peer Sender and
both task handles for graceful drain. The directional rule
(lower id dials, higher id accepts) eliminates the dialed-
both-ways race without a tiebreaker. TCP_NODELAY is set on
every socket, and Message::into_frozen() removes the per-send
memcpy. Three new integration tests prove the architectural
properties: backpressure, vectored batching, head-of-line
freedom under a saturated peer.

track_background early-returns when the shutdown token is
already triggered: shutdown drains the background-task vec
exactly once, so a handle pushed after the drain would be
leaked. Tasks spawned post-shutdown have already observed the
cancellation by the time they reach the tracker, so dropping
the handle is the right thing.

The compio stream framing helpers live in message_bus::framing
(formerly message_bus::codec, renamed to avoid the collision
with binary_protocol::codec, which is the sans-IO WireEncode /
WireDecode trait module). MESSAGE_ALIGN is no longer duplicated:
it is promoted to pub in iggy_binary_protocol::consensus and the
trivial message_to_frozen wrapper is dropped in favour of calling
Message::into_frozen() at the writer-task call sites directly.

Consensus, metadata, partitions, shard, and simulator are
updated to the new MessageBus shape; core/server is left
untouched as legacy code. Cluster config gains the per-node
fields the bus needs to identify replicas (replica_id,
tcp_replica port).

@hubcio hubcio changed the title feat(message_bus): async fire-and-forget transport for VSR consensus feat(message_bus): implement async fire-and-forget transport for VSR consensus Apr 15, 2026

codecov bot commented Apr 15, 2026

Codecov Report

❌ Patch coverage is 82.54848% with 189 lines in your changes missing coverage. Please review.
✅ Project coverage is 69.98%. Comparing base (f5350d9) to head (72b928f).

Files with missing lines Patch % Lines
core/message_bus/src/replica_listener.rs 79.67% 19 Missing and 6 partials ⚠️
core/message_bus/src/replica_io.rs 63.33% 21 Missing and 1 partial ⚠️
core/message_bus/src/client_listener.rs 80.85% 13 Missing and 5 partials ⚠️
core/message_bus/src/connector.rs 86.56% 13 Missing and 5 partials ⚠️
core/binary_protocol/src/consensus/iobuf.rs 57.57% 14 Missing ⚠️
core/message_bus/src/framing.rs 82.71% 11 Missing and 3 partials ⚠️
core/message_bus/src/lib.rs 87.27% 9 Missing and 5 partials ⚠️
...e/message_bus/src/lifecycle/connection_registry.rs 91.61% 13 Missing and 1 partial ⚠️
core/shard/src/router.rs 0.00% 10 Missing ⚠️
core/consensus/src/plane_helpers.rs 74.19% 8 Missing ⚠️
... and 8 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #3134      +/-   ##
============================================
- Coverage     72.76%   69.98%   -2.78%     
  Complexity      943      943              
============================================
  Files          1117     1125       +8     
  Lines         96368    97116     +748     
  Branches      73544    74308     +764     
============================================
- Hits          70119    67964    -2155     
- Misses        23702    26762    +3060     
+ Partials       2547     2390     -157     
Components Coverage Δ
Rust Core 73.90% <82.54%> (+0.40%) ⬆️
Java SDK 62.30% <ø> (ø)
C# SDK 15.34% <ø> (-54.09%) ⬇️
Python SDK 81.43% <ø> (ø)
Node SDK 91.40% <ø> (-0.13%) ⬇️
Go SDK 39.41% <ø> (ø)
Files with missing lines Coverage Δ
core/binary_protocol/src/consensus/message.rs 49.45% <ø> (ø)
core/message_bus/src/socket_opts.rs 100.00% <100.00%> (ø)
core/shard/src/lib.rs 86.90% <100.00%> (-0.10%) ⬇️
core/simulator/src/lib.rs 83.87% <100.00%> (ø)
core/message_bus/src/lifecycle/shutdown.rs 98.55% <98.55%> (ø)
core/consensus/src/impls.rs 66.99% <33.33%> (-0.36%) ⬇️
core/partitions/src/iggy_partitions.rs 31.57% <50.00%> (-0.11%) ⬇️
core/configs/src/server_config/defaults.rs 0.00% <0.00%> (ø)
core/message_bus/src/writer_task.rs 84.84% <84.84%> (ø)
core/simulator/src/bus.rs 84.72% <80.00%> (+11.89%) ⬆️
... and 12 more

... and 99 files with indirect coverage changes


The previous cache/connection.rs was a single-stream blob that
held a RefCell<TcpStream> across .await, awaited kernel write
completion in the send path, and serialized fan-out through one
shared lock. Under VSR pipelining a slow peer stalled sends to
every other peer in the same dispatch round, killing parallel
quorum collection. Reentrant sends to the same peer would also
panic on BorrowMutError.

Rebuilt the crate around a per-connection writer task that
drains a bounded mpsc and submits batches via a single
write_vectored_all syscall, with an independent read half
handled by a dedicated reader task. send_to_* is now sync
try_send under the async fn shell: zero awaits, returns
SendError::Backpressure on full so VSR can recover via WAL
retransmission or view-change timeouts.

The lifecycle module owns a root Shutdown / ShutdownToken plus
a ConnectionRegistry<K> that tracks the per-peer Sender and
both task handles for graceful drain. The directional rule
(lower id dials, higher id accepts) eliminates the dialed-
both-ways race without a tiebreaker. Message::into_frozen()
removes the per-send memcpy.

Socket split and lifecycle plumbing (C1, C2, Cx2)

Swapped the dup(2)-based socket_split module for compio's
native TcpStream::into_split(), deleting ~50 lines of unsafe,
three dup_stream copies, and an entire module file. Reader and
writer tasks now take OwnedReadHalf / OwnedWriteHalf directly.
The writer task, on write_vectored_all error, removes its own
registry entry so a stale Sender cannot accept further sends;
on the graceful shutdown path it defers removal to the root drain
so DrainOutcome.clean counts both halves. The reader self-remove
path now goes through a new ConnectionRegistry::close_peer helper
that orders "close sender, await writer, drop reader handle"
to prevent a mid-writev cancellation from landing a truncated frame.
All listeners + the outbound connector apply TCP SO_KEEPALIVE
via socket2 (TCP_KEEPIDLE=10s, TCP_KEEPINTVL=5s, TCP_KEEPCNT=3)
so a silently dead peer is detected within ~25s, well inside
the VSR view-change window.

Shutdown loop-drain (C3, N5)

track_background is back to an unconditional push. shutdown
now loop-drains the background-task vec until empty, so a
task pushed mid-shutdown (e.g. a reader that observed the
token and registered its own cleanup) is still awaited.
DrainOutcome gains background_clean / background_force fields
to make the bg count observable.

Cluster config wedge fixes (C5, C6)

replica_id is now Option<u8> on both CurrentNodeConfig and
OtherNodeConfig. Startup-time ClusterConfig::validate rejects
missing ids, duplicate ids, and ids >= total replica count
so a misconfigured cluster cannot wedge into a permanent view
change on boot. CurrentNodeConfig gained the TransportPorts
field that was already present on OtherNodeConfig; the bus
now has a place to read its own tcp_replica bind port from.

Per-connection body-byte limiter (C7)

A new FrameRateLimiter (token bucket, 32 MiB/s sustained, 256
MiB burst by default) gates the body-read allocation in
framing::read_message. A peer claiming a valid 64 MiB body
goes through once then has to wait seconds before the next
burst; the hard ceiling on MAX_MESSAGE_SIZE stays as-is.

Zero-copy framing (W1, W3)

framing::read_message was allocating three Vecs and copying
the header + body twice to reassemble the final Owned. It is
now a single Owned<MESSAGE_ALIGN>::with_capacity(HEADER_SIZE)
handed straight to compio's read_exact, grown in-place via
reserve_exact after the size field is parsed, then a second
read_exact into owned.slice(HEADER_SIZE..total_size). The
backing AVec's buffer is reused across both reads: one alloc
(with at most one in-place realloc) and zero memcpys of the
data. iggy_binary_protocol's Owned<ALIGN> now implements
IoBuf / IoBufMut / SetLen so compio can drive the read
directly. A compile-time static assert guards the hardcoded
offset_of!(GenericHeader, size) == 48 the reader relies on.

Router blocking-send fix (C8)

core/shard/src/router.rs was calling the blocking
crossfire::MTx::send from the compio reactor, which could park
the io_uring thread and stall every other connection on the
shard when an inbox filled up. Both dispatch and dispatch_request
now use try_send and log-drop on Full / Disconnected; consensus
recovers via WAL retransmit or view change.

Misc (Cx1, W4, W6, W7, W8, W9, W10, N3, N4)

- connector.rs's dead is_err branch on insert() is replaced
  with an expect() documenting the directional-rule invariant.
- SendError::Io is removed (no constructor anywhere).
- replica_listener doc adds an explicit trusted-network-only
  warning for the handshake (no mTLS / shared secret).
- writer_task MAX_BATCH doc drops the "tunable per-deployment"
  claim.
- Duplicate client id insert is now unreachable!() with a
  helpful diagnostic.
- chain-replication failure paths use structured
  tracing::warn!(error = ?e, ...) so oncall can grep the
  SendError variant.
- MessageBus trait doc advertises the no-yield property of
  send_to_* in the production impl.
- writer_task's license-header "czpressed" typo fixed.
- transports/mod.rs 40-line sketch compressed to a tracking
  reference for IGGY-112.

Deferred with TODO(hubcio)

- Chain-replicate-before-journal-append ordering (C4) is
  flagged at both metadata and partitions sites; an ordering
  decision issue is needed, not a patch in this PR.
- Fan-out deep_copy (W2) in plane_helpers and the three shard
  send sites is flagged; requires a trait-level change to
  MessageBus::send_to_* taking Frozen<MESSAGE_ALIGN> so the
  primary freezes once and fan-out is refcount bumps.

Scope

Consensus, metadata, partitions, shard, and simulator are
updated to the new MessageBus shape; core/server is left
untouched as legacy code. MESSAGE_ALIGN promoted to pub in
iggy_binary_protocol::consensus is consistent with the
existing Message<GenericHeader> leak through the MessageBus
trait and is accepted for this PR (N8).

Follow-up issues to file:
- W2 MessageBus::send_to_*(.., Frozen<MESSAGE_ALIGN>) trait-
  level fan-out fix (benchmark-driven)
- W5 SendError::Backpressure(Message) carrying payload back
  to the caller (benchmark-driven)
- W11 SimOutbox bounded mode (simulator coverage)
- N6 ConnectionRegistry<u8> -> Vec<Option<Entry>> micro-opt
- N7 drain parallel via FuturesUnordered
- C4 VSR chain-replicate-vs-append ordering decision
- IGGY-112 Transport trait family + mTLS handshake
