Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ The supervisor enforces two independent timeouts on every read from a peer:
- **Liveness (per-recv, `LIVENESS_TIMEOUT = 10s`).** Each `recv` waits at most this long. Any received frame — including a `Heartbeat` — resets the clock. A peer emitting heartbeats every 2s during a long-running hook is therefore *not* declared dead, no matter how long that hook takes.
- **Wall-clock (per-phase budget) + `WIRE_SLACK`.** Regardless of heartbeats, the supervisor aborts once the configured budget elapses: `drain_grace` for the drain phase, `deadline` (overall) for everything else. Each cap is extended by `WIRE_SLACK` (1 s) on the supervisor's read side. The peer's clock for a phase starts when it deserializes the request (`T_s + δ_net`), and a reply produced after running to that budget still has to traverse `δ_net + δ_serialize` on the way back; without the slack, the supervisor would abort a frame already on the wire. The slack is conservative — Unix-socket round-trip + frame serialization is well under 1 s — and is not a tuning knob.

The per-recv timeout is armed with `SO_RCVTIMEO` (`UnixStream::set_read_timeout`) before each read. macOS and the BSDs reject `setsockopt` with `EINVAL` for *any* option once a socket is fully shut down (`SS_CANTRCVMORE | SS_CANTSENDMORE`) — exactly the state a peer that closed its end leaves behind — whereas Linux accepts it and the following read simply returns EOF. The supervisor therefore arms the timeout through a helper (`arm_recv_timeout`) that, on `EINVAL`, confirms via a non-blocking peek that the read cannot block (the peer is gone: a buffered frame is delivered, otherwise EOF) and proceeds to read without the timeout. This keeps the read path identical across platforms — crucially, a peer that sends a complete frame (e.g. `Ready`) and then exits still has that frame drained from the socket buffer rather than being dropped as a spurious abort.

The incumbent feeds this design by spawning a background heartbeat thread for the duration of `Drainable::drain` and `Drainable::seal`. The thread writes a `Heartbeat` frame every `HEARTBEAT_INTERVAL` (2s). When the hook returns, an RAII guard signals the thread to stop and joins it before the main thread sends the next protocol frame — so there is never concurrent writing on the control socket. The 5× safety margin (2s heartbeat vs 10s liveness) absorbs scheduler hiccups.

The seal-wait loop additionally skips `SealProgress` frames so a consumer with a multi-shard seal can emit explicit progress (`shards_sealed`, `shards_total`, `last_revision`) on top of the implicit heartbeat liveness signal. The payload is consumed silently today; a future version may surface it in metrics.
Expand Down
20 changes: 11 additions & 9 deletions crates/handoff-tests/tests/crash_matrix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ fn supervisor_crashes_after_spawn_successor() {
exit.assert_crashed_at(&fx, points::S_AFTER_SPAWN_SUCCESSOR);

assert!(primitive.alive(), "O survives S crash post-spawn");
// N was spawned and wrote its pid marker before attempting handshake.
// N was spawned and writes its pid marker before attempting handshake.
// N starts up asynchronously to S's crash, so wait briefly for the
// marker rather than racing N's first action against S's exit.
assert!(
read_marker_value(&fx.markers, "successor-pid").is_some(),
fx.wait_marker("successor-pid", Duration::from_secs(3)),
"N must have written successor-pid before handshake"
);
// handshake either never completed (write to dead socket) or N had not
Expand Down Expand Up @@ -186,7 +188,7 @@ fn supervisor_crashes_after_prepare_sent() {
assert!(primitive.alive(), "incumbent must survive S crash pre-seal");
assert!(fx.marker_exists("drain-called"));
assert!(
fx.marker_exists("resume-called"),
fx.wait_marker("resume-called", Duration::from_secs(3)),
"drained-without-seal cleanup should call resume_after_abort"
);

Expand Down Expand Up @@ -221,7 +223,7 @@ fn supervisor_crashes_after_seal_complete_recv() {
assert!(primitive.alive(), "O must survive S crash post-seal");
assert!(fx.marker_exists("seal-called"));
assert!(
fx.marker_exists("resume-called"),
fx.wait_marker("resume-called", Duration::from_secs(3)),
"sealed-then-EOF should trigger resume_after_abort"
);

Expand Down Expand Up @@ -352,7 +354,7 @@ fn supervisor_crashes_after_begin_sent_converges() {
FlockState::Held { pid, alive: true } if pid as u32 == o_pid => {
// Outcome B: O recovered.
assert!(
fx.marker_exists("resume-called"),
fx.wait_marker("resume-called", Duration::from_secs(3)),
"O's recovery path must have run resume_after_abort"
);
// Journal could be either Sealing (S journaled it before
Expand Down Expand Up @@ -548,7 +550,7 @@ fn successor_crashes_after_handshake() {

assert!(primitive.alive(), "O survives N crash post-handshake");
assert!(
fx.marker_exists("resume-called"),
fx.wait_marker("resume-called", Duration::from_secs(3)),
"O must be told to resume after N is killed"
);
match fx.flock_state() {
Expand Down Expand Up @@ -579,7 +581,7 @@ fn successor_crashes_after_begin() {
assert!(!fx.marker_exists("successor-ready"));

assert!(primitive.alive(), "O survives N crash after Begin");
assert!(fx.marker_exists("resume-called"));
assert!(fx.wait_marker("resume-called", Duration::from_secs(3)));
match fx.flock_state() {
FlockState::Held { pid, alive: true } => assert_eq!(pid as u32, o_pid),
other => panic!("flock not held by live O after N crash + Resume: {other:?}"),
Expand Down Expand Up @@ -612,7 +614,7 @@ fn successor_crashes_before_ready() {

// O is still alive; resume marker fired.
assert!(primitive.alive(), "O should survive N crash");
assert!(fx.marker_exists("resume-called"));
assert!(fx.wait_marker("resume-called", Duration::from_secs(3)));

// O holds the flock; journal cleared (supervisor returned cleanly).
match fx.flock_state() {
Expand Down Expand Up @@ -645,7 +647,7 @@ fn incumbent_seal_failure_keeps_o_alive() {
assert!(primitive.alive(), "O survives seal-failure abort");
assert!(fx.marker_exists("seal-called"));
assert!(
fx.marker_exists("resume-called"),
fx.wait_marker("resume-called", Duration::from_secs(3)),
"seal_failure must trigger O's resume_after_abort"
);

Expand Down
54 changes: 51 additions & 3 deletions crates/handoff/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ impl Supervisor {
}
let remaining = seal_read_deadline - now;
let recv_timeout = LIVENESS_TIMEOUT.min(remaining).max(MIN_READ_TIMEOUT);
o_stream.set_read_timeout(Some(recv_timeout))?;
arm_recv_timeout(&o_stream, recv_timeout)?;
match read_message(&mut o_stream) {
Ok((_, Message::SealProgress { .. })) => continue,
Ok((_, Message::Heartbeat { .. })) => continue,
Expand Down Expand Up @@ -652,7 +652,7 @@ impl Supervisor {
// Bound the initial Hello so a peer that accepted the connection but
// hangs before writing can't block `perform_handoff` indefinitely.
// Cleared after the read regardless of outcome.
stream.set_read_timeout(Some(HELLO_READ_TIMEOUT))?;
arm_recv_timeout(stream, HELLO_READ_TIMEOUT)?;
let read_result = read_message(stream);
let _ = stream.set_read_timeout(None);
let (_v, peer_hello) = match read_result {
Expand Down Expand Up @@ -857,7 +857,7 @@ where
// successful frame — heartbeat or otherwise — resets this on
// the next iteration.
let recv_timeout = LIVENESS_TIMEOUT.min(remaining).max(MIN_READ_TIMEOUT);
stream.set_read_timeout(Some(recv_timeout))?;
arm_recv_timeout(stream, recv_timeout)?;
match read_message(stream) {
Ok((_, Message::Heartbeat { .. })) => continue,
Ok((_, Message::SealProgress { .. })) => continue,
Expand All @@ -881,6 +881,54 @@ where
}
}

/// Arm the per-recv read timeout on `stream`, tolerating the macOS/BSD quirk
/// where the peer has closed mid-handshake.
///
/// We bound each socket read with `SO_RCVTIMEO` (via `set_read_timeout`) so a
/// silent peer can't wedge the handoff. On Linux this always succeeds. On
/// macOS and the BSDs, `setsockopt` returns `EINVAL` for *any* option once the
/// socket is fully shut down (`SS_CANTRCVMORE | SS_CANTSENDMORE`) — which is
/// exactly what a peer that closed its end leaves behind. That `EINVAL` is not
/// a real error: the peer is gone, so the read can no longer block. We must
/// still perform the read, because a peer that sent a complete frame and then
/// exited leaves that frame buffered (observed: a full `Ready` sitting in the
/// receive buffer behind a shut-down socket) — dropping it would abort a
/// handoff the successor actually completed.
///
/// So on `EINVAL` we confirm the read cannot block (a non-blocking peek shows
/// EOF or buffered data) and return `Ok(())` *without* an armed timeout; the
/// caller's `read_message` then drains any buffered frame and otherwise sees
/// EOF, taking the same path it would on Linux. If the peek shows the socket
/// is still open and empty (a genuine `EINVAL` we can't explain away), we
/// surface the error rather than risk an unbounded blocking read.
fn arm_recv_timeout(stream: &UnixStream, recv_timeout: Duration) -> Result<()> {
match stream.set_read_timeout(Some(recv_timeout)) {
Ok(()) => Ok(()),
Err(e) if e.raw_os_error() == Some(libc::EINVAL) && !read_would_block(stream) => Ok(()),
Err(e) => Err(e.into()),
}
}

/// Non-blocking peek: `true` only if a read right now would block (socket open,
/// no buffered data, not at EOF). A closed peer (EOF, returns 0) or buffered
/// bytes (returns > 0) both mean a blocking read makes immediate progress, so
/// we report `false`. Any other peek error also means the read won't block (it
/// will surface that error promptly), so likewise `false`.
fn read_would_block(stream: &UnixStream) -> bool {
let mut byte = [0u8; 1];
// SAFETY: `recv` into a valid 1-byte buffer on a borrowed-but-live fd.
// MSG_PEEK leaves any bytes queued; MSG_DONTWAIT keeps this non-blocking.
let ret = unsafe {
libc::recv(
stream.as_raw_fd(),
byte.as_mut_ptr() as *mut libc::c_void,
1,
libc::MSG_PEEK | libc::MSG_DONTWAIT,
)
};
ret < 0 && std::io::Error::last_os_error().kind() == ErrorKind::WouldBlock
}

fn remaining_until(deadline: Instant) -> Duration {
deadline
.checked_duration_since(Instant::now())
Expand Down
Loading