diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index d9dc1ab..8ad67a7 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -273,6 +273,8 @@ The supervisor enforces two independent timeouts on every read from a peer: - **Liveness (per-recv, `LIVENESS_TIMEOUT = 10s`).** Each `recv` waits at most this long. Any received frame — including a `Heartbeat` — resets the clock. A peer emitting heartbeats every 2s during a long-running hook is therefore *not* declared dead, no matter how long that hook takes. - **Wall-clock (per-phase budget) + `WIRE_SLACK`.** Regardless of heartbeats, the supervisor aborts once the configured budget elapses: `drain_grace` for the drain phase, `deadline` (overall) for everything else. Each cap is extended by `WIRE_SLACK` (1 s) on the supervisor's read side. The peer's clock for a phase starts when it deserializes the request (`T_s + δ_net`), and a reply produced after running to that budget still has to traverse `δ_net + δ_serialize` on the way back; without the slack, the supervisor would abort a frame already on the wire. The slack is conservative — Unix-socket round-trip + frame serialization is well under 1 s — and is not a tuning knob. +The per-recv timeout is armed with `SO_RCVTIMEO` (`UnixStream::set_read_timeout`) before each read. macOS and the BSDs reject `setsockopt` with `EINVAL` for *any* option once a socket is fully shut down (`SS_CANTRCVMORE | SS_CANTSENDMORE`) — exactly the state a peer that closed its end leaves behind — whereas Linux accepts it and the following read simply returns EOF. The supervisor therefore arms the timeout through a helper (`arm_recv_timeout`) that, on `EINVAL`, confirms via a non-blocking peek that the read cannot block (the peer is gone: a buffered frame is delivered, otherwise EOF) and proceeds to read without the timeout. This keeps the read path identical across platforms — crucially, a peer that sends a complete frame (e.g. `Ready`) and then exits still has that frame drained from the socket buffer rather than being dropped as a spurious abort. + The incumbent feeds this design by spawning a background heartbeat thread for the duration of `Drainable::drain` and `Drainable::seal`. The thread writes a `Heartbeat` frame every `HEARTBEAT_INTERVAL` (2s). When the hook returns, an RAII guard signals the thread to stop and joins it before the main thread sends the next protocol frame — so there is never concurrent writing on the control socket. The 5× safety margin (2s heartbeat vs 10s liveness) absorbs scheduler hiccups. The seal-wait loop additionally skips `SealProgress` frames so a consumer with a multi-shard seal can emit explicit progress (`shards_sealed`, `shards_total`, `last_revision`) on top of the implicit heartbeat liveness signal. The payload is consumed silently today; a future version may surface it in metrics. diff --git a/crates/handoff-tests/tests/crash_matrix.rs b/crates/handoff-tests/tests/crash_matrix.rs index c667534..04839f6 100644 --- a/crates/handoff-tests/tests/crash_matrix.rs +++ b/crates/handoff-tests/tests/crash_matrix.rs @@ -74,9 +74,11 @@ fn supervisor_crashes_after_spawn_successor() { exit.assert_crashed_at(&fx, points::S_AFTER_SPAWN_SUCCESSOR); assert!(primitive.alive(), "O survives S crash post-spawn"); - // N was spawned and wrote its pid marker before attempting handshake. + // N was spawned and writes its pid marker before attempting handshake. + // N starts up asynchronously to S's crash, so wait briefly for the + // marker rather than racing N's first action against S's exit. assert!( - read_marker_value(&fx.markers, "successor-pid").is_some(), + fx.wait_marker("successor-pid", Duration::from_secs(3)), "N must have written successor-pid before handshake" ); // handshake either never completed (write to dead socket) or N had not @@ -186,7 +188,7 @@ fn supervisor_crashes_after_prepare_sent() { assert!(primitive.alive(), "incumbent must survive S crash pre-seal"); assert!(fx.marker_exists("drain-called")); assert!( - fx.marker_exists("resume-called"), + fx.wait_marker("resume-called", Duration::from_secs(3)), "drained-without-seal cleanup should call resume_after_abort" ); @@ -221,7 +223,7 @@ fn supervisor_crashes_after_seal_complete_recv() { assert!(primitive.alive(), "O must survive S crash post-seal"); assert!(fx.marker_exists("seal-called")); assert!( - fx.marker_exists("resume-called"), + fx.wait_marker("resume-called", Duration::from_secs(3)), "sealed-then-EOF should trigger resume_after_abort" ); @@ -352,7 +354,7 @@ fn supervisor_crashes_after_begin_sent_converges() { FlockState::Held { pid, alive: true } if pid as u32 == o_pid => { // Outcome B: O recovered. assert!( - fx.marker_exists("resume-called"), + fx.wait_marker("resume-called", Duration::from_secs(3)), "O's recovery path must have run resume_after_abort" ); // Journal could be either Sealing (S journaled it before @@ -548,7 +550,7 @@ fn successor_crashes_after_handshake() { assert!(primitive.alive(), "O survives N crash post-handshake"); assert!( - fx.marker_exists("resume-called"), + fx.wait_marker("resume-called", Duration::from_secs(3)), "O must be told to resume after N is killed" ); match fx.flock_state() { @@ -579,7 +581,7 @@ fn successor_crashes_after_begin() { assert!(!fx.marker_exists("successor-ready")); assert!(primitive.alive(), "O survives N crash after Begin"); - assert!(fx.marker_exists("resume-called")); + assert!(fx.wait_marker("resume-called", Duration::from_secs(3))); match fx.flock_state() { FlockState::Held { pid, alive: true } => assert_eq!(pid as u32, o_pid), other => panic!("flock not held by live O after N crash + Resume: {other:?}"), @@ -612,7 +614,7 @@ fn successor_crashes_before_ready() { // O is still alive; resume marker fired. assert!(primitive.alive(), "O should survive N crash"); - assert!(fx.marker_exists("resume-called")); + assert!(fx.wait_marker("resume-called", Duration::from_secs(3))); // O holds the flock; journal cleared (supervisor returned cleanly). match fx.flock_state() { @@ -645,7 +647,7 @@ fn incumbent_seal_failure_keeps_o_alive() { assert!(primitive.alive(), "O survives seal-failure abort"); assert!(fx.marker_exists("seal-called")); assert!( - fx.marker_exists("resume-called"), + fx.wait_marker("resume-called", Duration::from_secs(3)), "seal_failure must trigger O's resume_after_abort" ); diff --git a/crates/handoff/src/supervisor.rs b/crates/handoff/src/supervisor.rs index 23343ca..4cb3f4e 100644 --- a/crates/handoff/src/supervisor.rs +++ b/crates/handoff/src/supervisor.rs @@ -371,7 +371,7 @@ impl Supervisor { } let remaining = seal_read_deadline - now; let recv_timeout = LIVENESS_TIMEOUT.min(remaining).max(MIN_READ_TIMEOUT); - o_stream.set_read_timeout(Some(recv_timeout))?; + arm_recv_timeout(&o_stream, recv_timeout)?; match read_message(&mut o_stream) { Ok((_, Message::SealProgress { .. })) => continue, Ok((_, Message::Heartbeat { .. })) => continue, @@ -652,7 +652,7 @@ impl Supervisor { // Bound the initial Hello so a peer that accepted the connection but // hangs before writing can't block `perform_handoff` indefinitely. // Cleared after the read regardless of outcome. - stream.set_read_timeout(Some(HELLO_READ_TIMEOUT))?; + arm_recv_timeout(stream, HELLO_READ_TIMEOUT)?; let read_result = read_message(stream); let _ = stream.set_read_timeout(None); let (_v, peer_hello) = match read_result { @@ -857,7 +857,7 @@ where // successful frame — heartbeat or otherwise — resets this on // the next iteration. let recv_timeout = LIVENESS_TIMEOUT.min(remaining).max(MIN_READ_TIMEOUT); - stream.set_read_timeout(Some(recv_timeout))?; + arm_recv_timeout(stream, recv_timeout)?; match read_message(stream) { Ok((_, Message::Heartbeat { .. })) => continue, Ok((_, Message::SealProgress { .. })) => continue, @@ -881,6 +881,54 @@ where } } +/// Arm the per-recv read timeout on `stream`, tolerating the macOS/BSD quirk +/// where the peer has closed mid-handshake. +/// +/// We bound each socket read with `SO_RCVTIMEO` (via `set_read_timeout`) so a +/// silent peer can't wedge the handoff. On Linux this always succeeds. On +/// macOS and the BSDs, `setsockopt` returns `EINVAL` for *any* option once the +/// socket is fully shut down (`SS_CANTRCVMORE | SS_CANTSENDMORE`) — which is +/// exactly what a peer that closed its end leaves behind. That `EINVAL` is not +/// a real error: the peer is gone, so the read can no longer block. We must +/// still perform the read, because a peer that sent a complete frame and then +/// exited leaves that frame buffered (observed: a full `Ready` sitting in the +/// receive buffer behind a shut-down socket) — dropping it would abort a +/// handoff the successor actually completed. +/// +/// So on `EINVAL` we confirm the read cannot block (a non-blocking peek shows +/// EOF or buffered data) and return `Ok(())` *without* an armed timeout; the +/// caller's `read_message` then drains any buffered frame and otherwise sees +/// EOF, taking the same path it would on Linux. If the peek shows the socket +/// is still open and empty (a genuine `EINVAL` we can't explain away), we +/// surface the error rather than risk an unbounded blocking read. +fn arm_recv_timeout(stream: &UnixStream, recv_timeout: Duration) -> Result<()> { + match stream.set_read_timeout(Some(recv_timeout)) { + Ok(()) => Ok(()), + Err(e) if e.raw_os_error() == Some(libc::EINVAL) && !read_would_block(stream) => Ok(()), + Err(e) => Err(e.into()), + } +} + +/// Non-blocking peek: `true` only if a read right now would block (socket open, +/// no buffered data, not at EOF). A closed peer (EOF, returns 0) or buffered +/// bytes (returns > 0) both mean a blocking read makes immediate progress, so +/// we report `false`. Any other peek error also means the read won't block (it +/// will surface that error promptly), so likewise `false`. +fn read_would_block(stream: &UnixStream) -> bool { + let mut byte = [0u8; 1]; + // SAFETY: `recv` into a valid 1-byte buffer on a borrowed-but-live fd. + // MSG_PEEK leaves any bytes queued; MSG_DONTWAIT keeps this non-blocking. + let ret = unsafe { + libc::recv( + stream.as_raw_fd(), + byte.as_mut_ptr() as *mut libc::c_void, + 1, + libc::MSG_PEEK | libc::MSG_DONTWAIT, + ) + }; + ret < 0 && std::io::Error::last_os_error().kind() == ErrorKind::WouldBlock +} + fn remaining_until(deadline: Instant) -> Duration { deadline .checked_duration_since(Instant::now())