From b6ce5937b96eea543d115e77fafb0ae1234e8503 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 9 Jun 2026 10:15:11 +0200 Subject: [PATCH 01/16] initial spec --- livekit/specs/signalling-reconnection.allium | 457 +++++++++++++++++++ 1 file changed, 457 insertions(+) create mode 100644 livekit/specs/signalling-reconnection.allium diff --git a/livekit/specs/signalling-reconnection.allium b/livekit/specs/signalling-reconnection.allium new file mode 100644 index 000000000..8a04032ee --- /dev/null +++ b/livekit/specs/signalling-reconnection.allium @@ -0,0 +1,457 @@ +-- allium: 3 +-- signalling-reconnection.allium +-- +-- AS-IS baseline of the LiveKit Rust SDK's signalling connection and +-- reconnection behaviour. Distilled from the implementation so that planned +-- improvements ("modularize and improve") can be expressed as explicit deltas +-- against a faithful description of what the code does today. +-- +-- Two layers are in scope: +-- 1. The signalling link -- SignalClient in livekit-api/src/signal_client/. +-- Connection liveness, the resume ("restart") of the link, the send queue +-- and its reconnecting gate, token refresh. +-- 2. The reconnect orchestration -- RtcEngine in +-- livekit/src/rtc_engine/mod.rs. The decision to resume vs fully reconnect, +-- escalation, bounded retries, and server LeaveRequest handling. +-- +-- Excludes (treated as external boundaries): +-- - WebSocket framing / HTTP upgrade / TLS, region-URL discovery and the +-- v1->v0 endpoint fallback (connection-establishment mechanics). +-- - Peer-connection and media recovery (RtcSession.restart / restart_publisher +-- / wait_pc_reconnected). Surfaced here only as recovery-attempt outcomes. +-- - The Room ConnectionState machine. Surfaced here only as the lifecycle +-- events the engine emits for it to observe. +-- +-- Reason enum values carried by triggers (passed as bare enum-literal args): +-- ping_timeout | stream_closed | peer_connection_failed | server_leave +-- | reconnect_attempts_exhausted | client_initiated | unknown +-- +-- NOTE: distilled without a running `allium` CLI; structural checks pending. + +config { + -- Initial connect: number of extra attempts after the first (Room default). + join_retries: Integer = 3 + -- Per signalling-link connect attempt timeout (SIGNAL_CONNECT_TIMEOUT). + connect_timeout: Duration = 5.seconds + -- Reconnect loop bound and spacing (RECONNECT_ATTEMPTS / RECONNECT_INTERVAL). + max_reconnect_attempts: Integer = 10 + reconnect_interval: Duration = 5.seconds + -- Settle delay before re-checking PC state on the resume path + -- (PC_RECONNECT_SETTLE_DELAY). Resume-only; full reconnect builds new PCs. + pc_reconnect_settle_delay: Duration = 1.seconds +} + +-- --------------------------------------------------------------------------- +-- Shared vocabulary +-- --------------------------------------------------------------------------- + +-- The action the SFU attaches to a LeaveRequest, telling the client how (or +-- whether) to recover. `reconnect` forces a full reconnect; `resume` a +-- lightweight resume; `disconnect` forbids reconnection. +value LeaveAction { + kind: resume | reconnect | disconnect +} + +-- A signalling message. `kind` determines queue behaviour during a resume. +-- pass_through = Trickle | Offer | Answer | SyncState | Simulate | Leave +-- (mirrors client-sdk-js `passThroughQueueSignals`). Buffering any of these +-- would deadlock the resume, which depends on them flowing. +-- queueable = everything else (AddTrack, Mute, UpdateSubscription, ...). +value SignalMessage { + kind: pass_through | queueable +} + +-- The result of a recovery attempt, as observed by the engine. +value RecoveryOutcome { + succeeded: Boolean + -- True when the failure was a server Leave{Disconnect}: stop, do not retry. + server_disconnect: Boolean +} + +-- =========================================================================== +-- Layer 1: the signalling link (SignalClient) +-- =========================================================================== + +entity SignalConnection { + -- connected : stream up, sends flow immediately. + -- lost : stream dropped unexpectedly (ping timeout / WS close); + -- awaiting an engine-driven resume. No gate yet. + -- reconnecting : a resume is in flight. The link may already be reopened, + -- but the gate is on: queueable sends buffer, pass_through + -- sends still flow. Cleared by MarkReconnected. + -- closed : terminal (clean/local close or server Disconnect). + status: connected | lost | reconnecting | closed + + -- Auth token; refreshable mid-session so a resume can outlive an expired + -- join token (RefreshSignalToken updates this; the engine reads it at resume). + token: String + + -- Liveness parameters advertised by the SFU in the JoinResponse. + ping_interval: Duration + ping_timeout: Duration + + -- Reset on every message received from the server (not just pongs). + last_message_at: Timestamp? + + -- Queueable signals buffered while status = reconnecting; drained in + -- original order by MarkReconnected. + queued_signals: List + + -- The engine that drives this link's recovery (reverse of EngineConnection.signal). + engine: EngineConnection with signal = this + + -- The link is considered dead if nothing arrived within ping_timeout. + is_alive: last_message_at != null and last_message_at + ping_timeout > now + + transitions status { + connected -> lost -- ping timeout / stream closed + connected -> reconnecting -- resume started while WS still up (PC-failure resume) + connected -> closed -- clean / local close + lost -> reconnecting -- engine drives the resume + lost -> closed -- server Disconnect / give up + reconnecting -> connected -- resume finalised (MarkReconnected) + reconnecting -> lost -- resume severed again; engine will retry + reconnecting -> closed -- close during resume + terminal: closed + } +} + +rule EstablishSignalConnection { + when: ConnectSignal(url, token) + ensures: SignalConnection.created( + status: connected, + token: token, + queued_signals: {} + ) + @guidance + -- connect() awaits the SFU's JoinResponse, which supplies ping_interval + -- and ping_timeout. Establishment tries the given URL, then SFU-provided + -- region URLs in order; with single-PC mode it tries the v1 endpoint and + -- falls back to v0 only on 404. A non-404 failure runs a validate() probe + -- to surface a clearer HTTP error. All of that is establishment + -- mechanics, deliberately out of scope. +} + +rule SignalLinkTimesOut { + when: connection: SignalConnection.last_message_at + connection.ping_timeout <= now + requires: connection.status = connected + ensures: connection.status = lost + ensures: SignalLinkLost(connection, reason: ping_timeout) + @guidance + -- A periodic PingReq is sent every ping_interval to keep traffic flowing + -- and measure RTT. last_message_at is reset on ANY received message, so a + -- healthy stream never trips this. When it trips, the stream is torn down + -- and Close is emitted (-> SignalLinkLost). +} + +rule SignalStreamDrops { + when: SignalStreamDropped(connection) + requires: connection.status = connected + ensures: connection.status = lost + ensures: SignalLinkLost(connection, reason: stream_closed) + @guidance + -- The underlying WebSocket closed (server close or transport error) + -- while we did not initiate it. Same observable outcome as a ping + -- timeout: the link is lost and the engine is notified. +} + +rule ResumeSignalLink { + when: ResumeSignalLink(connection) + requires: connection.status in {lost, connected} + ensures: connection.status = reconnecting + ensures: SignalLinkResumed(connection) + @guidance + -- restart(): closes any existing stream and opens a fresh one with + -- reconnect=true and the (possibly refreshed) token, then awaits the + -- SFU's ReconnectResponse (carried by SignalLinkResumed). The + -- reconnecting gate is set BEFORE touching the stream so concurrent sends + -- route to the queue rather than racing a momentary stream=None window. + -- The gate stays on until MarkReconnected -- NOT cleared here. On failure + -- the gate is cleared and status returns to `lost` so the next attempt + -- can re-enter cleanly. +} + +rule SendSignal { + when: SendSignal(connection, message) + requires: connection.status != closed + if connection.status = reconnecting and message.kind = queueable: + ensures: connection.queued_signals = connection.queued_signals + {message} + else: + ensures: SignalDelivered(connection, message) + @guidance + -- pass_through messages always attempt immediate delivery -- the resume + -- depends on them. On the normal (non-reconnecting) path any backlog is + -- flushed in original order before the new message goes out. A delivery + -- failure re-queues a queueable message; a failed pass_through is + -- dropped with a warning. +} + +rule MarkReconnected { + when: MarkReconnected(connection) + requires: connection.status = reconnecting + ensures: connection.status = connected + ensures: connection.queued_signals = {} + @guidance + -- set_reconnected(): clears the gate FIRST, then flushes the queue, so a + -- send racing the flush takes the normal (already-draining) path and + -- queueable sends cannot sneak back into the queue mid-drain. The engine + -- calls this only after the resume has fully recovered. +} + +rule RefreshSignalToken { + when: ServerRefreshesToken(connection, token) + ensures: connection.token = token + @guidance + -- The SFU pushes a RefreshToken so a later resume can authenticate even + -- if the original join token has since expired. +} + +rule CloseSignalLink { + when: CloseSignalLink(connection) + ensures: connection.status = closed + @guidance + -- Local/clean shutdown: sends a Leave{Disconnect} and tears down the + -- stream. Also reached when the engine gives up or the server forbids + -- reconnection. +} + +-- =========================================================================== +-- Layer 2: reconnect orchestration (RtcEngine) +-- =========================================================================== + +entity EngineConnection { + -- connected : a live RtcSession; signalling and PCs up. + -- reconnecting : the reconnect loop owns recovery. + -- closed : terminal (gave up, or server Disconnect). + status: connected | reconnecting | closed + + -- The recovery strategy for the current episode. Escalates resume -> full + -- and NEVER downgrades, so a stale resume request cannot override a + -- full-reconnect decision already taken. + mode: resume | full + + -- 1-based attempt index within the current reconnecting episode; 0 when + -- connected. + attempt: Integer + + -- Gates all reconnection. Cleared when the server sends Leave{Disconnect} + -- (or a recovery attempt hits one), to avoid racing reconnection cycles. + can_reconnect: Boolean + + -- The signalling link this engine drives. + signal: SignalConnection + + transitions status { + connected -> reconnecting + reconnecting -> connected + reconnecting -> closed + connected -> closed + terminal: closed + } +} + +-- --- Reconnection causes ----------------------------------------------------- + +rule ReconnectOnSignalLoss { + when: SignalLinkLost(connection, reason) + let engine = connection.engine + requires: engine.can_reconnect + ensures: ReconnectNeeded(engine, full_reconnect: false, retry_now: false) + @guidance + -- An unexpected signal-link loss (ping timeout or WS close) drives a + -- resume, not urgent: the loop's normal interval applies. +} + +rule ReconnectOnPeerConnectionFailure { + when: PeerConnectionFailed(engine) + requires: engine.can_reconnect + ensures: ReconnectNeeded(engine, full_reconnect: false, retry_now: false) + @guidance + -- A PeerConnection reaching the Failed state drives a resume. The + -- signalling link may still be up; the resume path closes and reopens it. +} + +rule ServerRequestsReconnect { + when: ServerLeaveRequested(engine, action, reason) + requires: action.kind in {resume, reconnect} + requires: engine.can_reconnect + ensures: ReconnectNeeded( + engine, + full_reconnect: action.kind = reconnect, + retry_now: true + ) + @guidance + -- A server Leave with Resume/Reconnect drives recovery immediately + -- (retry_now). Reconnect forces a full reconnect; Resume a resume. +} + +rule ServerRequestsDisconnect { + when: ServerLeaveRequested(engine, action, reason) + requires: action.kind = disconnect + ensures: engine.can_reconnect = false + ensures: engine.status = closed + ensures: EngineDisconnected(engine, reason) + @guidance + -- A server Leave with Disconnect forbids reconnection outright and tears + -- the engine down with the server-supplied reason. +} + +-- --- Starting / escalating the reconnect loop -------------------------------- + +rule StartReconnect { + when: ReconnectNeeded(engine, full_reconnect, retry_now) + requires: engine.can_reconnect + requires: engine.status != reconnecting + ensures: engine.status = reconnecting + ensures: engine.attempt = 1 + if full_reconnect: + ensures: engine.mode = full + ensures: EngineRestarting(engine) + else: + ensures: engine.mode = resume + ensures: EngineResuming(engine) + ensures: ReconnectAttemptStarted(engine) + @guidance + -- Mirrors reconnection_needed() when not already reconnecting. The + -- one-time lifecycle notification (Resuming or Restarting) is emitted + -- here and the Room acts on it (e.g. unpublish local tracks on a full + -- reconnect) before recovery proceeds. NOTE: Restarting is emitted ONLY + -- when the episode STARTS in full mode; an escalation from a failed + -- resume does not re-emit it (see EscalateReconnect / ResumeAttemptFails). +} + +rule EscalateReconnect { + when: ReconnectNeeded(engine, full_reconnect, retry_now) + requires: engine.can_reconnect + requires: engine.status = reconnecting + if full_reconnect: + ensures: engine.mode = full + @guidance + -- Already reconnecting: only escalate resume -> full, never downgrade. A + -- stale signal-loss event (which asks for resume) must not override a + -- full-reconnect decision already taken. retry_now resets the loop's + -- interval so the next attempt fires immediately rather than after the + -- backoff (the attempt itself is driven by the in-flight loop). +} + +-- --- Attempt outcomes -------------------------------------------------------- +-- A reconnect attempt is performed against the recovery boundary (signalling +-- link + RtcSession + PeerConnections). Its outcome returns as one of the +-- stimuli below. See contract ConnectionRecovery for the ordering obligations. + +rule ResumeAttemptSucceeds { + when: ResumeAttemptSucceeded(engine) + requires: engine.status = reconnecting + ensures: engine.status = connected + ensures: engine.attempt = 0 + ensures: MarkReconnected(engine.signal) + ensures: EngineResumed(engine) + @guidance + -- Ordered resume sequence behind this outcome: + -- 1. ResumeSignalLink -> ReconnectResponse (gate on). + -- 2. SignalResumed -> the Room sends SyncState (pass_through, flows + -- despite the gate). + -- 3. publisher re-offer, sent AFTER SyncState. + -- 4. wait for PCs to reconnect, then pc_reconnect_settle_delay before + -- re-checking PC state. + -- 5. re-check the signal link is still alive; if severed, FAIL rather + -- than drain queued mutations into the void. + -- 6. MarkReconnected drains the signal queue. +} + +rule ResumeAttemptFails { + when: ResumeAttemptFailed(engine, server_disconnect, reason) + requires: engine.status = reconnecting + if server_disconnect: + ensures: engine.can_reconnect = false + ensures: engine.status = closed + ensures: EngineDisconnected(engine, reason) + else: + ensures: engine.mode = full + ensures: RetryReconnect(engine) + @guidance + -- A failed resume ESCALATES the episode to full reconnect (never + -- downgraded thereafter) and retries. But if the failure was a server + -- Leave{Disconnect}, the loop bails out instead of escalating: the server + -- is explicitly telling us to stop. +} + +rule RestartAttemptSucceeds { + when: RestartAttemptSucceeded(engine) + requires: engine.status = reconnecting + ensures: engine.status = connected + ensures: engine.attempt = 0 + ensures: EngineRestarted(engine) + @guidance + -- Full reconnect built a brand-new RtcSession (new signalling link, PCs, + -- data channels) and its PCs reached Connected. EngineRestarted lets the + -- Room republish. The new session replaces the old only on success, so a + -- failed attempt leaves the old session usable (e.g. for stats). +} + +rule RestartAttemptFails { + when: RestartAttemptFailed(engine, server_disconnect, reason) + requires: engine.status = reconnecting + if server_disconnect: + ensures: engine.can_reconnect = false + ensures: engine.status = closed + ensures: EngineDisconnected(engine, reason) + else: + ensures: RetryReconnect(engine) +} + +rule RetryReconnect { + when: RetryReconnect(engine) + requires: engine.status = reconnecting + if engine.attempt < config.max_reconnect_attempts: + ensures: engine.attempt = engine.attempt + 1 + ensures: ReconnectAttemptStarted(engine) + else: + ensures: engine.status = closed + ensures: EngineDisconnected(engine, reason: reconnect_attempts_exhausted) + @guidance + -- Attempts are spaced by reconnect_interval; retry_now (from a server + -- Leave) restarts the interval so the next attempt fires immediately. + -- After max_reconnect_attempts failures the engine gives up and closes, + -- always emitting EngineDisconnected so the Room leaves the Reconnecting + -- state rather than hanging there forever. +} + +-- --- Boundary with the recovery layer (RtcSession + PeerConnections) --------- + +contract ConnectionRecovery { + resume: (engine: EngineConnection) -> RecoveryOutcome + full_reconnect: (engine: EngineConnection) -> RecoveryOutcome + + @invariant ResumeOrdering + -- A resume sends SyncState before the publisher re-offer, waits for the + -- PeerConnections to reconnect (with pc_reconnect_settle_delay), re-checks + -- the signalling link is still alive, and only then drains the queue. + + @invariant SuccessReplacesSession + -- On full reconnect the new RtcSession replaces the current one only + -- after it fully succeeds; a failed attempt must leave the prior session + -- intact and usable. + + @invariant DisconnectStopsLoop + -- If a recovery attempt fails with a server Leave{Disconnect}, recovery + -- stops and the engine closes; it does not escalate or retry. +} + +-- --- Boundary with the Room (observer of the connection lifecycle) ----------- + +surface RoomConnectionLifecycle { + facing room: EngineConnection + + exposes: + room.status + room.mode + + @guarantee LifecycleNotifications + -- The engine emits, for the Room to observe: EngineResuming / + -- EngineRestarting (start of an episode, with a synchronous handshake the + -- Room completes before recovery proceeds), EngineResumed / + -- EngineRestarted (success), and EngineDisconnected (give-up or server + -- Disconnect). Restarting is emitted only for episodes that START in full + -- mode; resume->full escalations surface a Resumed-less Restarted. +} From 6823f1ec0cf21c56502266dd7d8e0c6a9b21c828 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 9 Jun 2026 11:32:05 +0200 Subject: [PATCH 02/16] exp backoff + reconnect hardening --- livekit-api/src/signal_client/mod.rs | 26 +- livekit/specs/signalling-reconnection.allium | 726 ++++++++++++------- livekit/src/rtc_engine/mod.rs | 221 +++++- livekit/tests/reconnection_test.rs | 112 +++ 4 files changed, 786 insertions(+), 299 deletions(-) create mode 100644 livekit/tests/reconnection_test.rs diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index 83db20840..5d4f16256 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -192,8 +192,22 @@ impl SignalClient { if matches!(&err, SignalError::WsError(WsError::Http(e)) if e.status() != 403) { log::error!("unexpected signal error: {}", err.to_string()); } - let urls = RegionUrlProvider::fetch_region_urls(url, token).await?; - let mut last_err = err; + + // Region fallback is best-effort. `fetch_region_urls` already + // returns an empty list for non-cloud (direct / self-hosted) + // URLs, so those skip the fallback entirely. If the region fetch + // itself fails (e.g. the region endpoint is unreachable), we must + // NOT mask the original connection error with the fetch error — + // surface the real reason the connection failed instead. + let urls = match RegionUrlProvider::fetch_region_urls(url, token).await { + Ok(urls) => urls, + Err(region_err) => { + log::debug!( + "region url fetch failed ({region_err}); surfacing original connection error" + ); + return Err(err); + } + }; for url in urls.iter() { log::info!("fallback connection to: {}", url); @@ -203,11 +217,15 @@ impl SignalClient { Ok((inner, join_response, stream_events)) => { return Ok(handle_success(inner, join_response, stream_events)) } - Err(err) => last_err = err, + Err(region_conn_err) => { + log::warn!("region fallback connection to {url} failed: {region_conn_err}") + } } } - Err(last_err) + // Every region URL failed (or there were none): surface the + // ORIGINAL primary connection error, not the last region's error. + Err(err) } } } diff --git a/livekit/specs/signalling-reconnection.allium b/livekit/specs/signalling-reconnection.allium index 8a04032ee..c6e1d6d49 100644 --- a/livekit/specs/signalling-reconnection.allium +++ b/livekit/specs/signalling-reconnection.allium @@ -1,93 +1,114 @@ -- allium: 3 -- signalling-reconnection.allium -- --- AS-IS baseline of the LiveKit Rust SDK's signalling connection and --- reconnection behaviour. Distilled from the implementation so that planned --- improvements ("modularize and improve") can be expressed as explicit deltas --- against a faithful description of what the code does today. +-- TARGET behaviour for the LiveKit Rust SDK's signalling connection and +-- reconnection. Evolved from a faithful AS-IS baseline (distilled from +-- livekit-api/src/signal_client/ and livekit/src/rtc_engine/mod.rs) by applying +-- five intentional deltas. Each is marked `-- DELTA n` where it changes +-- behaviour from today's implementation. -- --- Two layers are in scope: --- 1. The signalling link -- SignalClient in livekit-api/src/signal_client/. --- Connection liveness, the resume ("restart") of the link, the send queue --- and its reconnecting gate, token refresh. --- 2. The reconnect orchestration -- RtcEngine in --- livekit/src/rtc_engine/mod.rs. The decision to resume vs fully reconnect, --- escalation, bounded retries, and server LeaveRequest handling. +-- 1. Decoupled resume: an explicit ordered chain of rules (reopen link -> +-- sync state -> republish offer -> await PCs -> re-check link -> drain +-- queue), not one opaque cross-layer sequence. +-- 2. Accurate lifecycle: a resume that escalates to a full reconnect now also +-- emits Restarting; exhaustion reports the original cause, not `unknown`. +-- 3. Backoff: exponential delay with full jitter, attempt-capped, replacing +-- the fixed 5s interval. +-- 4. Monotonic reconnect latch: a terminal `revoked` permission state replaces +-- the ad-hoc `can_reconnect` boolean. +-- 5. Region fallback in scope: region-URL fetch is cloud-only and never masks +-- the original connection error. -- --- Excludes (treated as external boundaries): --- - WebSocket framing / HTTP upgrade / TLS, region-URL discovery and the --- v1->v0 endpoint fallback (connection-establishment mechanics). --- - Peer-connection and media recovery (RtcSession.restart / restart_publisher --- / wait_pc_reconnected). Surfaced here only as recovery-attempt outcomes. --- - The Room ConnectionState machine. Surfaced here only as the lifecycle --- events the engine emits for it to observe. +-- Boundaries: peer-connection / media recovery (RtcSession) and the Room are +-- external; modelled via the MediaLayer / Application surfaces below. -- --- Reason enum values carried by triggers (passed as bare enum-literal args): --- ping_timeout | stream_closed | peer_connection_failed | server_leave --- | reconnect_attempts_exhausted | client_initiated | unknown --- --- NOTE: distilled without a running `allium` CLI; structural checks pending. - -config { - -- Initial connect: number of extra attempts after the first (Room default). - join_retries: Integer = 3 - -- Per signalling-link connect attempt timeout (SIGNAL_CONNECT_TIMEOUT). - connect_timeout: Duration = 5.seconds - -- Reconnect loop bound and spacing (RECONNECT_ATTEMPTS / RECONNECT_INTERVAL). - max_reconnect_attempts: Integer = 10 - reconnect_interval: Duration = 5.seconds - -- Settle delay before re-checking PC state on the resume path - -- (PC_RECONNECT_SETTLE_DELAY). Resume-only; full reconnect builds new PCs. - pc_reconnect_settle_delay: Duration = 1.seconds +-- Checker note (allium 3.2.4): `allium check` reports residual `status` +-- warnings that are CHECKER ARTIFACTS, not spec defects -- every state flagged +-- "never assigned" is plainly assigned by a visible `ensures`. Three confirmed +-- CLI limitations cause them: +-- a. unreachableValue: the status-assignment analysis does not propagate +-- through deep chained-trigger sequences. A status assigned only by a rule +-- several emit/consume hops from an external stimulus (e.g. `reconnecting`, +-- set by StartReconnect two hops past the cause triggers) reads as +-- "never assigned". Short chains are fine. +-- b. noExit / multi-graph: a state whose only exit uses a set-membership +-- guard (`status in {...}`) reports noExit; and only ONE transition graph +-- per entity is honoured (the language ref permits one per field). Each +-- entity therefore declares a single `status` graph, and the +-- EngineConnection.reconnect_permission latch is enforced by construction + +-- the RevokedImpliesClosed invariant instead of a second graph. +-- c. externalEntity.missingSourceHint on Sfu/MediaLayer/Application is +-- expected: these are true external boundaries with no governing spec. +-- The graphs are retained for their documentation value (the transition +-- topology is the heart of the reconnection behaviour). + +------------------------------------------------------------ +-- External Entities +------------------------------------------------------------ + +-- The SFU server the client signals with. +external entity Sfu { } + +-- The peer-connection / RtcSession layer that performs media recovery. Out of +-- scope; surfaced only through the operations the engine demands of it and the +-- outcomes it reports back. +external entity MediaLayer { } + +-- The SDK consumer (Room) that drives connect/send/close and observes the +-- connection lifecycle. +external entity Application { + engine: EngineConnection } --- --------------------------------------------------------------------------- --- Shared vocabulary --- --------------------------------------------------------------------------- - --- The action the SFU attaches to a LeaveRequest, telling the client how (or --- whether) to recover. `reconnect` forces a full reconnect; `resume` a --- lightweight resume; `disconnect` forbids reconnection. -value LeaveAction { - kind: resume | reconnect | disconnect -} +------------------------------------------------------------ +-- Value Types +------------------------------------------------------------ -- A signalling message. `kind` determines queue behaviour during a resume. -- pass_through = Trickle | Offer | Answer | SyncState | Simulate | Leave --- (mirrors client-sdk-js `passThroughQueueSignals`). Buffering any of these --- would deadlock the resume, which depends on them flowing. +-- (mirrors client-sdk-js `passThroughQueueSignals`); buffering any would +-- deadlock the resume, which depends on them flowing. -- queueable = everything else (AddTrack, Mute, UpdateSubscription, ...). value SignalMessage { kind: pass_through | queueable } --- The result of a recovery attempt, as observed by the engine. -value RecoveryOutcome { - succeeded: Boolean - -- True when the failure was a server Leave{Disconnect}: stop, do not retry. - server_disconnect: Boolean +------------------------------------------------------------ +-- Enumerations +------------------------------------------------------------ + +-- Why a recovery episode started or the connection ended. Shared across the +-- cause triggers and the engine's stored reconnect_cause, so it is a named enum. +enum DisconnectCause { + ping_timeout + | stream_closed + | peer_connection_failed + | server_leave + | signal_severed_during_resume + | reconnect_attempts_exhausted + | client_initiated + | unknown } --- =========================================================================== --- Layer 1: the signalling link (SignalClient) --- =========================================================================== +------------------------------------------------------------ +-- Entities and Variants +------------------------------------------------------------ entity SignalConnection { -- connected : stream up, sends flow immediately. -- lost : stream dropped unexpectedly (ping timeout / WS close); - -- awaiting an engine-driven resume. No gate yet. - -- reconnecting : a resume is in flight. The link may already be reopened, - -- but the gate is on: queueable sends buffer, pass_through - -- sends still flow. Cleared by MarkReconnected. + -- awaiting an engine-driven resume. Gate not yet set. + -- reconnecting : a resume is in flight; the gate is on -- queueable sends + -- buffer, pass_through sends still flow. Cleared by + -- MarkReconnected. -- closed : terminal (clean/local close or server Disconnect). status: connected | lost | reconnecting | closed -- Auth token; refreshable mid-session so a resume can outlive an expired - -- join token (RefreshSignalToken updates this; the engine reads it at resume). + -- join token (the engine reads this at resume time). token: String - -- Liveness parameters advertised by the SFU in the JoinResponse. - ping_interval: Duration + -- Liveness parameter advertised by the SFU in the JoinResponse. ping_timeout: Duration -- Reset on every message received from the server (not just pongs). @@ -97,10 +118,11 @@ entity SignalConnection { -- original order by MarkReconnected. queued_signals: List - -- The engine that drives this link's recovery (reverse of EngineConnection.signal). + -- The engine that drives this link's recovery. engine: EngineConnection with signal = this - -- The link is considered dead if nothing arrived within ping_timeout. + -- The link is dead if nothing arrived within ping_timeout. Used by the + -- resume's mid-flight liveness re-check (ResumeRechecksLink). is_alive: last_message_at != null and last_message_at + ping_timeout > now transitions status { @@ -116,43 +138,145 @@ entity SignalConnection { } } -rule EstablishSignalConnection { - when: ConnectSignal(url, token) +entity EngineConnection { + -- connected : a live RtcSession; signalling and PCs up. + -- reconnecting : the reconnect loop owns recovery. + -- closed : terminal (gave up, or server Disconnect). + status: connected | reconnecting | closed + + -- Recovery strategy for the current episode. Escalates resume -> full and + -- NEVER downgrades. + mode: resume | full + + -- DELTA 4: a monotonic latch replacing the ad-hoc can_reconnect boolean and + -- its lock-guarded checks. Only entity creation assigns `permitted`; no rule + -- ever assigns it, and the rules that revoke also close the engine -- so once + -- the server (or a recovery attempt hitting a server Disconnect) forbids + -- reconnection, it stays forbidden. Monotonicity is by construction and is + -- asserted by the RevokedImpliesClosed invariant. + -- (A `permitted -> revoked` terminal transition graph would express this + -- structurally, but the entity's `status` graph below takes the single graph + -- slot the current checker honours; see the spec header note.) + reconnect_permission: permitted | revoked + + -- 1-based attempt index within the current episode; 0 when connected. + attempt: Integer + + -- DELTA 2: the cause that started the current episode, carried through so + -- that giving up reports the real reason instead of `unknown`. + reconnect_cause: DisconnectCause? + + -- The signalling link this engine drives. + signal: SignalConnection + + transitions status { + connected -> reconnecting + reconnecting -> connected + reconnecting -> closed + connected -> closed + terminal: closed + } +} + +------------------------------------------------------------ +-- Config +------------------------------------------------------------ + +config { + -- Initial connect: extra attempts after the first (Room default). + join_retries: Integer = 3 + -- Per signalling-link connect attempt timeout (SIGNAL_CONNECT_TIMEOUT). + connect_timeout: Duration = 5.seconds + -- DELTA 3: exponential backoff with full jitter, attempt-capped. + -- Per-attempt nominal delay = min(reconnect_max_delay_ms, + -- reconnect_base_delay_ms * reconnect_backoff_multiplier^(attempt - 1)); + -- the actual delay is sampled uniformly from [0, nominal] (full jitter). + -- Expressed in milliseconds because Duration has no sub-second unit. + max_reconnect_attempts: Integer = 10 + reconnect_base_delay_ms: Integer = 300 + reconnect_backoff_multiplier: Integer = 2 + reconnect_max_delay_ms: Integer = 7000 + -- Settle delay before re-checking PC state on the resume path + -- (PC_RECONNECT_SETTLE_DELAY). Resume-only; full reconnect builds new PCs. + pc_reconnect_settle_delay: Duration = 1.seconds +} + +------------------------------------------------------------ +-- Rules +------------------------------------------------------------ + +-- === Establishment (DELTA 5: region fallback) =============================== + +rule SignalConnectionEstablished { + when: SignalConnectSucceeded(target, token) ensures: SignalConnection.created( status: connected, token: token, queued_signals: {} ) @guidance - -- connect() awaits the SFU's JoinResponse, which supplies ping_interval - -- and ping_timeout. Establishment tries the given URL, then SFU-provided - -- region URLs in order; with single-PC mode it tries the v1 endpoint and - -- falls back to v0 only on 404. A non-404 failure runs a validate() probe - -- to surface a clearer HTTP error. All of that is establishment - -- mechanics, deliberately out of scope. + -- connect() awaits the SFU's JoinResponse (supplying ping_timeout). + -- Endpoint selection (v1->v0 on 404) is establishment mechanics, kept + -- out of scope; `target` records whether the host is `cloud` or `direct`. +} + +rule PrimaryConnectFailsOnCloud { + when: SignalConnectFailed(target, error) + requires: target = cloud + ensures: RegionFetchRequested(error) + @guidance + -- DELTA 5: only LiveKit Cloud hosts attempt region fallback. +} + +rule PrimaryConnectFailsOnDirect { + when: SignalConnectFailed(target, error) + requires: target = direct + ensures: SignalConnectRejected(error) + @guidance + -- DELTA 5: direct / self-hosted endpoints skip region fallback entirely + -- and surface the original connection error immediately. +} + +rule RegionFetchFails { + when: RegionFetchFailed(original_error) + ensures: SignalConnectRejected(original_error) + @guidance + -- DELTA 5: a failed region-URL fetch must NEVER mask the original + -- connection error (today the fetch error is surfaced instead). +} + +rule RegionFetchSucceeds { + when: RegionUrlsFetched(original_error) + ensures: RegionConnectAttemptRequested(original_error) + @guidance + -- Try each region URL in order; success yields SignalConnectSucceeded. } +rule AllRegionsFail { + when: RegionConnectsExhausted(original_error) + ensures: SignalConnectRejected(original_error) + @guidance + -- DELTA 5: when every region URL has failed, surface the ORIGINAL + -- primary connection error rather than the last region's error. +} + +-- === Signalling-link liveness & lifecycle =================================== + rule SignalLinkTimesOut { when: connection: SignalConnection.last_message_at + connection.ping_timeout <= now requires: connection.status = connected ensures: connection.status = lost - ensures: SignalLinkLost(connection, reason: ping_timeout) + ensures: SignalLinkLost(connection, cause: ping_timeout) @guidance - -- A periodic PingReq is sent every ping_interval to keep traffic flowing - -- and measure RTT. last_message_at is reset on ANY received message, so a - -- healthy stream never trips this. When it trips, the stream is torn down - -- and Close is emitted (-> SignalLinkLost). + -- last_message_at resets on ANY received message, so a healthy stream + -- never trips this. A periodic PingReq keeps traffic flowing. } rule SignalStreamDrops { when: SignalStreamDropped(connection) requires: connection.status = connected ensures: connection.status = lost - ensures: SignalLinkLost(connection, reason: stream_closed) - @guidance - -- The underlying WebSocket closed (server close or transport error) - -- while we did not initiate it. Same observable outcome as a ping - -- timeout: the link is lost and the engine is notified. + ensures: SignalLinkLost(connection, cause: stream_closed) } rule ResumeSignalLink { @@ -162,13 +286,18 @@ rule ResumeSignalLink { ensures: SignalLinkResumed(connection) @guidance -- restart(): closes any existing stream and opens a fresh one with - -- reconnect=true and the (possibly refreshed) token, then awaits the - -- SFU's ReconnectResponse (carried by SignalLinkResumed). The - -- reconnecting gate is set BEFORE touching the stream so concurrent sends - -- route to the queue rather than racing a momentary stream=None window. - -- The gate stays on until MarkReconnected -- NOT cleared here. On failure - -- the gate is cleared and status returns to `lost` so the next attempt - -- can re-enter cleanly. + -- reconnect=true and the (possibly refreshed) token, awaiting the SFU's + -- ReconnectResponse. The gate is set BEFORE touching the stream so + -- concurrent sends route to the queue, and stays on until MarkReconnected. +} + +rule SignalResumeFails { + when: SignalResumeFailed(connection) + requires: connection.status = reconnecting + ensures: connection.status = lost + @guidance + -- Reopening the stream failed; the gate is cleared and the link returns + -- to `lost` so the engine's next attempt can re-enter cleanly. } rule SendSignal { @@ -179,11 +308,9 @@ rule SendSignal { else: ensures: SignalDelivered(connection, message) @guidance - -- pass_through messages always attempt immediate delivery -- the resume - -- depends on them. On the normal (non-reconnecting) path any backlog is - -- flushed in original order before the new message goes out. A delivery - -- failure re-queues a queueable message; a failed pass_through is - -- dropped with a warning. + -- pass_through messages always attempt immediate delivery. On the normal + -- path any backlog is flushed in original order first. A failed queueable + -- send re-queues; a failed pass_through is dropped with a warning. } rule MarkReconnected { @@ -192,118 +319,92 @@ rule MarkReconnected { ensures: connection.status = connected ensures: connection.queued_signals = {} @guidance - -- set_reconnected(): clears the gate FIRST, then flushes the queue, so a - -- send racing the flush takes the normal (already-draining) path and - -- queueable sends cannot sneak back into the queue mid-drain. The engine - -- calls this only after the resume has fully recovered. + -- set_reconnected(): clears the gate FIRST, then flushes, so a racing + -- send takes the normal draining path and cannot re-queue mid-drain. } rule RefreshSignalToken { when: ServerRefreshesToken(connection, token) ensures: connection.token = token - @guidance - -- The SFU pushes a RefreshToken so a later resume can authenticate even - -- if the original join token has since expired. } rule CloseSignalLink { when: CloseSignalLink(connection) + requires: connection.status in {connected, lost, reconnecting} ensures: connection.status = closed - @guidance - -- Local/clean shutdown: sends a Leave{Disconnect} and tears down the - -- stream. Also reached when the engine gives up or the server forbids - -- reconnection. } --- =========================================================================== --- Layer 2: reconnect orchestration (RtcEngine) --- =========================================================================== - -entity EngineConnection { - -- connected : a live RtcSession; signalling and PCs up. - -- reconnecting : the reconnect loop owns recovery. - -- closed : terminal (gave up, or server Disconnect). - status: connected | reconnecting | closed - - -- The recovery strategy for the current episode. Escalates resume -> full - -- and NEVER downgrades, so a stale resume request cannot override a - -- full-reconnect decision already taken. - mode: resume | full +-- === Engine: initial connection ============================================= - -- 1-based attempt index within the current reconnecting episode; 0 when - -- connected. - attempt: Integer - - -- Gates all reconnection. Cleared when the server sends Leave{Disconnect} - -- (or a recovery attempt hits one), to avoid racing reconnection cycles. - can_reconnect: Boolean - - -- The signalling link this engine drives. - signal: SignalConnection - - transitions status { - connected -> reconnecting - reconnecting -> connected - reconnecting -> closed - connected -> closed - terminal: closed - } +rule EngineConnects { + when: EngineSessionConnected(signal) + ensures: EngineConnection.created( + status: connected, + mode: resume, + reconnect_permission: permitted, + attempt: 0, + signal: signal + ) } --- --- Reconnection causes ----------------------------------------------------- +-- === Engine: reconnection causes ============================================ rule ReconnectOnSignalLoss { - when: SignalLinkLost(connection, reason) + when: SignalLinkLost(connection, cause) let engine = connection.engine - requires: engine.can_reconnect - ensures: ReconnectNeeded(engine, full_reconnect: false, retry_now: false) + requires: engine.reconnect_permission = permitted + ensures: ReconnectNeeded(engine, full_reconnect: false, retry_now: false, cause: cause) @guidance - -- An unexpected signal-link loss (ping timeout or WS close) drives a - -- resume, not urgent: the loop's normal interval applies. + -- Unexpected signal loss drives a (non-urgent) resume. } rule ReconnectOnPeerConnectionFailure { when: PeerConnectionFailed(engine) - requires: engine.can_reconnect - ensures: ReconnectNeeded(engine, full_reconnect: false, retry_now: false) - @guidance - -- A PeerConnection reaching the Failed state drives a resume. The - -- signalling link may still be up; the resume path closes and reopens it. + requires: engine.reconnect_permission = permitted + ensures: ReconnectNeeded( + engine, + full_reconnect: false, + retry_now: false, + cause: peer_connection_failed + ) } rule ServerRequestsReconnect { - when: ServerLeaveRequested(engine, action, reason) - requires: action.kind in {resume, reconnect} - requires: engine.can_reconnect + when: ServerLeaveRequested(engine, action) + requires: action in {resume, reconnect} + requires: engine.reconnect_permission = permitted ensures: ReconnectNeeded( engine, - full_reconnect: action.kind = reconnect, - retry_now: true + full_reconnect: action = reconnect, + retry_now: true, + cause: server_leave ) @guidance - -- A server Leave with Resume/Reconnect drives recovery immediately - -- (retry_now). Reconnect forces a full reconnect; Resume a resume. + -- A server Leave with Resume/Reconnect drives recovery immediately. } rule ServerRequestsDisconnect { - when: ServerLeaveRequested(engine, action, reason) - requires: action.kind = disconnect - ensures: engine.can_reconnect = false + when: ServerLeaveRequested(engine, action) + requires: action = disconnect + requires: engine.reconnect_permission = permitted + requires: engine.status in {connected, reconnecting} + ensures: engine.reconnect_permission = revoked ensures: engine.status = closed - ensures: EngineDisconnected(engine, reason) + ensures: EngineDisconnected(engine, cause: server_leave) @guidance - -- A server Leave with Disconnect forbids reconnection outright and tears - -- the engine down with the server-supplied reason. + -- A server Leave with Disconnect forbids reconnection outright (DELTA 4: + -- terminal `revoked`) and tears the engine down. } --- --- Starting / escalating the reconnect loop -------------------------------- +-- === Engine: starting / escalating the loop ================================= rule StartReconnect { - when: ReconnectNeeded(engine, full_reconnect, retry_now) - requires: engine.can_reconnect - requires: engine.status != reconnecting + when: ReconnectNeeded(engine, full_reconnect, retry_now, cause) + requires: engine.reconnect_permission = permitted + requires: engine.status = connected ensures: engine.status = reconnecting ensures: engine.attempt = 1 + ensures: engine.reconnect_cause = cause if full_reconnect: ensures: engine.mode = full ensures: EngineRestarting(engine) @@ -312,68 +413,131 @@ rule StartReconnect { ensures: EngineResuming(engine) ensures: ReconnectAttemptStarted(engine) @guidance - -- Mirrors reconnection_needed() when not already reconnecting. The - -- one-time lifecycle notification (Resuming or Restarting) is emitted - -- here and the Room acts on it (e.g. unpublish local tracks on a full - -- reconnect) before recovery proceeds. NOTE: Restarting is emitted ONLY - -- when the episode STARTS in full mode; an escalation from a failed - -- resume does not re-emit it (see EscalateReconnect / ResumeAttemptFails). + -- Mirrors reconnection_needed() on a fresh episode. The Room acts on the + -- one-time Resuming/Restarting notification (e.g. unpublish local tracks + -- on a full reconnect) before recovery proceeds. } rule EscalateReconnect { - when: ReconnectNeeded(engine, full_reconnect, retry_now) - requires: engine.can_reconnect + when: ReconnectNeeded(engine, full_reconnect, retry_now, cause) + requires: engine.reconnect_permission = permitted requires: engine.status = reconnecting - if full_reconnect: + if full_reconnect and engine.mode = resume: ensures: engine.mode = full + ensures: EngineRestarting(engine) + @guidance + -- Already reconnecting: only escalate resume -> full, never downgrade. + -- DELTA 2: the escalation now emits Restarting so the Room sees a discrete + -- event matching the mode it is in (previously the Room saw Resuming then + -- Restarted with no Restarting between). retry_now restarts the backoff so + -- the in-flight loop's next attempt fires immediately. +} + +-- === Engine: per-attempt dispatch =========================================== + +rule DispatchReconnectAttempt { + when: ReconnectAttemptStarted(engine) + requires: engine.status = reconnecting + if engine.mode = resume: + ensures: ResumeSignalLink(engine.signal) + else: + ensures: FullReconnectRequested(engine) + @guidance + -- Resume re-opens the existing link and recovers in place; full reconnect + -- asks the media layer to build a brand-new session. +} + +-- === Engine: the decoupled resume chain (DELTA 1) =========================== +-- Each step is a named rule chained to the next, so the ordering that today is +-- implicit across signal+session+engine is an explicit, testable sequence. + +rule ResumeSendsSyncState { + when: SignalLinkResumed(connection) + let engine = connection.engine + requires: engine.status = reconnecting + requires: engine.mode = resume + ensures: SyncStateSent(engine) + @guidance + -- Step 2: with the link reopened, the Room sends SyncState (a + -- pass_through, so it flows despite the gate) so the SFU resyncs state. +} + +rule ResumeRepublishesPublisher { + when: SyncStateSent(engine) + requires: engine.status = reconnecting + ensures: RepublishPublisherRequested(engine) @guidance - -- Already reconnecting: only escalate resume -> full, never downgrade. A - -- stale signal-loss event (which asks for resume) must not override a - -- full-reconnect decision already taken. retry_now resets the loop's - -- interval so the next attempt fires immediately rather than after the - -- backoff (the attempt itself is driven by the in-flight loop). + -- Step 3: the publisher offer is re-sent AFTER SyncState -- ordering a + -- refactor must preserve, now guaranteed by this chain not a comment. } --- --- Attempt outcomes -------------------------------------------------------- --- A reconnect attempt is performed against the recovery boundary (signalling --- link + RtcSession + PeerConnections). Its outcome returns as one of the --- stimuli below. See contract ConnectionRecovery for the ordering obligations. +rule ResumeAwaitsPeerConnections { + when: PublisherRepublished(engine) + requires: engine.status = reconnecting + ensures: AwaitPeerConnectionsRequested(engine, settle: config.pc_reconnect_settle_delay) + @guidance + -- Step 4: wait for the PeerConnections to reconnect, then apply the + -- settle delay before trusting their state. +} + +rule ResumeRechecksLink { + when: PeerConnectionsReconnected(engine) + requires: engine.status = reconnecting + if not engine.signal.is_alive: + ensures: ResumeAttemptFailed( + engine, + server_disconnect: false, + cause: signal_severed_during_resume + ) + else: + ensures: ResumeQueueDrainRequested(engine) + @guidance + -- Step 5: if the link died while we waited for PCs, FAIL rather than + -- drain queued mutations into the void. +} + +rule ResumeDrainsQueue { + when: ResumeQueueDrainRequested(engine) + requires: engine.status = reconnecting + ensures: MarkReconnected(engine.signal) + ensures: ResumeAttemptSucceeded(engine) + @guidance + -- Step 6: the resume has fully recovered; drain the signal queue. +} + +-- === Engine: attempt outcomes =============================================== rule ResumeAttemptSucceeds { when: ResumeAttemptSucceeded(engine) requires: engine.status = reconnecting ensures: engine.status = connected ensures: engine.attempt = 0 - ensures: MarkReconnected(engine.signal) + ensures: engine.reconnect_cause = null ensures: EngineResumed(engine) +} + +rule ResumeAttemptDisconnects { + when: ResumeAttemptFailed(engine, server_disconnect, cause) + requires: engine.status = reconnecting + requires: server_disconnect + ensures: engine.reconnect_permission = revoked + ensures: engine.status = closed + ensures: EngineDisconnected(engine, cause: cause) @guidance - -- Ordered resume sequence behind this outcome: - -- 1. ResumeSignalLink -> ReconnectResponse (gate on). - -- 2. SignalResumed -> the Room sends SyncState (pass_through, flows - -- despite the gate). - -- 3. publisher re-offer, sent AFTER SyncState. - -- 4. wait for PCs to reconnect, then pc_reconnect_settle_delay before - -- re-checking PC state. - -- 5. re-check the signal link is still alive; if severed, FAIL rather - -- than drain queued mutations into the void. - -- 6. MarkReconnected drains the signal queue. -} - -rule ResumeAttemptFails { - when: ResumeAttemptFailed(engine, server_disconnect, reason) + -- A resume that failed because the server sent Leave{Disconnect}: stop, + -- do not escalate or retry. +} + +rule ResumeAttemptEscalates { + when: ResumeAttemptFailed(engine, server_disconnect, cause) requires: engine.status = reconnecting - if server_disconnect: - ensures: engine.can_reconnect = false - ensures: engine.status = closed - ensures: EngineDisconnected(engine, reason) - else: - ensures: engine.mode = full - ensures: RetryReconnect(engine) + requires: not server_disconnect + ensures: engine.mode = full + ensures: EngineRestarting(engine) + ensures: RetryReconnect(engine) @guidance - -- A failed resume ESCALATES the episode to full reconnect (never - -- downgraded thereafter) and retries. But if the failure was a server - -- Leave{Disconnect}, the loop bails out instead of escalating: the server - -- is explicitly telling us to stop. + -- DELTA 2: a failed resume escalates to full reconnect AND emits + -- Restarting on the escalation (previously silent), then retries. } rule RestartAttemptSucceeds { @@ -381,77 +545,123 @@ rule RestartAttemptSucceeds { requires: engine.status = reconnecting ensures: engine.status = connected ensures: engine.attempt = 0 + ensures: engine.reconnect_cause = null ensures: EngineRestarted(engine) @guidance - -- Full reconnect built a brand-new RtcSession (new signalling link, PCs, - -- data channels) and its PCs reached Connected. EngineRestarted lets the - -- Room republish. The new session replaces the old only on success, so a - -- failed attempt leaves the old session usable (e.g. for stats). + -- The new RtcSession's PCs reached Connected. The new session replaces + -- the old only on success, so a failed attempt leaves the old usable. } -rule RestartAttemptFails { - when: RestartAttemptFailed(engine, server_disconnect, reason) +rule RestartAttemptDisconnects { + when: RestartAttemptFailed(engine, server_disconnect, cause) requires: engine.status = reconnecting - if server_disconnect: - ensures: engine.can_reconnect = false - ensures: engine.status = closed - ensures: EngineDisconnected(engine, reason) - else: - ensures: RetryReconnect(engine) + requires: server_disconnect + ensures: engine.reconnect_permission = revoked + ensures: engine.status = closed + ensures: EngineDisconnected(engine, cause: cause) +} + +rule RestartAttemptRetries { + when: RestartAttemptFailed(engine, server_disconnect, cause) + requires: engine.status = reconnecting + requires: not server_disconnect + ensures: RetryReconnect(engine) } -rule RetryReconnect { +rule RetryReconnectAgain { when: RetryReconnect(engine) requires: engine.status = reconnecting - if engine.attempt < config.max_reconnect_attempts: - ensures: engine.attempt = engine.attempt + 1 - ensures: ReconnectAttemptStarted(engine) - else: - ensures: engine.status = closed - ensures: EngineDisconnected(engine, reason: reconnect_attempts_exhausted) + requires: engine.attempt < config.max_reconnect_attempts + ensures: engine.attempt = engine.attempt + 1 + ensures: ReconnectAttemptStarted(engine) @guidance - -- Attempts are spaced by reconnect_interval; retry_now (from a server - -- Leave) restarts the interval so the next attempt fires immediately. - -- After max_reconnect_attempts failures the engine gives up and closes, - -- always emitting EngineDisconnected so the Room leaves the Reconnecting - -- state rather than hanging there forever. + -- DELTA 3: attempts are spaced by exponential backoff with full jitter + -- (see config); a server-Leave retry_now collapses the delay to zero. } --- --- Boundary with the recovery layer (RtcSession + PeerConnections) --------- +rule ReconnectExhausted { + when: RetryReconnect(engine) + requires: engine.status = reconnecting + requires: engine.attempt >= config.max_reconnect_attempts + ensures: engine.status = closed + ensures: EngineDisconnected(engine, cause: reconnect_attempts_exhausted) + @guidance + -- DELTA 2: on giving up, EngineDisconnected reports + -- reconnect_attempts_exhausted; the engine's reconnect_cause also remains + -- available to the Room for the underlying reason, instead of `unknown`. + -- Always emitted so the Room leaves the Reconnecting state. +} -contract ConnectionRecovery { - resume: (engine: EngineConnection) -> RecoveryOutcome - full_reconnect: (engine: EngineConnection) -> RecoveryOutcome +------------------------------------------------------------ +-- Invariants +------------------------------------------------------------ - @invariant ResumeOrdering - -- A resume sends SyncState before the publisher re-offer, waits for the - -- PeerConnections to reconnect (with pc_reconnect_settle_delay), re-checks - -- the signalling link is still alive, and only then drains the queue. +invariant RevokedImpliesClosed { + for engine in EngineConnection: + engine.reconnect_permission = revoked implies engine.status = closed +} - @invariant SuccessReplacesSession - -- On full reconnect the new RtcSession replaces the current one only - -- after it fully succeeds; a failed attempt must leave the prior session - -- intact and usable. +------------------------------------------------------------ +-- Surfaces +------------------------------------------------------------ - @invariant DisconnectStopsLoop - -- If a recovery attempt fails with a server Leave{Disconnect}, recovery - -- stops and the engine closes; it does not escalate or retry. -} +-- The application drives signalling sends/close and observes the lifecycle. +surface SignalCommands { + facing app: Application --- --- Boundary with the Room (observer of the connection lifecycle) ----------- + provides: + SendSignal(app) + CloseSignalLink(app) +} surface RoomConnectionLifecycle { - facing room: EngineConnection + facing app: Application exposes: - room.status - room.mode + app.engine.status + app.engine.mode + app.engine.reconnect_cause @guarantee LifecycleNotifications -- The engine emits, for the Room to observe: EngineResuming / - -- EngineRestarting (start of an episode, with a synchronous handshake the - -- Room completes before recovery proceeds), EngineResumed / - -- EngineRestarted (success), and EngineDisconnected (give-up or server - -- Disconnect). Restarting is emitted only for episodes that START in full - -- mode; resume->full escalations surface a Resumed-less Restarted. + -- EngineRestarting (start of an episode, or a resume->full escalation), + -- EngineResumed / EngineRestarted (success), EngineDisconnected (give-up + -- or server Disconnect). DELTA 2: an escalated full reconnect now also + -- emits Restarting. +} + +-- The SFU pushes connection outcomes, liveness loss, leave requests and tokens. +surface ServerSignalling { + facing sfu: Sfu + + provides: + SignalConnectSucceeded(sfu) + SignalConnectFailed(sfu) + RegionUrlsFetched(sfu) + RegionFetchFailed(sfu) + RegionConnectsExhausted(sfu) + SignalStreamDropped(sfu) + SignalResumeFailed(sfu) + ServerLeaveRequested(sfu) + ServerRefreshesToken(sfu) +} + +-- The media layer reports recovery progress and outcomes; the engine demands +-- the recovery operations from it. +surface MediaRecovery { + facing media: MediaLayer + + provides: + EngineSessionConnected(media) + PeerConnectionFailed(media) + PublisherRepublished(media) + PeerConnectionsReconnected(media) + RestartAttemptSucceeded(media) + RestartAttemptFailed(media) + + @guarantee RecoveryOperations + -- The engine drives the media layer via RepublishPublisherRequested, + -- AwaitPeerConnectionsRequested (which applies pc_reconnect_settle_delay) + -- and FullReconnectRequested. A full reconnect must leave the prior + -- session intact until the new one fully succeeds. } diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index f560ef554..afb0e8a39 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -16,13 +16,17 @@ use libwebrtc::prelude::*; use livekit_api::signal_client::{SignalError, SignalOptions}; use livekit_datatrack::backend as dt; use livekit_protocol as proto; -use livekit_runtime::{interval, Interval, JoinHandle, MissedTickBehavior}; +use livekit_runtime::JoinHandle; use parking_lot::{RwLock, RwLockReadGuard}; -use std::{borrow::Cow, fmt::Debug, sync::Arc, time::Duration}; +use std::{ + borrow::Cow, + fmt::Debug, + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; use thiserror::Error; use tokio::sync::{ - mpsc, oneshot, Mutex as AsyncMutex, Notify, RwLock as AsyncRwLock, - RwLockReadGuard as AsyncRwLockReadGuard, + mpsc, oneshot, Notify, RwLock as AsyncRwLock, RwLockReadGuard as AsyncRwLockReadGuard, }; pub use self::rtc_session::{SessionStats, INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD}; @@ -53,6 +57,37 @@ pub(crate) type EngineResult = Result; pub const RECONNECT_ATTEMPTS: u32 = 10; pub const RECONNECT_INTERVAL: Duration = Duration::from_secs(5); +/// Exponential-backoff-with-full-jitter parameters for spacing reconnect +/// attempts. The per-attempt delay is sampled uniformly from +/// `[0, min(RECONNECT_MAX_DELAY, RECONNECT_BASE_DELAY * MULTIPLIER^(attempt-1))]`. +/// This replaces the previous fixed [`RECONNECT_INTERVAL`] spacing: it recovers +/// faster from transient blips and spreads retries to avoid synchronised +/// reconnect storms across many clients after a server hiccup. +pub const RECONNECT_BASE_DELAY: Duration = Duration::from_millis(300); +pub const RECONNECT_BACKOFF_MULTIPLIER: u64 = 2; +pub const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(7); + +/// Un-jittered backoff ceiling for the given 1-based reconnect attempt: +/// `min(RECONNECT_MAX_DELAY, RECONNECT_BASE_DELAY * MULTIPLIER^(attempt-1))`, +/// floored at 1ms. Grows geometrically until it saturates at the cap. +fn reconnect_backoff_nominal(attempt: u32) -> Duration { + let base = RECONNECT_BASE_DELAY.as_millis() as u64; + let cap = RECONNECT_MAX_DELAY.as_millis() as u64; + let exp = RECONNECT_BACKOFF_MULTIPLIER.saturating_pow(attempt.saturating_sub(1)); + Duration::from_millis(base.saturating_mul(exp).min(cap).max(1)) +} + +/// Full-jitter backoff delay for the given 1-based reconnect attempt: sampled +/// uniformly from `[0, reconnect_backoff_nominal(attempt)]`. A dependency-free +/// pseudo-random source from the system clock is sufficient — backoff jitter +/// does not need cryptographic quality, only de-correlation across clients. +fn reconnect_backoff_delay(attempt: u32) -> Duration { + let nominal = reconnect_backoff_nominal(attempt).as_millis() as u64; + let seed = + SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.subsec_nanos() as u64).unwrap_or(0); + Duration::from_millis(seed % (nominal + 1)) +} + /// Settling delay before checking PeerConnection state on the resume path. /// /// Lets a freshly issued ICE-restart offer/answer round-trip take effect when the @@ -224,6 +259,11 @@ struct EngineHandle { // If full_reconnect is true, the next attempt will not try to resume // and will instead do a full reconnect full_reconnect: bool, + + // The disconnect reason that started the current reconnection episode. + // Carried through so that, if reconnection ultimately fails, the engine + // closes with the original cause rather than a generic `UnknownReason`. + reconnect_reason: DisconnectReason, engine_task: Option<(JoinHandle<()>, oneshot::Sender<()>)>, } @@ -242,7 +282,10 @@ struct EngineInner { // We can simply wait for reconnection by trying to acquire a read lock. // (This also prevents new reconnection to happens if a read guard is still held) reconnecting_lock: AsyncRwLock<()>, - reconnecting_interval: AsyncMutex, + + // Signalled when a server-requested reconnect wants the next attempt to fire + // immediately, collapsing the exponential backoff wait between attempts. + retry_now_notify: Arc, } pub struct RtcEngine { @@ -409,8 +452,6 @@ impl EngineInner { session.wait_pc_connection().await?; let (engine_tx, engine_rx) = mpsc::unbounded_channel(); - let mut interval = interval(RECONNECT_INTERVAL); - interval.set_missed_tick_behavior(MissedTickBehavior::Delay); let inner = Arc::new(Self { lk_runtime, engine_tx, @@ -421,11 +462,12 @@ impl EngineInner { reconnecting: false, can_reconnect: true, full_reconnect: false, + reconnect_reason: DisconnectReason::UnknownReason, engine_task: None, }), options, reconnecting_lock: AsyncRwLock::default(), - reconnecting_interval: AsyncMutex::new(interval), + retry_now_notify: Arc::new(Notify::new()), }); // Start initial tasks @@ -527,6 +569,7 @@ impl EngineInner { self.reconnection_needed( retry_now, action == proto::leave_request::Action::Reconnect, + reason, ); } proto::leave_request::Action::Disconnect => { @@ -710,7 +753,13 @@ impl EngineInner { /// Start the reconnect task if not already started /// Ask to retry directly if `retry_now` is true /// Ask for a full reconnect if `full_reconnect` is true - fn reconnection_needed(self: &Arc, retry_now: bool, full_reconnect: bool) { + /// `reason` is the disconnect cause that triggered this reconnection + fn reconnection_needed( + self: &Arc, + retry_now: bool, + full_reconnect: bool, + reason: DisconnectReason, + ) { let mut running_handle = self.running_handle.write(); if !running_handle.can_reconnect { @@ -718,9 +767,6 @@ impl EngineInner { } if running_handle.reconnecting { - // If we're already reconnecting just update the interval to restart a new attempt - // ASAP - // Only escalate to full reconnect, never downgrade. Stale signal-close // events (which request resume) must not override a full reconnect decision // made by the reconnect loop after a failed resume attempt. @@ -728,11 +774,10 @@ impl EngineInner { running_handle.full_reconnect = true; } + // Wake the in-flight reconnect loop so its next attempt fires + // immediately, collapsing the backoff wait. if retry_now { - let inner = self.clone(); - livekit_runtime::spawn(async move { - inner.reconnecting_interval.lock().await.reset(); - }); + self.retry_now_notify.notify_one(); } return; @@ -740,6 +785,9 @@ impl EngineInner { running_handle.reconnecting = true; running_handle.full_reconnect = full_reconnect; + // Remember the cause so a failed reconnection closes with it rather than + // a generic UnknownReason. + running_handle.reconnect_reason = reason; livekit_runtime::spawn({ let inner = self.clone(); @@ -760,7 +808,20 @@ impl EngineInner { res = inner.reconnect_task() => { if res.is_err() { log::error!("failed to reconnect to the livekit room"); - inner.close(DisconnectReason::UnknownReason).await; + // The loop may already have closed the engine with an + // accurate reason (e.g. a server Disconnect hit + // mid-attempt). Only close here for the paths that + // didn't — chiefly attempt exhaustion — and do so with + // the cause that started this episode rather than a + // generic UnknownReason, avoiding a duplicate + // Disconnected event with a stale reason. + let (already_closed, reason) = { + let handle = inner.running_handle.read(); + (handle.closed, handle.reconnect_reason) + }; + if !already_closed { + inner.close(reason).await; + } } else { log::info!("RtcEngine successfully recovered") } @@ -792,6 +853,15 @@ impl EngineInner { ) }; + // Lifecycle notifications are emitted once per mode: Resuming the first + // time the episode resumes, Restarting the first time it (re)enters full + // reconnect. Crucially this includes an escalation from a failed resume, + // which previously emitted no Restarting at all -- leaving the Room to + // observe Resuming followed by Restarted with no Restarting between + // (DELTA 2). + let mut resuming_emitted = false; + let mut restarting_emitted = false; + for i in 1..=RECONNECT_ATTEMPTS { let (is_closed, full_reconnect) = { let running_handle = self.running_handle.read(); @@ -803,7 +873,8 @@ impl EngineInner { } if full_reconnect { - if i == 1 { + if !restarting_emitted { + restarting_emitted = true; let (tx, rx) = oneshot::channel(); let _ = self.engine_tx.send(EngineEvent::Restarting(tx)); let _ = rx.await; @@ -838,7 +909,8 @@ impl EngineInner { } } } else { - if i == 1 { + if !resuming_emitted { + resuming_emitted = true; let (tx, rx) = oneshot::channel(); let _ = self.engine_tx.send(EngineEvent::Resuming(tx)); let _ = rx.await; @@ -868,7 +940,16 @@ impl EngineInner { } } - self.reconnecting_interval.lock().await.tick().await; + // Exponential backoff with full jitter between attempts (DELTA 3). + // A server-requested reconnect signals retry_now_notify to collapse + // this wait so the next attempt fires immediately. + let backoff = reconnect_backoff_delay(i); + tokio::select! { + _ = livekit_runtime::sleep(backoff) => {} + _ = self.retry_now_notify.notified() => { + log::debug!("retry_now signalled, skipping reconnect backoff"); + } + } } Err(EngineError::Connection( @@ -926,36 +1007,59 @@ impl EngineInner { Ok(()) } - /// Try to restart the current session + /// Resume the current session in place (the lightweight reconnect path). + /// + /// The steps below run in a fixed order that any change must preserve, and + /// each non-trivial seam is its own method so the sequence — and the reason + /// for the ordering — is explicit rather than implied by statement order. + /// Mirrors the resume chain in `livekit/specs/signalling-reconnection.allium`: + /// 1. reopen the signalling link (queue gate stays on until step 4); + /// 2. SyncState before the publisher re-offer; + /// 3. re-offer the publisher, then await PC reconnection + settle; + /// 4. re-check link liveness, then drain the queue. async fn try_resume_connection(&self) -> EngineResult<()> { let session = self.running_handle.read().session.clone(); - let reconnect_response = session.restart().await?; - let (tx, rx) = oneshot::channel(); - let _ = self.engine_tx.send(EngineEvent::SignalResumed { reconnect_response, tx }); + // 1. Reopen the signalling link. The SignalClient stays gated + // (`reconnecting=true`) so queueable mutations buffer until step 4. + let reconnect_response = session.restart().await?; - // With SignalResumed, the room will send a SyncState message to the server. - // SyncState is a pass-through signal so it goes out immediately even though - // the SignalClient is still in `reconnecting=true` state. - let _ = rx.await; + // 2. Hand the ReconnectResponse to the room and wait until it has sent + // SyncState, which must precede the publisher re-offer. + self.resume_sync_state(reconnect_response).await; - // The publisher offer must be sent AFTER the SyncState message + // 3. Re-offer the publisher (strictly AFTER SyncState) and wait for the + // PeerConnections to reconnect, applying the settle delay. session.restart_publisher().await?; session.wait_pc_reconnected(PC_RECONNECT_SETTLE_DELAY).await?; - // Re-check the signal connection BEFORE flushing the queue. If the WS died - // while we were waiting for PCs to come back, draining queued mutations - // would just push them into the void; better to bail and let the engine - // try a fresh resume (or escalate). + // 4. Re-check link liveness and drain the queued mutations. + self.resume_finalize(&session).await + } + + /// Resume step 2: announce the resume to the room and block until it has + /// sent SyncState. SyncState is a pass-through signal, so it reaches the + /// server immediately even though the SignalClient is still gated. + async fn resume_sync_state(&self, reconnect_response: proto::ReconnectResponse) { + let (tx, rx) = oneshot::channel(); + let _ = self.engine_tx.send(EngineEvent::SignalResumed { reconnect_response, tx }); + // The room replies on `tx` once SyncState has gone out. + let _ = rx.await; + } + + /// Resume step 4: confirm the signalling link survived the PC-reconnect wait + /// before draining the queue. If the WS died while we were waiting for the + /// PeerConnections, draining queued mutations would just push them into the + /// void; bail instead and let the engine try a fresh resume (or escalate). + async fn resume_finalize(&self, session: &RtcSession) -> EngineResult<()> { if !session.signal_client().is_connected().await { return Err(EngineError::Connection("signal connection severed during resume".into())); } - // Flush queued mutations and clear the `reconnecting` flag — at this point - // the resume has fully recovered, so deferred subscription updates / mutes - // / etc. should now reach the server. Mirrors `client.setReconnected()`. + // Flush queued mutations and clear the `reconnecting` gate — the resume + // has fully recovered, so deferred subscription updates / mutes / etc. + // should now reach the server. Mirrors `client.setReconnected()`. session.signal_client().set_reconnected().await; - Ok(()) } } @@ -1028,4 +1132,47 @@ mod tests { ); } } + + #[test] + fn backoff_nominal_grows_geometrically_then_caps() { + // attempt 1 == base, then x2 each step, until it saturates at the cap. + assert_eq!(reconnect_backoff_nominal(1), RECONNECT_BASE_DELAY); + assert_eq!( + reconnect_backoff_nominal(2), + RECONNECT_BASE_DELAY * RECONNECT_BACKOFF_MULTIPLIER as u32 + ); + assert_eq!( + reconnect_backoff_nominal(3), + RECONNECT_BASE_DELAY * (RECONNECT_BACKOFF_MULTIPLIER * RECONNECT_BACKOFF_MULTIPLIER) as u32 + ); + + // Monotonic non-decreasing and never above the cap. + let mut prev = Duration::ZERO; + for attempt in 1..=RECONNECT_ATTEMPTS { + let nominal = reconnect_backoff_nominal(attempt); + assert!(nominal >= prev, "backoff must not decrease (attempt {attempt})"); + assert!(nominal <= RECONNECT_MAX_DELAY, "backoff must not exceed the cap"); + prev = nominal; + } + + // Late attempts are pinned to the cap, and large attempt indices don't + // overflow into a wrapped-around small value. + assert_eq!(reconnect_backoff_nominal(RECONNECT_ATTEMPTS), RECONNECT_MAX_DELAY); + assert_eq!(reconnect_backoff_nominal(u32::MAX), RECONNECT_MAX_DELAY); + } + + #[test] + fn backoff_delay_stays_within_nominal_jitter_window() { + // Full jitter: every sample must land within [0, nominal(attempt)]. + for attempt in 1..=RECONNECT_ATTEMPTS { + let nominal = reconnect_backoff_nominal(attempt); + for _ in 0..1000 { + let delay = reconnect_backoff_delay(attempt); + assert!( + delay <= nominal, + "jittered delay {delay:?} exceeded nominal {nominal:?} (attempt {attempt})" + ); + } + } + } } diff --git a/livekit/tests/reconnection_test.rs b/livekit/tests/reconnection_test.rs new file mode 100644 index 000000000..9e268accd --- /dev/null +++ b/livekit/tests/reconnection_test.rs @@ -0,0 +1,112 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Reconnection Tests +//! +//! Exercises the engine reconnection paths end-to-end against a running +//! `livekit-server --dev` (or LiveKit Cloud), via [`Room::simulate_scenario`]: +//! +//! - `SignalReconnect` drives the lightweight *resume* path. +//! - `FullReconnect` asks the server to issue `LeaveRequest{Reconnect}`, forcing +//! a *full* reconnect (new session, republish). +//! +//! Both should surface `Reconnecting` then `Reconnected` and return the room to +//! `Connected`. These guard the lifecycle-event and recovery behaviour the +//! engine's reconnect loop is responsible for. +//! +//! Environment variables (same as the other e2e suites): +//! - LIVEKIT_URL (default ws://localhost:7880) +//! - LIVEKIT_API_KEY (default "devkey") +//! - LIVEKIT_API_SECRET (default "secret") +//! +//! Run: +//! cargo test -p livekit --features "__lk-e2e-test,native-tls" --test reconnection_test -- --nocapture + +#![cfg(feature = "__lk-e2e-test")] + +#[cfg(feature = "__lk-e2e-test")] +use { + anyhow::{anyhow, bail, Result}, + common::test_rooms, + livekit::{ConnectionState, Room, RoomEvent, SimulateScenario}, + std::time::Duration, + tokio::{sync::mpsc::UnboundedReceiver, time::timeout}, +}; + +mod common; + +/// Drives a reconnection via `scenario` and asserts the room reports +/// `Reconnecting`, then `Reconnected`, and ends up `Connected` again. +#[cfg(feature = "__lk-e2e-test")] +async fn assert_recovers( + room: Room, + mut events: UnboundedReceiver, + scenario: SimulateScenario, +) -> Result<()> { + assert_eq!(room.connection_state(), ConnectionState::Connected); + + // Kick off the reconnection. These scenarios return promptly (they close the + // local signal channel or ask the server to issue a Leave); recovery then + // proceeds asynchronously and surfaces as room events, which are buffered on + // an unbounded channel until we observe them below. + room.simulate_scenario(scenario).await.map_err(|e| anyhow!("simulate_scenario failed: {e:?}"))?; + + // Expect Reconnecting, then Reconnected. Ignore unrelated events in between. + let observe = async { + let mut saw_reconnecting = false; + while let Some(event) = events.recv().await { + match event { + RoomEvent::Reconnecting => saw_reconnecting = true, + RoomEvent::Reconnected => { + if !saw_reconnecting { + bail!("received Reconnected without a preceding Reconnecting"); + } + return Ok(()); + } + RoomEvent::Disconnected { reason } => { + bail!("room disconnected during reconnection: {reason:?}"); + } + _ => {} + } + } + bail!("event stream ended before the room reconnected"); + }; + + timeout(Duration::from_secs(30), observe).await??; + + assert_eq!( + room.connection_state(), + ConnectionState::Connected, + "room should be Connected after recovery" + ); + Ok(()) +} + +#[cfg(feature = "__lk-e2e-test")] +#[test_log::test(tokio::test)] +async fn test_signal_reconnect_resumes() -> Result<()> { + let (room, events) = test_rooms(1).await?.pop().unwrap(); + assert_recovers(room, events, SimulateScenario::SignalReconnect).await +} + +// `ForceTcp` injects a client-side `Leave{Reconnect}`, deterministically +// driving a *full* reconnect (new session, republish) without relying on the +// server to echo a leave — unlike `FullReconnect`, whose server-side simulation +// is not honoured by every server build. +#[cfg(feature = "__lk-e2e-test")] +#[test_log::test(tokio::test)] +async fn test_force_reconnect_does_full_reconnect() -> Result<()> { + let (room, events) = test_rooms(1).await?.pop().unwrap(); + assert_recovers(room, events, SimulateScenario::ForceTcp).await +} From ef983628f107deef86343c31d0bf731b12ef0fde Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 9 Jun 2026 11:35:17 +0200 Subject: [PATCH 03/16] fix region fetch behaviour --- livekit-api/src/signal_client/mod.rs | 30 +++++++++++--------- livekit/specs/signalling-reconnection.allium | 15 ++++++---- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index 5d4f16256..0dfb85f9e 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -193,22 +193,28 @@ impl SignalClient { log::error!("unexpected signal error: {}", err.to_string()); } - // Region fallback is best-effort. `fetch_region_urls` already - // returns an empty list for non-cloud (direct / self-hosted) - // URLs, so those skip the fallback entirely. If the region fetch - // itself fails (e.g. the region endpoint is unreachable), we must - // NOT mask the original connection error with the fetch error — - // surface the real reason the connection failed instead. + // Fetching region URLs is best-effort. `fetch_region_urls` + // already returns an empty list for non-cloud (direct / + // self-hosted) URLs, so those skip the fallback entirely. If the + // fetch itself fails (e.g. the region endpoint is unreachable), + // that must NOT be fatal: log a warning and fall back to the + // original connection error rather than masking it with the + // fetch error. let urls = match RegionUrlProvider::fetch_region_urls(url, token).await { Ok(urls) => urls, Err(region_err) => { - log::debug!( - "region url fetch failed ({region_err}); surfacing original connection error" + log::warn!( + "failed to fetch region urls: {region_err}; surfacing original connection error" ); return Err(err); } }; + // With no region URLs to try, this surfaces the original error. + // Otherwise we keep the most recent region attempt error, so that + // if every region fails the caller sees why the last region + // connection failed. + let mut last_err = err; for url in urls.iter() { log::info!("fallback connection to: {}", url); match SignalInner::connect(url, token, options.clone(), publisher_offer.clone()) @@ -217,15 +223,11 @@ impl SignalClient { Ok((inner, join_response, stream_events)) => { return Ok(handle_success(inner, join_response, stream_events)) } - Err(region_conn_err) => { - log::warn!("region fallback connection to {url} failed: {region_conn_err}") - } + Err(region_conn_err) => last_err = region_conn_err, } } - // Every region URL failed (or there were none): surface the - // ORIGINAL primary connection error, not the last region's error. - Err(err) + Err(last_err) } } } diff --git a/livekit/specs/signalling-reconnection.allium b/livekit/specs/signalling-reconnection.allium index c6e1d6d49..809557c24 100644 --- a/livekit/specs/signalling-reconnection.allium +++ b/livekit/specs/signalling-reconnection.allium @@ -241,8 +241,9 @@ rule RegionFetchFails { when: RegionFetchFailed(original_error) ensures: SignalConnectRejected(original_error) @guidance - -- DELTA 5: a failed region-URL fetch must NEVER mask the original - -- connection error (today the fetch error is surfaced instead). + -- DELTA 5: failing to FETCH region URLs is best-effort, never fatal. It + -- is logged as a warning and the ORIGINAL connection error is surfaced, + -- rather than masking it with the fetch error (today's `?` is fatal). } rule RegionFetchSucceeds { @@ -253,11 +254,13 @@ rule RegionFetchSucceeds { } rule AllRegionsFail { - when: RegionConnectsExhausted(original_error) - ensures: SignalConnectRejected(original_error) + when: RegionConnectsExhausted(last_region_error) + ensures: SignalConnectRejected(last_region_error) @guidance - -- DELTA 5: when every region URL has failed, surface the ORIGINAL - -- primary connection error rather than the last region's error. + -- When region URLs WERE fetched and tried but every one failed, surface + -- the LAST region connection error -- that is the most relevant reason a + -- region attempt failed. (Distinct from RegionFetchFails, which is about + -- failing to fetch the list at all.) } -- === Signalling-link liveness & lifecycle =================================== From 1e1b65a371f30128fc38d3add6cbd9fb47514487 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 9 Jun 2026 12:05:21 +0200 Subject: [PATCH 04/16] fix full reconnect case --- livekit/src/rtc_engine/mod.rs | 5 +++-- livekit/src/rtc_engine/rtc_session.rs | 19 ++++++++++++------- livekit/tests/reconnection_test.rs | 10 ++++------ 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index afb0e8a39..e2f17e686 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -110,8 +110,9 @@ pub enum SimulateScenario { Migration, ForceTcp, ForceTls, - /// Tells the server to issue a `LeaveRequest{Reconnect}`, forcing a - /// full reconnect (new RtcSession, republish required). + /// Client-driven full reconnect: forces the next reconnect to be a full + /// reconnect (new RtcSession, republish required) and triggers it locally, + /// without relying on the server. Mirrors client-sdk-js's `full-reconnect`. FullReconnect, } diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index eb8d12a8f..17ed8dd54 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -1855,13 +1855,18 @@ impl SessionInner { .await? } SimulateScenario::FullReconnect => { - self.signal_client - .send(proto::signal_request::Message::Simulate(proto::SimulateScenario { - scenario: Some( - proto::simulate_scenario::Scenario::LeaveRequestFullReconnect(true), - ), - })) - .await; + // Client-driven full reconnect, mirroring client-sdk-js's + // `full-reconnect` scenario: force the next reconnect to be a + // full reconnect and trigger it locally, rather than asking the + // server to echo a Leave. The server-side + // `LeaveRequestFullReconnect` simulation is not honoured by every + // server build, so relying on it makes this scenario flaky. + self.on_signal_event(proto::signal_response::Message::Leave(proto::LeaveRequest { + action: proto::leave_request::Action::Reconnect.into(), + reason: DisconnectReason::ClientInitiated as i32, + ..Default::default() + })) + .await? } } Ok(()) diff --git a/livekit/tests/reconnection_test.rs b/livekit/tests/reconnection_test.rs index 9e268accd..645768c1f 100644 --- a/livekit/tests/reconnection_test.rs +++ b/livekit/tests/reconnection_test.rs @@ -100,13 +100,11 @@ async fn test_signal_reconnect_resumes() -> Result<()> { assert_recovers(room, events, SimulateScenario::SignalReconnect).await } -// `ForceTcp` injects a client-side `Leave{Reconnect}`, deterministically -// driving a *full* reconnect (new session, republish) without relying on the -// server to echo a leave — unlike `FullReconnect`, whose server-side simulation -// is not honoured by every server build. +// `FullReconnect` forces a full reconnect (new session, republish) — driven +// client-side, so it does not depend on the server echoing a leave. #[cfg(feature = "__lk-e2e-test")] #[test_log::test(tokio::test)] -async fn test_force_reconnect_does_full_reconnect() -> Result<()> { +async fn test_full_reconnect_recovers() -> Result<()> { let (room, events) = test_rooms(1).await?.pop().unwrap(); - assert_recovers(room, events, SimulateScenario::ForceTcp).await + assert_recovers(room, events, SimulateScenario::FullReconnect).await } From 3782e84817d3851fe8d528702325af23fad7b3e6 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 9 Jun 2026 12:17:42 +0200 Subject: [PATCH 05/16] resume reconnect --- livekit/src/rtc_engine/mod.rs | 5 +++++ livekit/src/rtc_engine/rtc_session.rs | 15 +++++++++++++++ livekit/tests/reconnection_test.rs | 12 ++++++++++++ 3 files changed, 32 insertions(+) diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index e2f17e686..ce193c1fa 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -114,6 +114,11 @@ pub enum SimulateScenario { /// reconnect (new RtcSession, republish required) and triggers it locally, /// without relying on the server. Mirrors client-sdk-js's `full-reconnect`. FullReconnect, + /// Asks the server to drop the signalling connection during the next resume, + /// then triggers a resume locally. The resume cannot complete, so the engine + /// escalates to a full reconnect — exercising the resume→full escalation + /// path. Mirrors client-sdk-js's `disconnect-signal-on-resume`. + DisconnectSignalOnResume, } #[derive(Error, Debug)] diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 17ed8dd54..8dd9d9d41 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -1790,6 +1790,21 @@ impl SessionInner { SimulateScenario::SignalReconnect => { self.signal_client.close().await; } + SimulateScenario::DisconnectSignalOnResume => { + // Tell the server to drop the signalling link during the next + // resume, then trigger a resume by closing the link locally. The + // server kills the resumed signal, so the resume fails and the + // engine escalates to a full reconnect. Mirrors client-sdk-js's + // `disconnect-signal-on-resume`. + self.signal_client + .send(proto::signal_request::Message::Simulate(proto::SimulateScenario { + scenario: Some( + proto::simulate_scenario::Scenario::DisconnectSignalOnResume(true), + ), + })) + .await; + self.signal_client.close().await; + } SimulateScenario::Speaker => { self.signal_client .send(proto::signal_request::Message::Simulate(proto::SimulateScenario { diff --git a/livekit/tests/reconnection_test.rs b/livekit/tests/reconnection_test.rs index 645768c1f..728dc93a2 100644 --- a/livekit/tests/reconnection_test.rs +++ b/livekit/tests/reconnection_test.rs @@ -108,3 +108,15 @@ async fn test_full_reconnect_recovers() -> Result<()> { let (room, events) = test_rooms(1).await?.pop().unwrap(); assert_recovers(room, events, SimulateScenario::FullReconnect).await } + +// The server drops the signalling link during the resume, so the resume cannot +// complete and the engine must escalate to a full reconnect. Recovery here is +// only possible via that escalation, so a successful Reconnecting → Reconnected +// exercises the resume→full path (and the Restarting emitted on escalation, +// which drives the Room's remote-participant cleanup before the full reconnect). +#[cfg(feature = "__lk-e2e-test")] +#[test_log::test(tokio::test)] +async fn test_resume_failure_escalates_to_full_reconnect() -> Result<()> { + let (room, events) = test_rooms(1).await?.pop().unwrap(); + assert_recovers(room, events, SimulateScenario::DisconnectSignalOnResume).await +} From 6ed56db6f250dd9974c2721adf24f65bd616b865 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 9 Jun 2026 12:22:13 +0200 Subject: [PATCH 06/16] add exhaustion test --- livekit/tests/reconnection_test.rs | 115 ++++++++++++++++++++++++++++- 1 file changed, 112 insertions(+), 3 deletions(-) diff --git a/livekit/tests/reconnection_test.rs b/livekit/tests/reconnection_test.rs index 728dc93a2..7d0795c41 100644 --- a/livekit/tests/reconnection_test.rs +++ b/livekit/tests/reconnection_test.rs @@ -39,9 +39,15 @@ use { anyhow::{anyhow, bail, Result}, common::test_rooms, - livekit::{ConnectionState, Room, RoomEvent, SimulateScenario}, - std::time::Duration, - tokio::{sync::mpsc::UnboundedReceiver, time::timeout}, + libwebrtc::native::create_random_uuid, + livekit::{ConnectionState, Room, RoomEvent, RoomOptions, SimulateScenario}, + livekit_api::access_token::{AccessToken, VideoGrants}, + std::{env, net::SocketAddr, time::Duration}, + tokio::{ + net::{TcpListener, TcpStream}, + sync::{mpsc::UnboundedReceiver, watch}, + time::timeout, + }, }; mod common; @@ -120,3 +126,106 @@ async fn test_resume_failure_escalates_to_full_reconnect() -> Result<()> { let (room, events) = test_rooms(1).await?.pop().unwrap(); assert_recovers(room, events, SimulateScenario::DisconnectSignalOnResume).await } + +/// A minimal TCP proxy in front of the signalling server that can be killed. +/// Sending `true` on the returned channel closes the in-flight connection and +/// stops accepting, so the client's reconnect attempts all fail (connection +/// refused). Used to drive the engine's reconnect loop to exhaustion. +#[cfg(feature = "__lk-e2e-test")] +async fn start_killable_proxy(target_host_port: String) -> (SocketAddr, watch::Sender) { + let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind proxy"); + let addr = listener.local_addr().expect("proxy addr"); + let (kill_tx, kill_rx) = watch::channel(false); + + tokio::spawn(async move { + loop { + let mut kr = kill_rx.clone(); + tokio::select! { + _ = kr.changed() => break, // kill: stop accepting; dropping `listener` refuses new connects + accepted = listener.accept() => { + let Ok((mut inbound, _)) = accepted else { break }; + let target = target_host_port.clone(); + let mut kr2 = kill_rx.clone(); + tokio::spawn(async move { + if let Ok(mut outbound) = TcpStream::connect(&target).await { + tokio::select! { + _ = kr2.changed() => {} // kill: drop both streams, severing the client link + _ = tokio::io::copy_bidirectional(&mut inbound, &mut outbound) => {} + } + } + }); + } + } + } + }); + + (addr, kill_tx) +} + +// When every reconnect attempt fails, the engine must exhaust its bounded +// retries and emit Disconnected — it must NOT stay stuck in Reconnecting +// forever. We connect through a killable proxy, kill it, and assert the room +// reaches Disconnected after a reconnection was attempted. (The reason is +// UnknownReason here because a dropped signal link carries no richer cause; the +// #2b improvement surfaces a meaningful cause only when the triggering event +// has one.) Slow by design: it waits out the full bounded backoff sequence. +#[cfg(feature = "__lk-e2e-test")] +#[test_log::test(tokio::test)] +async fn test_reconnect_exhaustion_disconnects() -> Result<()> { + let api_key = env::var("LIVEKIT_API_KEY").unwrap_or_else(|_| "devkey".into()); + let api_secret = env::var("LIVEKIT_API_SECRET").unwrap_or_else(|_| "secret".into()); + let server_url = env::var("LIVEKIT_URL").unwrap_or_else(|_| "ws://localhost:7880".into()); + + // Derive host:port for raw TCP forwarding (the WS upgrade rides over it). + let target = server_url + .split("://") + .last() + .and_then(|rest| rest.split('/').next()) + .unwrap_or("localhost:7880") + .to_string(); + + let (proxy_addr, kill) = start_killable_proxy(target).await; + let proxy_url = format!("ws://{proxy_addr}"); + + let room_name = format!("test_room_{}", create_random_uuid()); + let token = AccessToken::with_api_key(&api_key, &api_secret) + .with_ttl(Duration::from_secs(30 * 60)) + .with_grants(VideoGrants { room_join: true, room: room_name, ..Default::default() }) + .with_identity("p0") + .with_name("Participant 0") + .to_jwt()?; + + let (room, mut events) = Room::connect(&proxy_url, &token, RoomOptions::default()).await?; + assert_eq!(room.connection_state(), ConnectionState::Connected); + + // Sever the link and refuse all reconnects. + kill.send(true).ok(); + + let observe = async { + let mut saw_reconnecting = false; + while let Some(event) = events.recv().await { + match event { + RoomEvent::Reconnecting => saw_reconnecting = true, + RoomEvent::Disconnected { reason } => { + if !saw_reconnecting { + bail!("disconnected without attempting reconnection first"); + } + return Ok(reason); + } + _ => {} + } + } + bail!("event stream ended before the room reported Disconnected"); + }; + + // Generous timeout: the engine works through its full bounded backoff before + // giving up. + let _reason = timeout(Duration::from_secs(90), observe).await??; + + assert_eq!( + room.connection_state(), + ConnectionState::Disconnected, + "room must reach Disconnected after reconnection is exhausted, not hang in Reconnecting" + ); + Ok(()) +} From f3242cd13ec3dd719147f567fcda6893d46d5371 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 9 Jun 2026 12:25:29 +0200 Subject: [PATCH 07/16] Create lukas_reconnect.md --- .changeset/lukas_reconnect.md | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .changeset/lukas_reconnect.md diff --git a/.changeset/lukas_reconnect.md b/.changeset/lukas_reconnect.md new file mode 100644 index 000000000..35377a265 --- /dev/null +++ b/.changeset/lukas_reconnect.md @@ -0,0 +1,8 @@ +--- +livekit: patch +livekit-api: patch +livekit-ffi: patch +livekit-uniffi: patch +--- + +harden reconnect behaviour - #1148 (@lukasIO) From f5c42f1037ee1d6848e15d12118eb1bf8f3155c3 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 9 Jun 2026 15:05:08 +0200 Subject: [PATCH 08/16] sync signalling-reconnection spec to latest --- livekit/specs/signalling-reconnection.allium | 106 ++++++++++++++----- 1 file changed, 80 insertions(+), 26 deletions(-) diff --git a/livekit/specs/signalling-reconnection.allium b/livekit/specs/signalling-reconnection.allium index 809557c24..9ff110582 100644 --- a/livekit/specs/signalling-reconnection.allium +++ b/livekit/specs/signalling-reconnection.allium @@ -22,6 +22,25 @@ -- Boundaries: peer-connection / media recovery (RtcSession) and the Room are -- external; modelled via the MediaLayer / Application surfaces below. -- +-- Structure (two layers, kept in one spec — they share the SignalConnection +-- entity, the connect/region/validate path, the auth classification and +-- DisconnectCause, so splitting would force cross-spec entity sharing for little +-- gain; the modular seams are expressed as boundary surfaces instead): +-- +-- LAYER 1 — Signalling link (mirrors livekit-api/signal_client) +-- entity SignalConnection; rules under "Layer 1" below: +-- * Establishment — initial connect, region fallback, auth rejection. +-- (The engine-side join-retries loop is config-level, see join_retries; +-- its only modelled outcome is auth fast-fail, noted there.) +-- * Liveness & lifecycle — ping/timeout, resume, send/queue, token, close. +-- boundary surfaces: SignalCommands (app), ServerSignalling (SFU). +-- +-- LAYER 2 — Engine reconnect orchestration (mirrors rtc_engine) +-- entity EngineConnection; rules under "Layer 2" below: initial connection, +-- reconnection causes, start/escalate, per-attempt dispatch, the decoupled +-- resume chain, and attempt outcomes (incl. terminal/non-retryable failures). +-- boundary surfaces: MediaRecovery (RtcSession), RoomConnectionLifecycle (Room). +-- -- Checker note (allium 3.2.4): `allium check` reports residual `status` -- warnings that are CHECKER ARTIFACTS, not spec defects -- every state flagged -- "never assigned" is plainly assigned by a visible `ensures`. Three confirmed @@ -85,7 +104,7 @@ enum DisconnectCause { | peer_connection_failed | server_leave | signal_severed_during_resume - | reconnect_attempts_exhausted + | unauthorized | client_initiated | unknown } @@ -121,9 +140,11 @@ entity SignalConnection { -- The engine that drives this link's recovery. engine: EngineConnection with signal = this - -- The link is dead if nothing arrived within ping_timeout. Used by the - -- resume's mid-flight liveness re-check (ResumeRechecksLink). - is_alive: last_message_at != null and last_message_at + ping_timeout > now + -- The link is present and usable while connected or mid-resume; `lost` or + -- `closed` mean the stream is gone. Used by the resume's mid-flight re-check + -- (ResumeRechecksLink) and mirrors the implementation's `is_connected()` + -- (stream-present) check rather than ping-timeout liveness. + is_connected: status in {connected, reconnecting} transitions status { connected -> lost -- ping timeout / stream closed @@ -157,6 +178,10 @@ entity EngineConnection { -- (A `permitted -> revoked` terminal transition graph would express this -- structurally, but the entity's `status` graph below takes the single graph -- slot the current checker honours; see the spec header note.) + -- NOTE (spec<->code): the implementation realises this as a `can_reconnect` + -- bool that is only ever set false (never back to true) within a session, + -- always alongside a close() — i.e. it already satisfies this latch and the + -- RevokedImpliesClosed invariant; the enum is the spec-level model. reconnect_permission: permitted | revoked -- 1-based attempt index within the current episode; 0 when connected. @@ -183,7 +208,10 @@ entity EngineConnection { ------------------------------------------------------------ config { - -- Initial connect: extra attempts after the first (Room default). + -- Initial connect: extra attempts after the first (Room default). A + -- validated auth failure (HTTP 401/403, DisconnectCause unauthorized) is + -- surfaced immediately and does NOT consume these retries (R4.2), matching + -- the reconnect loop's terminal-failure handling. join_retries: Integer = 3 -- Per signalling-link connect attempt timeout (SIGNAL_CONNECT_TIMEOUT). connect_timeout: Duration = 5.seconds @@ -205,6 +233,10 @@ config { -- Rules ------------------------------------------------------------ +-- ########################################################################### +-- ## LAYER 1 — Signalling link (mirrors livekit-api/signal_client) +-- ########################################################################### + -- === Establishment (DELTA 5: region fallback) =============================== rule SignalConnectionEstablished { @@ -226,6 +258,9 @@ rule PrimaryConnectFailsOnCloud { ensures: RegionFetchRequested(error) @guidance -- DELTA 5: only LiveKit Cloud hosts attempt region fallback. + -- DELTA R3.2: the region list is cached per host with a TTL (default + -- 5s), so a RegionFetchRequested within the TTL is served from cache + -- instead of re-paying the network fetch on every reconnect attempt. } rule PrimaryConnectFailsOnDirect { @@ -337,6 +372,10 @@ rule CloseSignalLink { ensures: connection.status = closed } +-- ########################################################################### +-- ## LAYER 2 — Engine reconnect orchestration (mirrors rtc_engine) +-- ########################################################################### + -- === Engine: initial connection ============================================= rule EngineConnects { @@ -434,6 +473,10 @@ rule EscalateReconnect { -- event matching the mode it is in (previously the Room saw Resuming then -- Restarted with no Restarting between). retry_now restarts the backoff so -- the in-flight loop's next attempt fires immediately. + -- NOTE (spec<->code): the implementation emits Restarting lazily, on the + -- next attempt the loop runs in full mode (a `restarting_emitted` latch), + -- rather than eagerly at this escalation event. Net effect is identical + -- (Restarting always precedes the full reconnect). } -- === Engine: per-attempt dispatch =========================================== @@ -486,10 +529,10 @@ rule ResumeAwaitsPeerConnections { rule ResumeRechecksLink { when: PeerConnectionsReconnected(engine) requires: engine.status = reconnecting - if not engine.signal.is_alive: + if not engine.signal.is_connected: ensures: ResumeAttemptFailed( engine, - server_disconnect: false, + terminal: false, cause: signal_severed_during_resume ) else: @@ -519,28 +562,32 @@ rule ResumeAttemptSucceeds { ensures: EngineResumed(engine) } -rule ResumeAttemptDisconnects { - when: ResumeAttemptFailed(engine, server_disconnect, cause) +rule ResumeAttemptFailsTerminally { + when: ResumeAttemptFailed(engine, terminal, cause) requires: engine.status = reconnecting - requires: server_disconnect + requires: terminal ensures: engine.reconnect_permission = revoked ensures: engine.status = closed ensures: EngineDisconnected(engine, cause: cause) @guidance - -- A resume that failed because the server sent Leave{Disconnect}: stop, - -- do not escalate or retry. + -- A resume that failed in a NON-RETRYABLE way: stop, do not escalate or + -- retry. Terminal failures are (a) a server Leave{Disconnect} + -- (cause = server_leave) and (b) DELTA R4.2: an authentication failure + -- (cause = unauthorized, a server-validated HTTP 401/403) — the same + -- token will not succeed on retry, so retrying would just hammer the + -- server. `cause` carries which one for the Room. } rule ResumeAttemptEscalates { - when: ResumeAttemptFailed(engine, server_disconnect, cause) + when: ResumeAttemptFailed(engine, terminal, cause) requires: engine.status = reconnecting - requires: not server_disconnect + requires: not terminal ensures: engine.mode = full ensures: EngineRestarting(engine) ensures: RetryReconnect(engine) @guidance - -- DELTA 2: a failed resume escalates to full reconnect AND emits - -- Restarting on the escalation (previously silent), then retries. + -- DELTA 2: a (retryable) failed resume escalates to full reconnect AND + -- emits Restarting on the escalation (previously silent), then retries. } rule RestartAttemptSucceeds { @@ -555,19 +602,22 @@ rule RestartAttemptSucceeds { -- the old only on success, so a failed attempt leaves the old usable. } -rule RestartAttemptDisconnects { - when: RestartAttemptFailed(engine, server_disconnect, cause) +rule RestartAttemptFailsTerminally { + when: RestartAttemptFailed(engine, terminal, cause) requires: engine.status = reconnecting - requires: server_disconnect + requires: terminal ensures: engine.reconnect_permission = revoked ensures: engine.status = closed ensures: EngineDisconnected(engine, cause: cause) + @guidance + -- Non-retryable full-reconnect failure: a server Leave{Disconnect} or + -- (DELTA R4.2) a validated auth failure (cause = unauthorized). Stop. } rule RestartAttemptRetries { - when: RestartAttemptFailed(engine, server_disconnect, cause) + when: RestartAttemptFailed(engine, terminal, cause) requires: engine.status = reconnecting - requires: not server_disconnect + requires: not terminal ensures: RetryReconnect(engine) } @@ -587,12 +637,16 @@ rule ReconnectExhausted { requires: engine.status = reconnecting requires: engine.attempt >= config.max_reconnect_attempts ensures: engine.status = closed - ensures: EngineDisconnected(engine, cause: reconnect_attempts_exhausted) + ensures: EngineDisconnected(engine, cause: engine.reconnect_cause) @guidance - -- DELTA 2: on giving up, EngineDisconnected reports - -- reconnect_attempts_exhausted; the engine's reconnect_cause also remains - -- available to the Room for the underlying reason, instead of `unknown`. - -- Always emitted so the Room leaves the Reconnecting state. + -- DELTA 2: on giving up, EngineDisconnected reports the cause that STARTED + -- this episode (engine.reconnect_cause), threaded through instead of a + -- generic `unknown`. It is always emitted so the Room leaves Reconnecting + -- rather than hanging there. + -- NOTE (spec<->code): there is no dedicated "attempts exhausted" disconnect + -- reason in the wire protocol (proto DisconnectReason), so the original + -- cause is what surfaces. A distinct exhausted reason would require a + -- protocol addition. } ------------------------------------------------------------ From 049a4ede7b822d61b0cfff07f22645b57dc279b9 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 19 Jun 2026 10:53:30 +0200 Subject: [PATCH 09/16] Add DisconnectSignalOnResume scenario to FFI variants --- livekit-ffi/protocol/room.proto | 4 ++++ livekit-ffi/src/server/requests.rs | 3 +++ 2 files changed, 7 insertions(+) diff --git a/livekit-ffi/protocol/room.proto b/livekit-ffi/protocol/room.proto index 08b051084..eb59dcba2 100644 --- a/livekit-ffi/protocol/room.proto +++ b/livekit-ffi/protocol/room.proto @@ -84,6 +84,10 @@ enum SimulateScenarioKind { // Asks the server to send `LeaveRequest{Reconnect}`, forcing a full // reconnect (new RtcSession; SDK republishes existing local tracks). SIMULATE_FULL_RECONNECT = 7; + // Asks the server to drop the signalling connection during the next resume, + // then triggers a resume locally. The resume cannot complete, so the engine + // escalates to a full reconnect — exercising the resume→full escalation path. + SIMULATE_DISCONNECT_SIGNAL_ON_RESUME = 8; } message SimulateScenarioRequest { required uint64 room_handle = 1; diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index 57c1e38d7..bb94f6232 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -99,6 +99,9 @@ fn on_simulate_scenario( proto::SimulateScenarioKind::SimulateForceTcp => SimulateScenario::ForceTcp, proto::SimulateScenarioKind::SimulateForceTls => SimulateScenario::ForceTls, proto::SimulateScenarioKind::SimulateFullReconnect => SimulateScenario::FullReconnect, + proto::SimulateScenarioKind::SimulateDisconnectSignalOnResume => { + SimulateScenario::DisconnectSignalOnResume + } }; let ffi_room = server.retrieve_handle::(request.room_handle)?.clone(); From e3320d3af251eb607bbcea1d4fec914cbd8bd5a4 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 19 Jun 2026 12:04:04 +0200 Subject: [PATCH 10/16] notify --- livekit/src/rtc_engine/mod.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index ce193c1fa..116c2ac10 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -738,6 +738,14 @@ impl EngineInner { // exhausts all attempts leaves the room stuck in Reconnecting forever because // the room's task never sees the event that drives `handle_disconnected`. let _ = self.engine_tx.send(EngineEvent::Disconnected { reason }); + + // Signal any in-flight reconnect loop to stop. The reconnect task selects + // on `close_notifier`, both at the top-level (cancelling the whole task) + // and within its backoff wait (breaking the loop early). We notify LAST, + // after teardown has completed: the reconnect loop's own bail paths call + // `close()` from inside the task, so notifying earlier could let the + // top-level select drop the task mid-`close()` and leave teardown partial. + self.close_notifier.notify_waiters(); } /// When waiting for reconnection, it ensures we're always using the latest session. @@ -808,7 +816,13 @@ impl EngineInner { tokio::select! { _ = &mut close_receiver => { + // The engine was closed; abandon the reconnect attempt. + // Clear `reconnecting` (the success/failure path below does + // this after the select; this branch returns early so it + // must do so itself) to avoid leaving a closed engine stuck + // with reconnecting = true. log::debug!("reconnection cancelled"); + inner.running_handle.write().reconnecting = false; return; } res = inner.reconnect_task() => { @@ -948,13 +962,18 @@ impl EngineInner { // Exponential backoff with full jitter between attempts (DELTA 3). // A server-requested reconnect signals retry_now_notify to collapse - // this wait so the next attempt fires immediately. + // this wait so the next attempt fires immediately; a close signals + // close_notifier to break out of the loop early (the next iteration's + // `is_closed` check then returns) instead of waiting out the backoff. let backoff = reconnect_backoff_delay(i); tokio::select! { _ = livekit_runtime::sleep(backoff) => {} _ = self.retry_now_notify.notified() => { log::debug!("retry_now signalled, skipping reconnect backoff"); } + _ = self.close_notifier.notified() => { + log::debug!("engine closed, cancelling reconnect backoff"); + } } } From 965fa6972b663d88072274fdddb3777b95809ae3 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 19 Jun 2026 12:30:53 +0200 Subject: [PATCH 11/16] move reconnect behaviour into own module --- livekit/src/rtc_engine/mod.rs | 87 ++--------------- livekit/src/rtc_engine/reconnect_strategy.rs | 98 ++++++++++++++++++++ 2 files changed, 106 insertions(+), 79 deletions(-) create mode 100644 livekit/src/rtc_engine/reconnect_strategy.rs diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index 116c2ac10..e51058248 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -22,7 +22,7 @@ use std::{ borrow::Cow, fmt::Debug, sync::Arc, - time::{Duration, SystemTime, UNIX_EPOCH}, + time::Duration, }; use thiserror::Error; use tokio::sync::{ @@ -47,47 +47,19 @@ use crate::{ChatMessage, E2eeManager, TranscriptionSegment}; mod dc_sender; pub mod lk_runtime; mod peer_transport; +mod reconnect_strategy; mod rtc_events; mod rtc_session; +// Re-exported to preserve the public `rtc_engine::RECONNECT_*` paths. +pub use reconnect_strategy::{ + RECONNECT_ATTEMPTS, RECONNECT_BACKOFF_MULTIPLIER, RECONNECT_BASE_DELAY, RECONNECT_MAX_DELAY, +}; + pub(crate) type EngineEmitter = mpsc::UnboundedSender; pub(crate) type EngineEvents = mpsc::UnboundedReceiver; pub(crate) type EngineResult = Result; -pub const RECONNECT_ATTEMPTS: u32 = 10; -pub const RECONNECT_INTERVAL: Duration = Duration::from_secs(5); - -/// Exponential-backoff-with-full-jitter parameters for spacing reconnect -/// attempts. The per-attempt delay is sampled uniformly from -/// `[0, min(RECONNECT_MAX_DELAY, RECONNECT_BASE_DELAY * MULTIPLIER^(attempt-1))]`. -/// This replaces the previous fixed [`RECONNECT_INTERVAL`] spacing: it recovers -/// faster from transient blips and spreads retries to avoid synchronised -/// reconnect storms across many clients after a server hiccup. -pub const RECONNECT_BASE_DELAY: Duration = Duration::from_millis(300); -pub const RECONNECT_BACKOFF_MULTIPLIER: u64 = 2; -pub const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(7); - -/// Un-jittered backoff ceiling for the given 1-based reconnect attempt: -/// `min(RECONNECT_MAX_DELAY, RECONNECT_BASE_DELAY * MULTIPLIER^(attempt-1))`, -/// floored at 1ms. Grows geometrically until it saturates at the cap. -fn reconnect_backoff_nominal(attempt: u32) -> Duration { - let base = RECONNECT_BASE_DELAY.as_millis() as u64; - let cap = RECONNECT_MAX_DELAY.as_millis() as u64; - let exp = RECONNECT_BACKOFF_MULTIPLIER.saturating_pow(attempt.saturating_sub(1)); - Duration::from_millis(base.saturating_mul(exp).min(cap).max(1)) -} - -/// Full-jitter backoff delay for the given 1-based reconnect attempt: sampled -/// uniformly from `[0, reconnect_backoff_nominal(attempt)]`. A dependency-free -/// pseudo-random source from the system clock is sufficient — backoff jitter -/// does not need cryptographic quality, only de-correlation across clients. -fn reconnect_backoff_delay(attempt: u32) -> Duration { - let nominal = reconnect_backoff_nominal(attempt).as_millis() as u64; - let seed = - SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.subsec_nanos() as u64).unwrap_or(0); - Duration::from_millis(seed % (nominal + 1)) -} - /// Settling delay before checking PeerConnection state on the resume path. /// /// Lets a freshly issued ICE-restart offer/answer round-trip take effect when the @@ -965,7 +937,7 @@ impl EngineInner { // this wait so the next attempt fires immediately; a close signals // close_notifier to break out of the loop early (the next iteration's // `is_closed` check then returns) instead of waiting out the backoff. - let backoff = reconnect_backoff_delay(i); + let backoff = reconnect_strategy::delay(i); tokio::select! { _ = livekit_runtime::sleep(backoff) => {} _ = self.retry_now_notify.notified() => { @@ -1157,47 +1129,4 @@ mod tests { ); } } - - #[test] - fn backoff_nominal_grows_geometrically_then_caps() { - // attempt 1 == base, then x2 each step, until it saturates at the cap. - assert_eq!(reconnect_backoff_nominal(1), RECONNECT_BASE_DELAY); - assert_eq!( - reconnect_backoff_nominal(2), - RECONNECT_BASE_DELAY * RECONNECT_BACKOFF_MULTIPLIER as u32 - ); - assert_eq!( - reconnect_backoff_nominal(3), - RECONNECT_BASE_DELAY * (RECONNECT_BACKOFF_MULTIPLIER * RECONNECT_BACKOFF_MULTIPLIER) as u32 - ); - - // Monotonic non-decreasing and never above the cap. - let mut prev = Duration::ZERO; - for attempt in 1..=RECONNECT_ATTEMPTS { - let nominal = reconnect_backoff_nominal(attempt); - assert!(nominal >= prev, "backoff must not decrease (attempt {attempt})"); - assert!(nominal <= RECONNECT_MAX_DELAY, "backoff must not exceed the cap"); - prev = nominal; - } - - // Late attempts are pinned to the cap, and large attempt indices don't - // overflow into a wrapped-around small value. - assert_eq!(reconnect_backoff_nominal(RECONNECT_ATTEMPTS), RECONNECT_MAX_DELAY); - assert_eq!(reconnect_backoff_nominal(u32::MAX), RECONNECT_MAX_DELAY); - } - - #[test] - fn backoff_delay_stays_within_nominal_jitter_window() { - // Full jitter: every sample must land within [0, nominal(attempt)]. - for attempt in 1..=RECONNECT_ATTEMPTS { - let nominal = reconnect_backoff_nominal(attempt); - for _ in 0..1000 { - let delay = reconnect_backoff_delay(attempt); - assert!( - delay <= nominal, - "jittered delay {delay:?} exceeded nominal {nominal:?} (attempt {attempt})" - ); - } - } - } } diff --git a/livekit/src/rtc_engine/reconnect_strategy.rs b/livekit/src/rtc_engine/reconnect_strategy.rs new file mode 100644 index 000000000..bfd304f52 --- /dev/null +++ b/livekit/src/rtc_engine/reconnect_strategy.rs @@ -0,0 +1,98 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Reconnect backoff schedule. +//! +//! Computes the delay between reconnect attempts as exponential backoff with +//! full jitter. This replaces a previously fixed reconnect interval: it recovers +//! faster from transient blips and spreads retries to avoid synchronised +//! reconnect storms across many clients after a server hiccup. + +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +/// Maximum number of reconnect attempts before the engine gives up and closes. +pub const RECONNECT_ATTEMPTS: u32 = 10; + +/// Exponential-backoff-with-full-jitter parameters for spacing reconnect +/// attempts. The per-attempt delay is sampled uniformly from +/// `[0, min(RECONNECT_MAX_DELAY, RECONNECT_BASE_DELAY * MULTIPLIER^(attempt-1))]`. +pub const RECONNECT_BASE_DELAY: Duration = Duration::from_millis(300); +pub const RECONNECT_BACKOFF_MULTIPLIER: u64 = 2; +pub const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(7); + +/// Un-jittered backoff ceiling for the given 1-based reconnect attempt: +/// `min(RECONNECT_MAX_DELAY, RECONNECT_BASE_DELAY * MULTIPLIER^(attempt-1))`, +/// floored at 1ms. Grows geometrically until it saturates at the cap. +pub(super) fn nominal(attempt: u32) -> Duration { + let base = RECONNECT_BASE_DELAY.as_millis() as u64; + let cap = RECONNECT_MAX_DELAY.as_millis() as u64; + let exp = RECONNECT_BACKOFF_MULTIPLIER.saturating_pow(attempt.saturating_sub(1)); + Duration::from_millis(base.saturating_mul(exp).min(cap).max(1)) +} + +/// Full-jitter backoff delay for the given 1-based reconnect attempt: sampled +/// uniformly from `[0, nominal(attempt)]`. A dependency-free pseudo-random +/// source from the system clock is sufficient — backoff jitter does not need +/// cryptographic quality, only de-correlation across clients. +pub(super) fn delay(attempt: u32) -> Duration { + let nominal = nominal(attempt).as_millis() as u64; + let seed = + SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.subsec_nanos() as u64).unwrap_or(0); + Duration::from_millis(seed % (nominal + 1)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn backoff_nominal_grows_geometrically_then_caps() { + // attempt 1 == base, then x2 each step, until it saturates at the cap. + assert_eq!(nominal(1), RECONNECT_BASE_DELAY); + assert_eq!(nominal(2), RECONNECT_BASE_DELAY * RECONNECT_BACKOFF_MULTIPLIER as u32); + assert_eq!( + nominal(3), + RECONNECT_BASE_DELAY * (RECONNECT_BACKOFF_MULTIPLIER * RECONNECT_BACKOFF_MULTIPLIER) as u32 + ); + + // Monotonic non-decreasing and never above the cap. + let mut prev = Duration::ZERO; + for attempt in 1..=RECONNECT_ATTEMPTS { + let nominal = nominal(attempt); + assert!(nominal >= prev, "backoff must not decrease (attempt {attempt})"); + assert!(nominal <= RECONNECT_MAX_DELAY, "backoff must not exceed the cap"); + prev = nominal; + } + + // Late attempts are pinned to the cap, and large attempt indices don't + // overflow into a wrapped-around small value. + assert_eq!(nominal(RECONNECT_ATTEMPTS), RECONNECT_MAX_DELAY); + assert_eq!(nominal(u32::MAX), RECONNECT_MAX_DELAY); + } + + #[test] + fn backoff_delay_stays_within_nominal_jitter_window() { + // Full jitter: every sample must land within [0, nominal(attempt)]. + for attempt in 1..=RECONNECT_ATTEMPTS { + let nominal = nominal(attempt); + for _ in 0..1000 { + let delay = delay(attempt); + assert!( + delay <= nominal, + "jittered delay {delay:?} exceeded nominal {nominal:?} (attempt {attempt})" + ); + } + } + } +} From f712b86d341298596ee15ec08f9bc9cd4ebd43ba Mon Sep 17 00:00:00 2001 From: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 19 Jun 2026 11:44:00 +0000 Subject: [PATCH 12/16] generated protobuf --- livekit-ffi-node-bindings/proto/room_pb.d.ts | 9 +++++++++ livekit-ffi-node-bindings/proto/room_pb.js | 1 + 2 files changed, 10 insertions(+) diff --git a/livekit-ffi-node-bindings/proto/room_pb.d.ts b/livekit-ffi-node-bindings/proto/room_pb.d.ts index 5f97a5764..1e6eb88e5 100644 --- a/livekit-ffi-node-bindings/proto/room_pb.d.ts +++ b/livekit-ffi-node-bindings/proto/room_pb.d.ts @@ -81,6 +81,15 @@ export declare enum SimulateScenarioKind { * @generated from enum value: SIMULATE_FULL_RECONNECT = 7; */ SIMULATE_FULL_RECONNECT = 7, + + /** + * Asks the server to drop the signalling connection during the next resume, + * then triggers a resume locally. The resume cannot complete, so the engine + * escalates to a full reconnect — exercising the resume→full escalation path. + * + * @generated from enum value: SIMULATE_DISCONNECT_SIGNAL_ON_RESUME = 8; + */ + SIMULATE_DISCONNECT_SIGNAL_ON_RESUME = 8, } /** diff --git a/livekit-ffi-node-bindings/proto/room_pb.js b/livekit-ffi-node-bindings/proto/room_pb.js index 4f4df2404..45489a104 100644 --- a/livekit-ffi-node-bindings/proto/room_pb.js +++ b/livekit-ffi-node-bindings/proto/room_pb.js @@ -49,6 +49,7 @@ const SimulateScenarioKind = /*@__PURE__*/ proto2.makeEnum( {no: 5, name: "SIMULATE_FORCE_TCP"}, {no: 6, name: "SIMULATE_FORCE_TLS"}, {no: 7, name: "SIMULATE_FULL_RECONNECT"}, + {no: 8, name: "SIMULATE_DISCONNECT_SIGNAL_ON_RESUME"}, ], ); From f55f166fad5af16642ed5bb196711a51e5baa12c Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 19 Jun 2026 13:45:40 +0200 Subject: [PATCH 13/16] fmt --- livekit/src/rtc_engine/reconnect_strategy.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/livekit/src/rtc_engine/reconnect_strategy.rs b/livekit/src/rtc_engine/reconnect_strategy.rs index bfd304f52..bfb58b0a0 100644 --- a/livekit/src/rtc_engine/reconnect_strategy.rs +++ b/livekit/src/rtc_engine/reconnect_strategy.rs @@ -63,7 +63,8 @@ mod tests { assert_eq!(nominal(2), RECONNECT_BASE_DELAY * RECONNECT_BACKOFF_MULTIPLIER as u32); assert_eq!( nominal(3), - RECONNECT_BASE_DELAY * (RECONNECT_BACKOFF_MULTIPLIER * RECONNECT_BACKOFF_MULTIPLIER) as u32 + RECONNECT_BASE_DELAY + * (RECONNECT_BACKOFF_MULTIPLIER * RECONNECT_BACKOFF_MULTIPLIER) as u32 ); // Monotonic non-decreasing and never above the cap. From 54aec02ee17ac6fd527b51ef0cf6543484e642c1 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 19 Jun 2026 13:47:21 +0200 Subject: [PATCH 14/16] fmt --- livekit/tests/reconnection_test.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/livekit/tests/reconnection_test.rs b/livekit/tests/reconnection_test.rs index 7d0795c41..c5a6a56d2 100644 --- a/livekit/tests/reconnection_test.rs +++ b/livekit/tests/reconnection_test.rs @@ -66,7 +66,9 @@ async fn assert_recovers( // local signal channel or ask the server to issue a Leave); recovery then // proceeds asynchronously and surfaces as room events, which are buffered on // an unbounded channel until we observe them below. - room.simulate_scenario(scenario).await.map_err(|e| anyhow!("simulate_scenario failed: {e:?}"))?; + room.simulate_scenario(scenario) + .await + .map_err(|e| anyhow!("simulate_scenario failed: {e:?}"))?; // Expect Reconnecting, then Reconnected. Ignore unrelated events in between. let observe = async { From 3d3df191d72431e32c5aeafbb86802ec4c09d841 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 19 Jun 2026 14:15:29 +0200 Subject: [PATCH 15/16] fix build and update spec --- livekit/specs/signalling-reconnection.allium | 17 +++++++++++++++++ livekit/src/rtc_engine/mod.rs | 2 +- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/livekit/specs/signalling-reconnection.allium b/livekit/specs/signalling-reconnection.allium index 9ff110582..67414ec86 100644 --- a/livekit/specs/signalling-reconnection.allium +++ b/livekit/specs/signalling-reconnection.allium @@ -438,6 +438,20 @@ rule ServerRequestsDisconnect { -- terminal `revoked`) and tears the engine down. } +rule CloseEngine { + when: CloseEngine(engine) + requires: engine.status in {connected, reconnecting} + ensures: engine.reconnect_permission = revoked + ensures: engine.status = closed + ensures: EngineDisconnected(engine, cause: client_initiated) + @guidance + -- The Application explicitly closes the room. Lands `closed` from any + -- non-terminal state — including mid-reconnect, cancelling an in-flight + -- reconnect. The implementation signals close_notifier, which breaks the + -- reconnect loop's backoff/attempt waits immediately rather than waiting + -- them out; permission is revoked so a late stimulus can't restart it. +} + -- === Engine: starting / escalating the loop ================================= rule StartReconnect { @@ -679,6 +693,9 @@ surface RoomConnectionLifecycle { app.engine.mode app.engine.reconnect_cause + provides: + CloseEngine(app.engine) + @guarantee LifecycleNotifications -- The engine emits, for the Room to observe: EngineResuming / -- EngineRestarting (start of an episode, or a resume->full escalation), diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index 290d22369..f85897bda 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -1082,7 +1082,7 @@ impl EngineInner { // the next cycle; pre-fix it was dropped and the engine resumed again. if self.fail_transport_during_next_resume.swap(false, Ordering::AcqRel) { log::warn!("test fault injection: simulating concurrent failure during resume"); - self.reconnection_needed(false, false); + self.reconnection_needed(false, false, DisconnectReason::UnknownReason); } } From 5b7306cdfed214bddedfd0c9a093143ce6a8d01f Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 23 Jun 2026 13:06:27 +0200 Subject: [PATCH 16/16] error early when not authenticated (#1149) ### Before you submit your PR Make sure the following is true before submitting your PR: - [ ] I have read the [contributing guidelines](https://github.com/livekit/rust-sdks/blob/main/CONTRIBUTING.md) and validated that this PR will be accepted. - [ ] I have read and followed the principles regarding breaking changes, testing, and code quality. ### PR description Describe the changes in this PR. Explain what the PR is meant to solve and how to reproduce the issue in the first place. ### Breaking changes If this PR introduces breaking changes, list them here and document the rationale for introducing such a change. ### MSRV If the PR modifies the crate's MSRV (Minimum Supported Rust Version), document it here. ### Testing Ideally, unit test the code you add, but ensure you're not repeating existing test cases. Use as many already written scaffolding, utilities as possible; write your own, when needed. If external services, APIs, tokens are required (e.g., running an LK server instance), provide the necessary information. Make sure your tests perform useful, context-aware assertions and do not simply emulate "happy paths". ### Async We want the project to be runtime-agnostic, so please reuse what's already in [livekit-runtime](https://github.com/livekit/rust-sdks/blob/main/livekit-runtime/) and feel free to add anything missing. It's ok to use Tokio directly, when writing unit tests, if necessary. When testing, do not use artificial delays for the state to "catch up"; instead, respect the event flow and subscribe properly using channels or other mechanisms. --- Cargo.lock | 1 + livekit/Cargo.toml | 1 + livekit/src/rtc_engine/mod.rs | 93 ++++++++++++++++++++ livekit/src/rtc_engine/reconnect_strategy.rs | 16 ++-- 4 files changed, 103 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa163b90c..e1100db3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3825,6 +3825,7 @@ dependencies = [ "bytes", "chrono", "futures-util", + "http 1.4.0", "lazy_static", "libloading 0.8.9", "libwebrtc", diff --git a/livekit/Cargo.toml b/livekit/Cargo.toml index 11b30bdd3..bd21e62ed 100644 --- a/livekit/Cargo.toml +++ b/livekit/Cargo.toml @@ -59,3 +59,4 @@ anyhow = "1.0.99" test-log = "0.2.18" test-case = "3.3" serial_test = "3.0" +http = "1.1" diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index f85897bda..a1a26f4ae 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -503,6 +503,14 @@ impl EngineInner { match try_connect().await { Ok(res) => return Ok(res), Err(e) => { + // A validated auth failure (401/403) will not succeed on + // retry with the same token — surface it immediately instead + // of burning the remaining join attempts. Same classification + // as the reconnect loop (see `auth_failure_reason`). + if auth_failure_reason(&e).is_some() { + log::warn!("authentication rejected during connect ({e}); not retrying"); + return Err(e); + } let attempt_i = i + 1; if i < max_retries { log::warn!( @@ -943,6 +951,16 @@ impl EngineInner { "server requested disconnect during restart".into(), )); } + if let Some(reason) = auth_failure_reason(&err) { + log::warn!( + "authentication rejected during restart ({err}); not retrying" + ); + self.running_handle.write().can_reconnect = false; + self.close(reason).await; + return Err(EngineError::Connection( + "authentication failed during reconnect".into(), + )); + } log::error!("restarting connection failed: {}", err); } } @@ -971,6 +989,16 @@ impl EngineInner { "server requested disconnect during resume".into(), )); } + if let Some(reason) = auth_failure_reason(&err) { + log::warn!( + "authentication rejected during resume ({err}); not retrying" + ); + self.running_handle.write().can_reconnect = false; + self.close(reason).await; + return Err(EngineError::Connection( + "authentication failed during reconnect".into(), + )); + } log::error!("resuming connection failed: {}", err); let mut running_handle = self.running_handle.write(); running_handle.full_reconnect = true; @@ -1153,6 +1181,28 @@ fn leave_disconnect_reason(err: &EngineError) -> Option { None } +/// Inspect a reconnect-attempt error for a genuine authentication/authorization +/// failure (HTTP 401/403). Such a failure will not succeed on retry with the +/// same token, so the reconnect loop should bail out immediately rather than +/// burning every attempt (and hammering the server) with credentials it already +/// knows are rejected. +/// +/// We key on `SignalError::Client(401|403)`, which is produced by the server's +/// `rtc/validate` probe (see [`super`]'s `SignalInner::validate`) — an +/// authoritative classification. We deliberately do NOT key on the raw +/// `WsError::Http` upgrade status, because that can be a fabricated 401 masking a +/// transient server error (e.g. a 503 from a saturated node), which IS +/// retryable. A resume that hits a raw 401 simply escalates to a full reconnect, +/// whose connect path runs `validate()` and surfaces the authoritative status. +fn auth_failure_reason(err: &EngineError) -> Option { + if let EngineError::Signal(SignalError::Client(status, _)) = err { + if matches!(status.as_u16(), 401 | 403) { + return Some(DisconnectReason::JoinFailure); + } + } + None +} + #[cfg(test)] mod tests { use super::*; @@ -1200,4 +1250,47 @@ mod tests { ); } } + + #[test] + fn auth_failure_reason_flags_validated_401_and_403() { + // The server's rtc/validate probe surfaces auth failures as Client(4xx). + for status in [401u16, 403] { + let err = EngineError::Signal(SignalError::Client( + http::StatusCode::from_u16(status).unwrap(), + "invalid token".into(), + )); + assert_eq!( + auth_failure_reason(&err), + Some(DisconnectReason::JoinFailure), + "Client({status}) must be treated as a non-retryable auth failure" + ); + } + } + + fn auth_failure_reason_ignores_other_client_and_server_errors() { + let not_auth = [ + // Other client errors are not auth failures. + EngineError::Signal(SignalError::Client(http::StatusCode::NOT_FOUND, "".into())), + EngineError::Signal(SignalError::Client( + http::StatusCode::TOO_MANY_REQUESTS, + "".into(), + )), + // Server errors (e.g. a saturated node) are retryable. + EngineError::Signal(SignalError::Server( + http::StatusCode::SERVICE_UNAVAILABLE, + "".into(), + )), + // Generic connectivity/internal errors are retryable. + EngineError::Connection("network".into()), + EngineError::Internal("bug".into()), + EngineError::Signal(SignalError::SendError), + EngineError::Signal(SignalError::Timeout("waiting".into())), + ]; + for err in ¬_auth { + assert!( + auth_failure_reason(err).is_none(), + "{err:?} must NOT be treated as an auth failure" + ); + } + } } diff --git a/livekit/src/rtc_engine/reconnect_strategy.rs b/livekit/src/rtc_engine/reconnect_strategy.rs index bfb58b0a0..f5daff08e 100644 --- a/livekit/src/rtc_engine/reconnect_strategy.rs +++ b/livekit/src/rtc_engine/reconnect_strategy.rs @@ -70,10 +70,10 @@ mod tests { // Monotonic non-decreasing and never above the cap. let mut prev = Duration::ZERO; for attempt in 1..=RECONNECT_ATTEMPTS { - let nominal = nominal(attempt); - assert!(nominal >= prev, "backoff must not decrease (attempt {attempt})"); - assert!(nominal <= RECONNECT_MAX_DELAY, "backoff must not exceed the cap"); - prev = nominal; + let nominal_duration = nominal(attempt); + assert!(nominal_duration >= prev, "backoff must not decrease (attempt {attempt})"); + assert!(nominal_duration <= RECONNECT_MAX_DELAY, "backoff must not exceed the cap"); + prev = nominal_duration; } // Late attempts are pinned to the cap, and large attempt indices don't @@ -86,12 +86,12 @@ mod tests { fn backoff_delay_stays_within_nominal_jitter_window() { // Full jitter: every sample must land within [0, nominal(attempt)]. for attempt in 1..=RECONNECT_ATTEMPTS { - let nominal = nominal(attempt); + let nominal_duration = nominal(attempt); for _ in 0..1000 { - let delay = delay(attempt); + let delay_duration = delay(attempt); assert!( - delay <= nominal, - "jittered delay {delay:?} exceeded nominal {nominal:?} (attempt {attempt})" + delay_duration <= nominal_duration, + "jittered delay {delay_duration:?} exceeded nominal {nominal_duration:?} (attempt {attempt})" ); } }