diff --git a/.changeset/lukas_reconnect.md b/.changeset/lukas_reconnect.md new file mode 100644 index 000000000..35377a265 --- /dev/null +++ b/.changeset/lukas_reconnect.md @@ -0,0 +1,8 @@ +--- +livekit: patch +livekit-api: patch +livekit-ffi: patch +livekit-uniffi: patch +--- + +harden reconnect behaviour - #1148 (@lukasIO) diff --git a/Cargo.lock b/Cargo.lock index aa163b90c..e1100db3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3825,6 +3825,7 @@ dependencies = [ "bytes", "chrono", "futures-util", + "http 1.4.0", "lazy_static", "libloading 0.8.9", "libwebrtc", diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index e02ff6f88..c929dde9e 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -215,9 +215,29 @@ impl SignalClient { if matches!(&err, SignalError::WsError(WsError::Http(e)) if e.status() != 403) { log::error!("unexpected signal error: {}", err.to_string()); } - let urls = RegionUrlProvider::fetch_region_urls(url, token).await?; - let mut last_err = err; + // Fetching region URLs is best-effort. `fetch_region_urls` + // already returns an empty list for non-cloud (direct / + // self-hosted) URLs, so those skip the fallback entirely. If the + // fetch itself fails (e.g. the region endpoint is unreachable), + // that must NOT be fatal: log a warning and fall back to the + // original connection error rather than masking it with the + // fetch error. + let urls = match RegionUrlProvider::fetch_region_urls(url, token).await { + Ok(urls) => urls, + Err(region_err) => { + log::warn!( + "failed to fetch region urls: {region_err}; surfacing original connection error" + ); + return Err(err); + } + }; + + // With no region URLs to try, this surfaces the original error. + // Otherwise we keep the most recent region attempt error, so that + // if every region fails the caller sees why the last region + // connection failed. 
+ let mut last_err = err; for url in urls.iter() { log::info!("fallback connection to: {}", url); match SignalInner::connect(url, token, options.clone(), publisher_offer.clone()) @@ -226,7 +246,7 @@ impl SignalClient { Ok((inner, join_response, stream_events)) => { return Ok(handle_success(inner, join_response, stream_events)) } - Err(err) => last_err = err, + Err(region_conn_err) => last_err = region_conn_err, } } diff --git a/livekit-ffi-node-bindings/proto/room_pb.d.ts b/livekit-ffi-node-bindings/proto/room_pb.d.ts index 176c47026..265b8b8af 100644 --- a/livekit-ffi-node-bindings/proto/room_pb.d.ts +++ b/livekit-ffi-node-bindings/proto/room_pb.d.ts @@ -81,6 +81,15 @@ export declare enum SimulateScenarioKind { * @generated from enum value: SIMULATE_FULL_RECONNECT = 7; */ SIMULATE_FULL_RECONNECT = 7, + + /** + * Asks the server to drop the signalling connection during the next resume, + * then triggers a resume locally. The resume cannot complete, so the engine + * escalates to a full reconnect — exercising the resume→full escalation path. 
+ * + * @generated from enum value: SIMULATE_DISCONNECT_SIGNAL_ON_RESUME = 8; + */ + SIMULATE_DISCONNECT_SIGNAL_ON_RESUME = 8, } /** diff --git a/livekit-ffi-node-bindings/proto/room_pb.js b/livekit-ffi-node-bindings/proto/room_pb.js index 7794a1ed0..1cde1dbb4 100644 --- a/livekit-ffi-node-bindings/proto/room_pb.js +++ b/livekit-ffi-node-bindings/proto/room_pb.js @@ -49,6 +49,7 @@ const SimulateScenarioKind = /*@__PURE__*/ proto2.makeEnum( {no: 5, name: "SIMULATE_FORCE_TCP"}, {no: 6, name: "SIMULATE_FORCE_TLS"}, {no: 7, name: "SIMULATE_FULL_RECONNECT"}, + {no: 8, name: "SIMULATE_DISCONNECT_SIGNAL_ON_RESUME"}, ], ); diff --git a/livekit-ffi/protocol/room.proto b/livekit-ffi/protocol/room.proto index fd413bf11..fc3006cc7 100644 --- a/livekit-ffi/protocol/room.proto +++ b/livekit-ffi/protocol/room.proto @@ -84,6 +84,10 @@ enum SimulateScenarioKind { // Asks the server to send `LeaveRequest{Reconnect}`, forcing a full // reconnect (new RtcSession; SDK republishes existing local tracks). SIMULATE_FULL_RECONNECT = 7; + // Asks the server to drop the signalling connection during the next resume, + // then triggers a resume locally. The resume cannot complete, so the engine + // escalates to a full reconnect — exercising the resume→full escalation path. 
+ SIMULATE_DISCONNECT_SIGNAL_ON_RESUME = 8; } message SimulateScenarioRequest { required uint64 room_handle = 1; diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index 57c1e38d7..bb94f6232 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -99,6 +99,9 @@ fn on_simulate_scenario( proto::SimulateScenarioKind::SimulateForceTcp => SimulateScenario::ForceTcp, proto::SimulateScenarioKind::SimulateForceTls => SimulateScenario::ForceTls, proto::SimulateScenarioKind::SimulateFullReconnect => SimulateScenario::FullReconnect, + proto::SimulateScenarioKind::SimulateDisconnectSignalOnResume => { + SimulateScenario::DisconnectSignalOnResume + } }; let ffi_room = server.retrieve_handle::(request.room_handle)?.clone(); diff --git a/livekit/Cargo.toml b/livekit/Cargo.toml index 11b30bdd3..bd21e62ed 100644 --- a/livekit/Cargo.toml +++ b/livekit/Cargo.toml @@ -59,3 +59,4 @@ anyhow = "1.0.99" test-log = "0.2.18" test-case = "3.3" serial_test = "3.0" +http = "1.1" diff --git a/livekit/specs/signalling-reconnection.allium b/livekit/specs/signalling-reconnection.allium new file mode 100644 index 000000000..67414ec86 --- /dev/null +++ b/livekit/specs/signalling-reconnection.allium @@ -0,0 +1,741 @@ +-- allium: 3 +-- signalling-reconnection.allium +-- +-- TARGET behaviour for the LiveKit Rust SDK's signalling connection and +-- reconnection. Evolved from a faithful AS-IS baseline (distilled from +-- livekit-api/src/signal_client/ and livekit/src/rtc_engine/mod.rs) by applying +-- five intentional deltas. Each is marked `-- DELTA n` where it changes +-- behaviour from today's implementation. +-- +-- 1. Decoupled resume: an explicit ordered chain of rules (reopen link -> +-- sync state -> republish offer -> await PCs -> re-check link -> drain +-- queue), not one opaque cross-layer sequence. +-- 2. 
Accurate lifecycle: a resume that escalates to a full reconnect now also +-- emits Restarting; exhaustion reports the original cause, not `unknown`. +-- 3. Backoff: exponential delay with full jitter, attempt-capped, replacing +-- the fixed 5s interval. +-- 4. Monotonic reconnect latch: a terminal `revoked` permission state replaces +-- the ad-hoc `can_reconnect` boolean. +-- 5. Region fallback in scope: region-URL fetch is cloud-only and never masks +-- the original connection error. +-- +-- Boundaries: peer-connection / media recovery (RtcSession) and the Room are +-- external; modelled via the MediaLayer / Application surfaces below. +-- +-- Structure (two layers, kept in one spec — they share the SignalConnection +-- entity, the connect/region/validate path, the auth classification and +-- DisconnectCause, so splitting would force cross-spec entity sharing for little +-- gain; the modular seams are expressed as boundary surfaces instead): +-- +-- LAYER 1 — Signalling link (mirrors livekit-api/signal_client) +-- entity SignalConnection; rules under "Layer 1" below: +-- * Establishment — initial connect, region fallback, auth rejection. +-- (The engine-side join-retries loop is config-level, see join_retries; +-- its only modelled outcome is auth fast-fail, noted there.) +-- * Liveness & lifecycle — ping/timeout, resume, send/queue, token, close. +-- boundary surfaces: SignalCommands (app), ServerSignalling (SFU). +-- +-- LAYER 2 — Engine reconnect orchestration (mirrors rtc_engine) +-- entity EngineConnection; rules under "Layer 2" below: initial connection, +-- reconnection causes, start/escalate, per-attempt dispatch, the decoupled +-- resume chain, and attempt outcomes (incl. terminal/non-retryable failures). +-- boundary surfaces: MediaRecovery (RtcSession), RoomConnectionLifecycle (Room). 
+-- +-- Checker note (allium 3.2.4): `allium check` reports residual `status` +-- warnings that are CHECKER ARTIFACTS, not spec defects -- every state flagged +-- "never assigned" is plainly assigned by a visible `ensures`. Three confirmed +-- CLI limitations cause them: +-- a. unreachableValue: the status-assignment analysis does not propagate +-- through deep chained-trigger sequences. A status assigned only by a rule +-- several emit/consume hops from an external stimulus (e.g. `reconnecting`, +-- set by StartReconnect two hops past the cause triggers) reads as +-- "never assigned". Short chains are fine. +-- b. noExit / multi-graph: a state whose only exit uses a set-membership +-- guard (`status in {...}`) reports noExit; and only ONE transition graph +-- per entity is honoured (the language ref permits one per field). Each +-- entity therefore declares a single `status` graph, and the +-- EngineConnection.reconnect_permission latch is enforced by construction + +-- the RevokedImpliesClosed invariant instead of a second graph. +-- c. externalEntity.missingSourceHint on Sfu/MediaLayer/Application is +-- expected: these are true external boundaries with no governing spec. +-- The graphs are retained for their documentation value (the transition +-- topology is the heart of the reconnection behaviour). + +------------------------------------------------------------ +-- External Entities +------------------------------------------------------------ + +-- The SFU server the client signals with. +external entity Sfu { } + +-- The peer-connection / RtcSession layer that performs media recovery. Out of +-- scope; surfaced only through the operations the engine demands of it and the +-- outcomes it reports back. +external entity MediaLayer { } + +-- The SDK consumer (Room) that drives connect/send/close and observes the +-- connection lifecycle. 
+external entity Application { + engine: EngineConnection +} + +------------------------------------------------------------ +-- Value Types +------------------------------------------------------------ + +-- A signalling message. `kind` determines queue behaviour during a resume. +-- pass_through = Trickle | Offer | Answer | SyncState | Simulate | Leave +-- (mirrors client-sdk-js `passThroughQueueSignals`); buffering any would +-- deadlock the resume, which depends on them flowing. +-- queueable = everything else (AddTrack, Mute, UpdateSubscription, ...). +value SignalMessage { + kind: pass_through | queueable +} + +------------------------------------------------------------ +-- Enumerations +------------------------------------------------------------ + +-- Why a recovery episode started or the connection ended. Shared across the +-- cause triggers and the engine's stored reconnect_cause, so it is a named enum. +enum DisconnectCause { + ping_timeout + | stream_closed + | peer_connection_failed + | server_leave + | signal_severed_during_resume + | unauthorized + | client_initiated + | unknown +} + +------------------------------------------------------------ +-- Entities and Variants +------------------------------------------------------------ + +entity SignalConnection { + -- connected : stream up, sends flow immediately. + -- lost : stream dropped unexpectedly (ping timeout / WS close); + -- awaiting an engine-driven resume. Gate not yet set. + -- reconnecting : a resume is in flight; the gate is on -- queueable sends + -- buffer, pass_through sends still flow. Cleared by + -- MarkReconnected. + -- closed : terminal (clean/local close or server Disconnect). + status: connected | lost | reconnecting | closed + + -- Auth token; refreshable mid-session so a resume can outlive an expired + -- join token (the engine reads this at resume time). + token: String + + -- Liveness parameter advertised by the SFU in the JoinResponse. 
+ ping_timeout: Duration + + -- Reset on every message received from the server (not just pongs). + last_message_at: Timestamp? + + -- Queueable signals buffered while status = reconnecting; drained in + -- original order by MarkReconnected. + queued_signals: List + + -- The engine that drives this link's recovery. + engine: EngineConnection with signal = this + + -- The link is present and usable while connected or mid-resume; `lost` or + -- `closed` mean the stream is gone. Used by the resume's mid-flight re-check + -- (ResumeRechecksLink) and mirrors the implementation's `is_connected()` + -- (stream-present) check rather than ping-timeout liveness. + is_connected: status in {connected, reconnecting} + + transitions status { + connected -> lost -- ping timeout / stream closed + connected -> reconnecting -- resume started while WS still up (PC-failure resume) + connected -> closed -- clean / local close + lost -> reconnecting -- engine drives the resume + lost -> closed -- server Disconnect / give up + reconnecting -> connected -- resume finalised (MarkReconnected) + reconnecting -> lost -- resume severed again; engine will retry + reconnecting -> closed -- close during resume + terminal: closed + } +} + +entity EngineConnection { + -- connected : a live RtcSession; signalling and PCs up. + -- reconnecting : the reconnect loop owns recovery. + -- closed : terminal (gave up, or server Disconnect). + status: connected | reconnecting | closed + + -- Recovery strategy for the current episode. Escalates resume -> full and + -- NEVER downgrades. + mode: resume | full + + -- DELTA 4: a monotonic latch replacing the ad-hoc can_reconnect boolean and + -- its lock-guarded checks. Only entity creation assigns `permitted`; no rule + -- ever assigns it, and the rules that revoke also close the engine -- so once + -- the server (or a recovery attempt hitting a server Disconnect) forbids + -- reconnection, it stays forbidden. 
Monotonicity is by construction and is + -- asserted by the RevokedImpliesClosed invariant. + -- (A `permitted -> revoked` terminal transition graph would express this + -- structurally, but the entity's `status` graph below takes the single graph + -- slot the current checker honours; see the spec header note.) + -- NOTE (spec<->code): the implementation realises this as a `can_reconnect` + -- bool that is only ever set false (never back to true) within a session, + -- always alongside a close() — i.e. it already satisfies this latch and the + -- RevokedImpliesClosed invariant; the enum is the spec-level model. + reconnect_permission: permitted | revoked + + -- 1-based attempt index within the current episode; 0 when connected. + attempt: Integer + + -- DELTA 2: the cause that started the current episode, carried through so + -- that giving up reports the real reason instead of `unknown`. + reconnect_cause: DisconnectCause? + + -- The signalling link this engine drives. + signal: SignalConnection + + transitions status { + connected -> reconnecting + reconnecting -> connected + reconnecting -> closed + connected -> closed + terminal: closed + } +} + +------------------------------------------------------------ +-- Config +------------------------------------------------------------ + +config { + -- Initial connect: extra attempts after the first (Room default). A + -- validated auth failure (HTTP 401/403, DisconnectCause unauthorized) is + -- surfaced immediately and does NOT consume these retries (R4.2), matching + -- the reconnect loop's terminal-failure handling. + join_retries: Integer = 3 + -- Per signalling-link connect attempt timeout (SIGNAL_CONNECT_TIMEOUT). + connect_timeout: Duration = 5.seconds + -- DELTA 3: exponential backoff with full jitter, attempt-capped. 
+ -- Per-attempt nominal delay = min(reconnect_max_delay_ms, + -- reconnect_base_delay_ms * reconnect_backoff_multiplier^(attempt - 1)); + -- the actual delay is sampled uniformly from [0, nominal] (full jitter). + -- Expressed in milliseconds because Duration has no sub-second unit. + max_reconnect_attempts: Integer = 10 + reconnect_base_delay_ms: Integer = 300 + reconnect_backoff_multiplier: Integer = 2 + reconnect_max_delay_ms: Integer = 7000 + -- Settle delay before re-checking PC state on the resume path + -- (PC_RECONNECT_SETTLE_DELAY). Resume-only; full reconnect builds new PCs. + pc_reconnect_settle_delay: Duration = 1.seconds +} + +------------------------------------------------------------ +-- Rules +------------------------------------------------------------ + +-- ########################################################################### +-- ## LAYER 1 — Signalling link (mirrors livekit-api/signal_client) +-- ########################################################################### + +-- === Establishment (DELTA 5: region fallback) =============================== + +rule SignalConnectionEstablished { + when: SignalConnectSucceeded(target, token) + ensures: SignalConnection.created( + status: connected, + token: token, + queued_signals: {} + ) + @guidance + -- connect() awaits the SFU's JoinResponse (supplying ping_timeout). + -- Endpoint selection (v1->v0 on 404) is establishment mechanics, kept + -- out of scope; `target` records whether the host is `cloud` or `direct`. +} + +rule PrimaryConnectFailsOnCloud { + when: SignalConnectFailed(target, error) + requires: target = cloud + ensures: RegionFetchRequested(error) + @guidance + -- DELTA 5: only LiveKit Cloud hosts attempt region fallback. + -- DELTA R3.2: the region list is cached per host with a TTL (default + -- 5s), so a RegionFetchRequested within the TTL is served from cache + -- instead of re-paying the network fetch on every reconnect attempt. 
+} + +rule PrimaryConnectFailsOnDirect { + when: SignalConnectFailed(target, error) + requires: target = direct + ensures: SignalConnectRejected(error) + @guidance + -- DELTA 5: direct / self-hosted endpoints skip region fallback entirely + -- and surface the original connection error immediately. +} + +rule RegionFetchFails { + when: RegionFetchFailed(original_error) + ensures: SignalConnectRejected(original_error) + @guidance + -- DELTA 5: failing to FETCH region URLs is best-effort, never fatal. It + -- is logged as a warning and the ORIGINAL connection error is surfaced, + -- rather than masking it with the fetch error (today's `?` is fatal). +} + +rule RegionFetchSucceeds { + when: RegionUrlsFetched(original_error) + ensures: RegionConnectAttemptRequested(original_error) + @guidance + -- Try each region URL in order; success yields SignalConnectSucceeded. +} + +rule AllRegionsFail { + when: RegionConnectsExhausted(last_region_error) + ensures: SignalConnectRejected(last_region_error) + @guidance + -- When region URLs WERE fetched and tried but every one failed, surface + -- the LAST region connection error -- that is the most relevant reason a + -- region attempt failed. (Distinct from RegionFetchFails, which is about + -- failing to fetch the list at all.) +} + +-- === Signalling-link liveness & lifecycle =================================== + +rule SignalLinkTimesOut { + when: connection: SignalConnection.last_message_at + connection.ping_timeout <= now + requires: connection.status = connected + ensures: connection.status = lost + ensures: SignalLinkLost(connection, cause: ping_timeout) + @guidance + -- last_message_at resets on ANY received message, so a healthy stream + -- never trips this. A periodic PingReq keeps traffic flowing. 
+} + +rule SignalStreamDrops { + when: SignalStreamDropped(connection) + requires: connection.status = connected + ensures: connection.status = lost + ensures: SignalLinkLost(connection, cause: stream_closed) +} + +rule ResumeSignalLink { + when: ResumeSignalLink(connection) + requires: connection.status in {lost, connected} + ensures: connection.status = reconnecting + ensures: SignalLinkResumed(connection) + @guidance + -- restart(): closes any existing stream and opens a fresh one with + -- reconnect=true and the (possibly refreshed) token, awaiting the SFU's + -- ReconnectResponse. The gate is set BEFORE touching the stream so + -- concurrent sends route to the queue, and stays on until MarkReconnected. +} + +rule SignalResumeFails { + when: SignalResumeFailed(connection) + requires: connection.status = reconnecting + ensures: connection.status = lost + @guidance + -- Reopening the stream failed; the gate is cleared and the link returns + -- to `lost` so the engine's next attempt can re-enter cleanly. +} + +rule SendSignal { + when: SendSignal(connection, message) + requires: connection.status != closed + if connection.status = reconnecting and message.kind = queueable: + ensures: connection.queued_signals = connection.queued_signals + {message} + else: + ensures: SignalDelivered(connection, message) + @guidance + -- pass_through messages always attempt immediate delivery. On the normal + -- path any backlog is flushed in original order first. A failed queueable + -- send re-queues; a failed pass_through is dropped with a warning. +} + +rule MarkReconnected { + when: MarkReconnected(connection) + requires: connection.status = reconnecting + ensures: connection.status = connected + ensures: connection.queued_signals = {} + @guidance + -- set_reconnected(): clears the gate FIRST, then flushes, so a racing + -- send takes the normal draining path and cannot re-queue mid-drain. 
+} + +rule RefreshSignalToken { + when: ServerRefreshesToken(connection, token) + ensures: connection.token = token +} + +rule CloseSignalLink { + when: CloseSignalLink(connection) + requires: connection.status in {connected, lost, reconnecting} + ensures: connection.status = closed +} + +-- ########################################################################### +-- ## LAYER 2 — Engine reconnect orchestration (mirrors rtc_engine) +-- ########################################################################### + +-- === Engine: initial connection ============================================= + +rule EngineConnects { + when: EngineSessionConnected(signal) + ensures: EngineConnection.created( + status: connected, + mode: resume, + reconnect_permission: permitted, + attempt: 0, + signal: signal + ) +} + +-- === Engine: reconnection causes ============================================ + +rule ReconnectOnSignalLoss { + when: SignalLinkLost(connection, cause) + let engine = connection.engine + requires: engine.reconnect_permission = permitted + ensures: ReconnectNeeded(engine, full_reconnect: false, retry_now: false, cause: cause) + @guidance + -- Unexpected signal loss drives a (non-urgent) resume. +} + +rule ReconnectOnPeerConnectionFailure { + when: PeerConnectionFailed(engine) + requires: engine.reconnect_permission = permitted + ensures: ReconnectNeeded( + engine, + full_reconnect: false, + retry_now: false, + cause: peer_connection_failed + ) +} + +rule ServerRequestsReconnect { + when: ServerLeaveRequested(engine, action) + requires: action in {resume, reconnect} + requires: engine.reconnect_permission = permitted + ensures: ReconnectNeeded( + engine, + full_reconnect: action = reconnect, + retry_now: true, + cause: server_leave + ) + @guidance + -- A server Leave with Resume/Reconnect drives recovery immediately. 
+} + +rule ServerRequestsDisconnect { + when: ServerLeaveRequested(engine, action) + requires: action = disconnect + requires: engine.reconnect_permission = permitted + requires: engine.status in {connected, reconnecting} + ensures: engine.reconnect_permission = revoked + ensures: engine.status = closed + ensures: EngineDisconnected(engine, cause: server_leave) + @guidance + -- A server Leave with Disconnect forbids reconnection outright (DELTA 4: + -- terminal `revoked`) and tears the engine down. +} + +rule CloseEngine { + when: CloseEngine(engine) + requires: engine.status in {connected, reconnecting} + ensures: engine.reconnect_permission = revoked + ensures: engine.status = closed + ensures: EngineDisconnected(engine, cause: client_initiated) + @guidance + -- The Application explicitly closes the room. Lands `closed` from any + -- non-terminal state — including mid-reconnect, cancelling an in-flight + -- reconnect. The implementation signals close_notifier, which breaks the + -- reconnect loop's backoff/attempt waits immediately rather than waiting + -- them out; permission is revoked so a late stimulus can't restart it. +} + +-- === Engine: starting / escalating the loop ================================= + +rule StartReconnect { + when: ReconnectNeeded(engine, full_reconnect, retry_now, cause) + requires: engine.reconnect_permission = permitted + requires: engine.status = connected + ensures: engine.status = reconnecting + ensures: engine.attempt = 1 + ensures: engine.reconnect_cause = cause + if full_reconnect: + ensures: engine.mode = full + ensures: EngineRestarting(engine) + else: + ensures: engine.mode = resume + ensures: EngineResuming(engine) + ensures: ReconnectAttemptStarted(engine) + @guidance + -- Mirrors reconnection_needed() on a fresh episode. The Room acts on the + -- one-time Resuming/Restarting notification (e.g. unpublish local tracks + -- on a full reconnect) before recovery proceeds. 
+} + +rule EscalateReconnect { + when: ReconnectNeeded(engine, full_reconnect, retry_now, cause) + requires: engine.reconnect_permission = permitted + requires: engine.status = reconnecting + if full_reconnect and engine.mode = resume: + ensures: engine.mode = full + ensures: EngineRestarting(engine) + @guidance + -- Already reconnecting: only escalate resume -> full, never downgrade. + -- DELTA 2: the escalation now emits Restarting so the Room sees a discrete + -- event matching the mode it is in (previously the Room saw Resuming then + -- Restarted with no Restarting between). retry_now restarts the backoff so + -- the in-flight loop's next attempt fires immediately. + -- NOTE (spec<->code): the implementation emits Restarting lazily, on the + -- next attempt the loop runs in full mode (a `restarting_emitted` latch), + -- rather than eagerly at this escalation event. Net effect is identical + -- (Restarting always precedes the full reconnect). +} + +-- === Engine: per-attempt dispatch =========================================== + +rule DispatchReconnectAttempt { + when: ReconnectAttemptStarted(engine) + requires: engine.status = reconnecting + if engine.mode = resume: + ensures: ResumeSignalLink(engine.signal) + else: + ensures: FullReconnectRequested(engine) + @guidance + -- Resume re-opens the existing link and recovers in place; full reconnect + -- asks the media layer to build a brand-new session. +} + +-- === Engine: the decoupled resume chain (DELTA 1) =========================== +-- Each step is a named rule chained to the next, so the ordering that today is +-- implicit across signal+session+engine is an explicit, testable sequence. 
+ +rule ResumeSendsSyncState { + when: SignalLinkResumed(connection) + let engine = connection.engine + requires: engine.status = reconnecting + requires: engine.mode = resume + ensures: SyncStateSent(engine) + @guidance + -- Step 2: with the link reopened, the Room sends SyncState (a + -- pass_through, so it flows despite the gate) so the SFU resyncs state. +} + +rule ResumeRepublishesPublisher { + when: SyncStateSent(engine) + requires: engine.status = reconnecting + ensures: RepublishPublisherRequested(engine) + @guidance + -- Step 3: the publisher offer is re-sent AFTER SyncState -- ordering a + -- refactor must preserve, now guaranteed by this chain not a comment. +} + +rule ResumeAwaitsPeerConnections { + when: PublisherRepublished(engine) + requires: engine.status = reconnecting + ensures: AwaitPeerConnectionsRequested(engine, settle: config.pc_reconnect_settle_delay) + @guidance + -- Step 4: wait for the PeerConnections to reconnect, then apply the + -- settle delay before trusting their state. +} + +rule ResumeRechecksLink { + when: PeerConnectionsReconnected(engine) + requires: engine.status = reconnecting + if not engine.signal.is_connected: + ensures: ResumeAttemptFailed( + engine, + terminal: false, + cause: signal_severed_during_resume + ) + else: + ensures: ResumeQueueDrainRequested(engine) + @guidance + -- Step 5: if the link died while we waited for PCs, FAIL rather than + -- drain queued mutations into the void. +} + +rule ResumeDrainsQueue { + when: ResumeQueueDrainRequested(engine) + requires: engine.status = reconnecting + ensures: MarkReconnected(engine.signal) + ensures: ResumeAttemptSucceeded(engine) + @guidance + -- Step 6: the resume has fully recovered; drain the signal queue. 
+} + +-- === Engine: attempt outcomes =============================================== + +rule ResumeAttemptSucceeds { + when: ResumeAttemptSucceeded(engine) + requires: engine.status = reconnecting + ensures: engine.status = connected + ensures: engine.attempt = 0 + ensures: engine.reconnect_cause = null + ensures: EngineResumed(engine) +} + +rule ResumeAttemptFailsTerminally { + when: ResumeAttemptFailed(engine, terminal, cause) + requires: engine.status = reconnecting + requires: terminal + ensures: engine.reconnect_permission = revoked + ensures: engine.status = closed + ensures: EngineDisconnected(engine, cause: cause) + @guidance + -- A resume that failed in a NON-RETRYABLE way: stop, do not escalate or + -- retry. Terminal failures are (a) a server Leave{Disconnect} + -- (cause = server_leave) and (b) DELTA R4.2: an authentication failure + -- (cause = unauthorized, a server-validated HTTP 401/403) — the same + -- token will not succeed on retry, so retrying would just hammer the + -- server. `cause` carries which one for the Room. +} + +rule ResumeAttemptEscalates { + when: ResumeAttemptFailed(engine, terminal, cause) + requires: engine.status = reconnecting + requires: not terminal + ensures: engine.mode = full + ensures: EngineRestarting(engine) + ensures: RetryReconnect(engine) + @guidance + -- DELTA 2: a (retryable) failed resume escalates to full reconnect AND + -- emits Restarting on the escalation (previously silent), then retries. +} + +rule RestartAttemptSucceeds { + when: RestartAttemptSucceeded(engine) + requires: engine.status = reconnecting + ensures: engine.status = connected + ensures: engine.attempt = 0 + ensures: engine.reconnect_cause = null + ensures: EngineRestarted(engine) + @guidance + -- The new RtcSession's PCs reached Connected. The new session replaces + -- the old only on success, so a failed attempt leaves the old usable. 
+} + +rule RestartAttemptFailsTerminally { + when: RestartAttemptFailed(engine, terminal, cause) + requires: engine.status = reconnecting + requires: terminal + ensures: engine.reconnect_permission = revoked + ensures: engine.status = closed + ensures: EngineDisconnected(engine, cause: cause) + @guidance + -- Non-retryable full-reconnect failure: a server Leave{Disconnect} or + -- (DELTA R4.2) a validated auth failure (cause = unauthorized). Stop. +} + +rule RestartAttemptRetries { + when: RestartAttemptFailed(engine, terminal, cause) + requires: engine.status = reconnecting + requires: not terminal + ensures: RetryReconnect(engine) +} + +rule RetryReconnectAgain { + when: RetryReconnect(engine) + requires: engine.status = reconnecting + requires: engine.attempt < config.max_reconnect_attempts + ensures: engine.attempt = engine.attempt + 1 + ensures: ReconnectAttemptStarted(engine) + @guidance + -- DELTA 3: attempts are spaced by exponential backoff with full jitter + -- (see config); a server-Leave retry_now collapses the delay to zero. +} + +rule ReconnectExhausted { + when: RetryReconnect(engine) + requires: engine.status = reconnecting + requires: engine.attempt >= config.max_reconnect_attempts + ensures: engine.status = closed + ensures: EngineDisconnected(engine, cause: engine.reconnect_cause) + @guidance + -- DELTA 2: on giving up, EngineDisconnected reports the cause that STARTED + -- this episode (engine.reconnect_cause), threaded through instead of a + -- generic `unknown`. It is always emitted so the Room leaves Reconnecting + -- rather than hanging there. + -- NOTE (spec<->code): there is no dedicated "attempts exhausted" disconnect + -- reason in the wire protocol (proto DisconnectReason), so the original + -- cause is what surfaces. A distinct exhausted reason would require a + -- protocol addition. 
+} + +------------------------------------------------------------ +-- Invariants +------------------------------------------------------------ + +invariant RevokedImpliesClosed { + for engine in EngineConnection: + engine.reconnect_permission = revoked implies engine.status = closed +} + +------------------------------------------------------------ +-- Surfaces +------------------------------------------------------------ + +-- The application drives signalling sends/close and observes the lifecycle. +surface SignalCommands { + facing app: Application + + provides: + SendSignal(app) + CloseSignalLink(app) +} + +surface RoomConnectionLifecycle { + facing app: Application + + exposes: + app.engine.status + app.engine.mode + app.engine.reconnect_cause + + provides: + CloseEngine(app.engine) + + @guarantee LifecycleNotifications + -- The engine emits, for the Room to observe: EngineResuming / + -- EngineRestarting (start of an episode, or a resume->full escalation), + -- EngineResumed / EngineRestarted (success), EngineDisconnected (give-up + -- or server Disconnect). DELTA 2: an escalated full reconnect now also + -- emits Restarting. +} + +-- The SFU pushes connection outcomes, liveness loss, leave requests and tokens. +surface ServerSignalling { + facing sfu: Sfu + + provides: + SignalConnectSucceeded(sfu) + SignalConnectFailed(sfu) + RegionUrlsFetched(sfu) + RegionFetchFailed(sfu) + RegionConnectsExhausted(sfu) + SignalStreamDropped(sfu) + SignalResumeFailed(sfu) + ServerLeaveRequested(sfu) + ServerRefreshesToken(sfu) +} + +-- The media layer reports recovery progress and outcomes; the engine demands +-- the recovery operations from it. 
+surface MediaRecovery { + facing media: MediaLayer + + provides: + EngineSessionConnected(media) + PeerConnectionFailed(media) + PublisherRepublished(media) + PeerConnectionsReconnected(media) + RestartAttemptSucceeded(media) + RestartAttemptFailed(media) + + @guarantee RecoveryOperations + -- The engine drives the media layer via RepublishPublisherRequested, + -- AwaitPeerConnectionsRequested (which applies pc_reconnect_settle_delay) + -- and FullReconnectRequested. A full reconnect must leave the prior + -- session intact until the new one fully succeeds. +} diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index 50a7f131a..a1a26f4ae 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -16,13 +16,12 @@ use libwebrtc::prelude::*; use livekit_api::signal_client::{SignalError, SignalOptions}; use livekit_datatrack::backend as dt; use livekit_protocol as proto; -use livekit_runtime::{interval, Interval, JoinHandle, MissedTickBehavior}; +use livekit_runtime::JoinHandle; use parking_lot::{RwLock, RwLockReadGuard}; use std::{borrow::Cow, fmt::Debug, sync::Arc, time::Duration}; use thiserror::Error; use tokio::sync::{ - mpsc, oneshot, Mutex as AsyncMutex, Notify, RwLock as AsyncRwLock, - RwLockReadGuard as AsyncRwLockReadGuard, + mpsc, oneshot, Notify, RwLock as AsyncRwLock, RwLockReadGuard as AsyncRwLockReadGuard, }; pub use self::rtc_session::{SessionStats, INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD}; @@ -43,16 +42,19 @@ use crate::{ChatMessage, E2eeManager, TranscriptionSegment}; mod dc_sender; pub mod lk_runtime; mod peer_transport; +mod reconnect_strategy; mod rtc_events; mod rtc_session; +// Re-exported to preserve the public `rtc_engine::RECONNECT_*` paths. 
+pub use reconnect_strategy::{ + RECONNECT_ATTEMPTS, RECONNECT_BACKOFF_MULTIPLIER, RECONNECT_BASE_DELAY, RECONNECT_MAX_DELAY, +}; + pub(crate) type EngineEmitter = mpsc::UnboundedSender; pub(crate) type EngineEvents = mpsc::UnboundedReceiver; pub(crate) type EngineResult = Result; -pub const RECONNECT_ATTEMPTS: u32 = 10; -pub const RECONNECT_INTERVAL: Duration = Duration::from_secs(5); - /// Settling delay before checking PeerConnection state on the resume path. /// /// Lets a freshly issued ICE-restart offer/answer round-trip take effect when the @@ -75,9 +77,15 @@ pub enum SimulateScenario { Migration, ForceTcp, ForceTls, - /// Tells the server to issue a `LeaveRequest{Reconnect}`, forcing a - /// full reconnect (new RtcSession, republish required). + /// Client-driven full reconnect: forces the next reconnect to be a full + /// reconnect (new RtcSession, republish required) and triggers it locally, + /// without relying on the server. Mirrors client-sdk-js's `full-reconnect`. FullReconnect, + /// Asks the server to drop the signalling connection during the next resume, + /// then triggers a resume locally. The resume cannot complete, so the engine + /// escalates to a full reconnect — exercising the resume→full escalation + /// path. Mirrors client-sdk-js's `disconnect-signal-on-resume`. + DisconnectSignalOnResume, } #[derive(Error, Debug)] @@ -227,6 +235,11 @@ struct EngineHandle { // If full_reconnect is true, the next attempt will not try to resume // and will instead do a full reconnect full_reconnect: bool, + + // The disconnect reason that started the current reconnection episode. + // Carried through so that, if reconnection ultimately fails, the engine + // closes with the original cause rather than a generic `UnknownReason`. + reconnect_reason: DisconnectReason, engine_task: Option<(JoinHandle<()>, oneshot::Sender<()>)>, } @@ -245,8 +258,10 @@ struct EngineInner { // We can simply wait for reconnection by trying to acquire a read lock. 
// (This also prevents new reconnection to happens if a read guard is still held) reconnecting_lock: AsyncRwLock<()>, - reconnecting_interval: AsyncMutex, + // Signalled when a server-requested reconnect wants the next attempt to fire + // immediately, collapsing the exponential backoff wait between attempts. + retry_now_notify: Arc, /// Test-only fault injection: number of upcoming resume attempts to force to /// fail. Each forced failure decrements this counter and makes /// `try_resume_connection` return an error, which exercises the escalation to a @@ -445,8 +460,6 @@ impl EngineInner { session.wait_pc_connection().await?; let (engine_tx, engine_rx) = mpsc::unbounded_channel(); - let mut interval = interval(RECONNECT_INTERVAL); - interval.set_missed_tick_behavior(MissedTickBehavior::Delay); let inner = Arc::new(Self { lk_runtime, engine_tx, @@ -457,11 +470,12 @@ impl EngineInner { reconnecting: false, can_reconnect: true, full_reconnect: false, + reconnect_reason: DisconnectReason::UnknownReason, engine_task: None, }), options, reconnecting_lock: AsyncRwLock::default(), - reconnecting_interval: AsyncMutex::new(interval), + retry_now_notify: Arc::new(Notify::new()), #[cfg(feature = "__lk-e2e-test")] fail_resume_attempts: std::sync::atomic::AtomicU32::new(0), #[cfg(feature = "__lk-e2e-test")] @@ -489,6 +503,14 @@ impl EngineInner { match try_connect().await { Ok(res) => return Ok(res), Err(e) => { + // A validated auth failure (401/403) will not succeed on + // retry with the same token — surface it immediately instead + // of burning the remaining join attempts. Same classification + // as the reconnect loop (see `auth_failure_reason`). 
+ if auth_failure_reason(&e).is_some() { + log::warn!("authentication rejected during connect ({e}); not retrying"); + return Err(e); + } let attempt_i = i + 1; if i < max_retries { log::warn!( @@ -569,6 +591,7 @@ impl EngineInner { self.reconnection_needed( retry_now, action == proto::leave_request::Action::Reconnect, + reason, ); } proto::leave_request::Action::Disconnect => { @@ -734,6 +757,14 @@ impl EngineInner { // exhausts all attempts leaves the room stuck in Reconnecting forever because // the room's task never sees the event that drives `handle_disconnected`. let _ = self.engine_tx.send(EngineEvent::Disconnected { reason }); + + // Signal any in-flight reconnect loop to stop. The reconnect task selects + // on `close_notifier`, both at the top-level (cancelling the whole task) + // and within its backoff wait (breaking the loop early). We notify LAST, + // after teardown has completed: the reconnect loop's own bail paths call + // `close()` from inside the task, so notifying earlier could let the + // top-level select drop the task mid-`close()` and leave teardown partial. + self.close_notifier.notify_waiters(); } /// When waiting for reconnection, it ensures we're always using the latest session. @@ -755,7 +786,13 @@ impl EngineInner { /// Start the reconnect task if not already started /// Ask to retry directly if `retry_now` is true /// Ask for a full reconnect if `full_reconnect` is true - fn reconnection_needed(self: &Arc, retry_now: bool, full_reconnect: bool) { + /// `reason` is the disconnect cause that triggered this reconnection + fn reconnection_needed( + self: &Arc, + retry_now: bool, + full_reconnect: bool, + reason: DisconnectReason, + ) { let mut running_handle = self.running_handle.write(); if !running_handle.can_reconnect { @@ -777,10 +814,7 @@ impl EngineInner { // Retry as soon as possible when asked, rather than waiting out the backoff. 
if retry_now { - let inner = self.clone(); - livekit_runtime::spawn(async move { - inner.reconnecting_interval.lock().await.reset(); - }); + self.retry_now_notify.notify_one(); } return; @@ -791,6 +825,9 @@ impl EngineInner { // reconnect (a failed/false-successful resume), keep it. Cleared on a successful // full reconnect in `try_restart_connection`. running_handle.full_reconnect |= full_reconnect; + // Remember the cause so a failed reconnection closes with it rather than + // a generic UnknownReason. + running_handle.reconnect_reason = reason; livekit_runtime::spawn({ let inner = self.clone(); @@ -805,13 +842,32 @@ impl EngineInner { tokio::select! { _ = &mut close_receiver => { + // The engine was closed; abandon the reconnect attempt. + // Clear `reconnecting` (the success/failure path below does + // this after the select; this branch returns early so it + // must do so itself) to avoid leaving a closed engine stuck + // with reconnecting = true. log::debug!("reconnection cancelled"); + inner.running_handle.write().reconnecting = false; return; } res = inner.reconnect_task() => { if res.is_err() { log::error!("failed to reconnect to the livekit room"); - inner.close(DisconnectReason::UnknownReason).await; + // The loop may already have closed the engine with an + // accurate reason (e.g. a server Disconnect hit + // mid-attempt). Only close here for the paths that + // didn't — chiefly attempt exhaustion — and do so with + // the cause that started this episode rather than a + // generic UnknownReason, avoiding a duplicate + // Disconnected event with a stale reason. 
+ let (already_closed, reason) = { + let handle = inner.running_handle.read(); + (handle.closed, handle.reconnect_reason) + }; + if !already_closed { + inner.close(reason).await; + } } else { log::info!("RtcEngine successfully recovered") } @@ -843,6 +899,15 @@ impl EngineInner { ) }; + // Lifecycle notifications are emitted once per mode: Resuming the first + // time the episode resumes, Restarting the first time it (re)enters full + // reconnect. Crucially this includes an escalation from a failed resume, + // which previously emitted no Restarting at all -- leaving the Room to + // observe Resuming followed by Restarted with no Restarting between + // (DELTA 2). + let mut resuming_emitted = false; + let mut restarting_emitted = false; + for i in 1..=RECONNECT_ATTEMPTS { let (is_closed, full_reconnect) = { let running_handle = self.running_handle.read(); @@ -854,7 +919,8 @@ impl EngineInner { } if full_reconnect { - if i == 1 { + if !restarting_emitted { + restarting_emitted = true; let (tx, rx) = oneshot::channel(); let _ = self.engine_tx.send(EngineEvent::Restarting(tx)); let _ = rx.await; @@ -885,11 +951,22 @@ impl EngineInner { "server requested disconnect during restart".into(), )); } + if let Some(reason) = auth_failure_reason(&err) { + log::warn!( + "authentication rejected during restart ({err}); not retrying" + ); + self.running_handle.write().can_reconnect = false; + self.close(reason).await; + return Err(EngineError::Connection( + "authentication failed during reconnect".into(), + )); + } log::error!("restarting connection failed: {}", err); } } } else { - if i == 1 { + if !resuming_emitted { + resuming_emitted = true; let (tx, rx) = oneshot::channel(); let _ = self.engine_tx.send(EngineEvent::Resuming(tx)); let _ = rx.await; @@ -912,6 +989,16 @@ impl EngineInner { "server requested disconnect during resume".into(), )); } + if let Some(reason) = auth_failure_reason(&err) { + log::warn!( + "authentication rejected during resume ({err}); not retrying" + 
); + self.running_handle.write().can_reconnect = false; + self.close(reason).await; + return Err(EngineError::Connection( + "authentication failed during reconnect".into(), + )); + } log::error!("resuming connection failed: {}", err); let mut running_handle = self.running_handle.write(); running_handle.full_reconnect = true; @@ -919,7 +1006,21 @@ impl EngineInner { } } - self.reconnecting_interval.lock().await.tick().await; + // Exponential backoff with full jitter between attempts (DELTA 3). + // A server-requested reconnect signals retry_now_notify to collapse + // this wait so the next attempt fires immediately; a close signals + // close_notifier to break out of the loop early (the next iteration's + // `is_closed` check then returns) instead of waiting out the backoff. + let backoff = reconnect_strategy::delay(i); + tokio::select! { + _ = livekit_runtime::sleep(backoff) => {} + _ = self.retry_now_notify.notified() => { + log::debug!("retry_now signalled, skipping reconnect backoff"); + } + _ = self.close_notifier.notified() => { + log::debug!("engine closed, cancelling reconnect backoff"); + } + } } Err(EngineError::Connection( @@ -978,7 +1079,16 @@ impl EngineInner { Ok(()) } - /// Try to restart the current session + /// Resume the current session in place (the lightweight reconnect path). + /// + /// The steps below run in a fixed order that any change must preserve, and + /// each non-trivial seam is its own method so the sequence — and the reason + /// for the ordering — is explicit rather than implied by statement order. + /// Mirrors the resume chain in `livekit/specs/signalling-reconnection.allium`: + /// 1. reopen the signalling link (queue gate stays on until step 4); + /// 2. SyncState before the publisher re-offer; + /// 3. re-offer the publisher, then await PC reconnection + settle; + /// 4. re-check link liveness, then drain the queue. 
async fn try_resume_connection(self: &Arc) -> EngineResult<()> { // Test-only: force the configured number of resume attempts to fail so tests // can exercise the resume-failure → full-reconnect escalation deterministically. @@ -1000,38 +1110,52 @@ impl EngineInner { // the next cycle; pre-fix it was dropped and the engine resumed again. if self.fail_transport_during_next_resume.swap(false, Ordering::AcqRel) { log::warn!("test fault injection: simulating concurrent failure during resume"); - self.reconnection_needed(false, false); + self.reconnection_needed(false, false, DisconnectReason::UnknownReason); } } let session = self.running_handle.read().session.clone(); - let reconnect_response = session.restart().await?; - let (tx, rx) = oneshot::channel(); - let _ = self.engine_tx.send(EngineEvent::SignalResumed { reconnect_response, tx }); + // 1. Reopen the signalling link. The SignalClient stays gated + // (`reconnecting=true`) so queueable mutations buffer until step 4. + let reconnect_response = session.restart().await?; - // With SignalResumed, the room will send a SyncState message to the server. - // SyncState is a pass-through signal so it goes out immediately even though - // the SignalClient is still in `reconnecting=true` state. - let _ = rx.await; + // 2. Hand the ReconnectResponse to the room and wait until it has sent + // SyncState, which must precede the publisher re-offer. + self.resume_sync_state(reconnect_response).await; - // The publisher offer must be sent AFTER the SyncState message + // 3. Re-offer the publisher (strictly AFTER SyncState) and wait for the + // PeerConnections to reconnect, applying the settle delay. session.restart_publisher().await?; session.wait_pc_reconnected(PC_RECONNECT_SETTLE_DELAY).await?; - // Re-check the signal connection BEFORE flushing the queue. 
If the WS died - // while we were waiting for PCs to come back, draining queued mutations - // would just push them into the void; better to bail and let the engine - // try a fresh resume (or escalate). + // 4. Re-check link liveness and drain the queued mutations. + self.resume_finalize(&session).await + } + + /// Resume step 2: announce the resume to the room and block until it has + /// sent SyncState. SyncState is a pass-through signal, so it reaches the + /// server immediately even though the SignalClient is still gated. + async fn resume_sync_state(&self, reconnect_response: proto::ReconnectResponse) { + let (tx, rx) = oneshot::channel(); + let _ = self.engine_tx.send(EngineEvent::SignalResumed { reconnect_response, tx }); + // The room replies on `tx` once SyncState has gone out. + let _ = rx.await; + } + + /// Resume step 4: confirm the signalling link survived the PC-reconnect wait + /// before draining the queue. If the WS died while we were waiting for the + /// PeerConnections, draining queued mutations would just push them into the + /// void; bail instead and let the engine try a fresh resume (or escalate). + async fn resume_finalize(&self, session: &RtcSession) -> EngineResult<()> { if !session.signal_client().is_connected().await { return Err(EngineError::Connection("signal connection severed during resume".into())); } - // Flush queued mutations and clear the `reconnecting` flag — at this point - // the resume has fully recovered, so deferred subscription updates / mutes - // / etc. should now reach the server. Mirrors `client.setReconnected()`. + // Flush queued mutations and clear the `reconnecting` gate — the resume + // has fully recovered, so deferred subscription updates / mutes / etc. + // should now reach the server. Mirrors `client.setReconnected()`. 
session.signal_client().set_reconnected().await; - Ok(()) } } @@ -1057,6 +1181,28 @@ fn leave_disconnect_reason(err: &EngineError) -> Option { None } +/// Inspect a reconnect-attempt error for a genuine authentication/authorization +/// failure (HTTP 401/403). Such a failure will not succeed on retry with the +/// same token, so the reconnect loop should bail out immediately rather than +/// burning every attempt (and hammering the server) with credentials it already +/// knows are rejected. +/// +/// We key on `SignalError::Client(401|403)`, which is produced by the server's +/// `rtc/validate` probe (see [`super`]'s `SignalInner::validate`) — an +/// authoritative classification. We deliberately do NOT key on the raw +/// `WsError::Http` upgrade status, because that can be a fabricated 401 masking a +/// transient server error (e.g. a 503 from a saturated node), which IS +/// retryable. A resume that hits a raw 401 simply escalates to a full reconnect, +/// whose connect path runs `validate()` and surfaces the authoritative status. +fn auth_failure_reason(err: &EngineError) -> Option<DisconnectReason> { + if let EngineError::Signal(SignalError::Client(status, _)) = err { + if matches!(status.as_u16(), 401 | 403) { + return Some(DisconnectReason::JoinFailure); + } + } + None +} + #[cfg(test)] mod tests { use super::*; @@ -1104,4 +1250,48 @@ mod tests { ); } } + + #[test] + fn auth_failure_reason_flags_validated_401_and_403() { + // The server's rtc/validate probe surfaces auth failures as Client(4xx). + for status in [401u16, 403] { + let err = EngineError::Signal(SignalError::Client( + http::StatusCode::from_u16(status).unwrap(), + "invalid token".into(), + )); + assert_eq!( + auth_failure_reason(&err), + Some(DisconnectReason::JoinFailure), + "Client({status}) must be treated as a non-retryable auth failure" + ); + } + } + + #[test] + fn auth_failure_reason_ignores_other_client_and_server_errors() { + let not_auth = [ + // Other client errors are not auth failures. 
+ EngineError::Signal(SignalError::Client(http::StatusCode::NOT_FOUND, "".into())), + EngineError::Signal(SignalError::Client( + http::StatusCode::TOO_MANY_REQUESTS, + "".into(), + )), + // Server errors (e.g. a saturated node) are retryable. + EngineError::Signal(SignalError::Server( + http::StatusCode::SERVICE_UNAVAILABLE, + "".into(), + )), + // Generic connectivity/internal errors are retryable. + EngineError::Connection("network".into()), + EngineError::Internal("bug".into()), + EngineError::Signal(SignalError::SendError), + EngineError::Signal(SignalError::Timeout("waiting".into())), + ]; + for err in &not_auth { + assert!( + auth_failure_reason(err).is_none(), + "{err:?} must NOT be treated as an auth failure" + ); + } + } } diff --git a/livekit/src/rtc_engine/reconnect_strategy.rs b/livekit/src/rtc_engine/reconnect_strategy.rs new file mode 100644 index 000000000..f5daff08e --- /dev/null +++ b/livekit/src/rtc_engine/reconnect_strategy.rs @@ -0,0 +1,99 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Reconnect backoff schedule. +//! +//! Computes the delay between reconnect attempts as exponential backoff with +//! full jitter. This replaces a previously fixed reconnect interval: it recovers +//! faster from transient blips and spreads retries to avoid synchronised +//! reconnect storms across many clients after a server hiccup. 
+ +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +/// Maximum number of reconnect attempts before the engine gives up and closes. +pub const RECONNECT_ATTEMPTS: u32 = 10; + +/// Exponential-backoff-with-full-jitter parameters for spacing reconnect +/// attempts. The per-attempt delay is sampled uniformly from +/// `[0, min(RECONNECT_MAX_DELAY, RECONNECT_BASE_DELAY * MULTIPLIER^(attempt-1))]`. +pub const RECONNECT_BASE_DELAY: Duration = Duration::from_millis(300); +pub const RECONNECT_BACKOFF_MULTIPLIER: u64 = 2; +pub const RECONNECT_MAX_DELAY: Duration = Duration::from_secs(7); + +/// Un-jittered backoff ceiling for the given 1-based reconnect attempt: +/// `min(RECONNECT_MAX_DELAY, RECONNECT_BASE_DELAY * MULTIPLIER^(attempt-1))`, +/// floored at 1ms. Grows geometrically until it saturates at the cap. +pub(super) fn nominal(attempt: u32) -> Duration { + let base = RECONNECT_BASE_DELAY.as_millis() as u64; + let cap = RECONNECT_MAX_DELAY.as_millis() as u64; + let exp = RECONNECT_BACKOFF_MULTIPLIER.saturating_pow(attempt.saturating_sub(1)); + Duration::from_millis(base.saturating_mul(exp).min(cap).max(1)) +} + +/// Full-jitter backoff delay for the given 1-based reconnect attempt: sampled +/// uniformly from `[0, nominal(attempt)]`. A dependency-free pseudo-random +/// source from the system clock is sufficient — backoff jitter does not need +/// cryptographic quality, only de-correlation across clients. +pub(super) fn delay(attempt: u32) -> Duration { + let nominal = nominal(attempt).as_millis() as u64; + let seed = + SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.subsec_nanos() as u64).unwrap_or(0); + Duration::from_millis(seed % (nominal + 1)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn backoff_nominal_grows_geometrically_then_caps() { + // attempt 1 == base, then x2 each step, until it saturates at the cap. 
+ assert_eq!(nominal(1), RECONNECT_BASE_DELAY); + assert_eq!(nominal(2), RECONNECT_BASE_DELAY * RECONNECT_BACKOFF_MULTIPLIER as u32); + assert_eq!( + nominal(3), + RECONNECT_BASE_DELAY + * (RECONNECT_BACKOFF_MULTIPLIER * RECONNECT_BACKOFF_MULTIPLIER) as u32 + ); + + // Monotonic non-decreasing and never above the cap. + let mut prev = Duration::ZERO; + for attempt in 1..=RECONNECT_ATTEMPTS { + let nominal_duration = nominal(attempt); + assert!(nominal_duration >= prev, "backoff must not decrease (attempt {attempt})"); + assert!(nominal_duration <= RECONNECT_MAX_DELAY, "backoff must not exceed the cap"); + prev = nominal_duration; + } + + // Late attempts are pinned to the cap, and large attempt indices don't + // overflow into a wrapped-around small value. + assert_eq!(nominal(RECONNECT_ATTEMPTS), RECONNECT_MAX_DELAY); + assert_eq!(nominal(u32::MAX), RECONNECT_MAX_DELAY); + } + + #[test] + fn backoff_delay_stays_within_nominal_jitter_window() { + // Full jitter: every sample must land within [0, nominal(attempt)]. + for attempt in 1..=RECONNECT_ATTEMPTS { + let nominal_duration = nominal(attempt); + for _ in 0..1000 { + let delay_duration = delay(attempt); + assert!( + delay_duration <= nominal_duration, + "jittered delay {delay_duration:?} exceeded nominal {nominal_duration:?} (attempt {attempt})" + ); + } + } + } +} diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index f77c92ae1..4bb9ff54a 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -1908,6 +1908,21 @@ impl SessionInner { SimulateScenario::SignalReconnect => { self.signal_client.close().await; } + SimulateScenario::DisconnectSignalOnResume => { + // Tell the server to drop the signalling link during the next + // resume, then trigger a resume by closing the link locally. The + // server kills the resumed signal, so the resume fails and the + // engine escalates to a full reconnect. 
Mirrors client-sdk-js's + // `disconnect-signal-on-resume`. + self.signal_client + .send(proto::signal_request::Message::Simulate(proto::SimulateScenario { + scenario: Some( + proto::simulate_scenario::Scenario::DisconnectSignalOnResume(true), + ), + })) + .await; + self.signal_client.close().await; + } SimulateScenario::Speaker => { self.signal_client .send(proto::signal_request::Message::Simulate(proto::SimulateScenario { @@ -1973,13 +1988,18 @@ impl SessionInner { .await? } SimulateScenario::FullReconnect => { - self.signal_client - .send(proto::signal_request::Message::Simulate(proto::SimulateScenario { - scenario: Some( - proto::simulate_scenario::Scenario::LeaveRequestFullReconnect(true), - ), - })) - .await; + // Client-driven full reconnect, mirroring client-sdk-js's + // `full-reconnect` scenario: force the next reconnect to be a + // full reconnect and trigger it locally, rather than asking the + // server to echo a Leave. The server-side + // `LeaveRequestFullReconnect` simulation is not honoured by every + // server build, so relying on it makes this scenario flaky. + self.on_signal_event(proto::signal_response::Message::Leave(proto::LeaveRequest { + action: proto::leave_request::Action::Reconnect.into(), + reason: DisconnectReason::ClientInitiated as i32, + ..Default::default() + })) + .await? } } Ok(()) diff --git a/livekit/tests/reconnection_test.rs b/livekit/tests/reconnection_test.rs new file mode 100644 index 000000000..c5a6a56d2 --- /dev/null +++ b/livekit/tests/reconnection_test.rs @@ -0,0 +1,233 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Reconnection Tests +//! +//! Exercises the engine reconnection paths end-to-end against a running +//! `livekit-server --dev` (or LiveKit Cloud), via [`Room::simulate_scenario`]: +//! +//! - `SignalReconnect` drives the lightweight *resume* path. +//! - `FullReconnect` injects a `LeaveRequest{Reconnect}` client-side, forcing a +//! *full* reconnect (new session, republish) without involving the server. +//! +//! Each scenario should surface `Reconnecting` then `Reconnected` and return the +//! room to `Connected`. These guard the lifecycle-event and recovery behaviour +//! the engine's reconnect loop is responsible for. +//! +//! Environment variables (same as the other e2e suites): +//! - LIVEKIT_URL (default ws://localhost:7880) +//! - LIVEKIT_API_KEY (default "devkey") +//! - LIVEKIT_API_SECRET (default "secret") +//! +//! Run: +//! cargo test -p livekit --features "__lk-e2e-test,native-tls" --test reconnection_test -- --nocapture + +#![cfg(feature = "__lk-e2e-test")] + +#[cfg(feature = "__lk-e2e-test")] +use { + anyhow::{anyhow, bail, Result}, + common::test_rooms, + libwebrtc::native::create_random_uuid, + livekit::{ConnectionState, Room, RoomEvent, RoomOptions, SimulateScenario}, + livekit_api::access_token::{AccessToken, VideoGrants}, + std::{env, net::SocketAddr, time::Duration}, + tokio::{ + net::{TcpListener, TcpStream}, + sync::{mpsc::UnboundedReceiver, watch}, + time::timeout, + }, +}; + +mod common; + +/// Drives a reconnection via `scenario` and asserts the room reports +/// `Reconnecting`, then `Reconnected`, and ends up `Connected` again. 
+#[cfg(feature = "__lk-e2e-test")] +async fn assert_recovers( + room: Room, + mut events: UnboundedReceiver<RoomEvent>, + scenario: SimulateScenario, +) -> Result<()> { + assert_eq!(room.connection_state(), ConnectionState::Connected); + + // Kick off the reconnection. These scenarios return promptly (they close the + // local signal channel or inject a Leave client-side); recovery then + // proceeds asynchronously and surfaces as room events, which are buffered on + // an unbounded channel until we observe them below. + room.simulate_scenario(scenario) + .await + .map_err(|e| anyhow!("simulate_scenario failed: {e:?}"))?; + + // Expect Reconnecting, then Reconnected. Ignore unrelated events in between. + let observe = async { + let mut saw_reconnecting = false; + while let Some(event) = events.recv().await { + match event { + RoomEvent::Reconnecting => saw_reconnecting = true, + RoomEvent::Reconnected => { + if !saw_reconnecting { + bail!("received Reconnected without a preceding Reconnecting"); + } + return Ok(()); + } + RoomEvent::Disconnected { reason } => { + bail!("room disconnected during reconnection: {reason:?}"); + } + _ => {} + } + } + bail!("event stream ended before the room reconnected"); + }; + + timeout(Duration::from_secs(30), observe).await??; + + assert_eq!( + room.connection_state(), + ConnectionState::Connected, + "room should be Connected after recovery" + ); + Ok(()) +} + +#[cfg(feature = "__lk-e2e-test")] +#[test_log::test(tokio::test)] +async fn test_signal_reconnect_resumes() -> Result<()> { + let (room, events) = test_rooms(1).await?.pop().unwrap(); + assert_recovers(room, events, SimulateScenario::SignalReconnect).await +} + +// `FullReconnect` forces a full reconnect (new session, republish) — driven +// client-side, so it does not depend on the server echoing a leave. 
+#[cfg(feature = "__lk-e2e-test")] +#[test_log::test(tokio::test)] +async fn test_full_reconnect_recovers() -> Result<()> { + let (room, events) = test_rooms(1).await?.pop().unwrap(); + assert_recovers(room, events, SimulateScenario::FullReconnect).await +} + +// The server drops the signalling link during the resume, so the resume cannot +// complete and the engine must escalate to a full reconnect. Recovery here is +// only possible via that escalation, so a successful Reconnecting → Reconnected +// exercises the resume→full path (and the Restarting emitted on escalation, +// which drives the Room's remote-participant cleanup before the full reconnect). +#[cfg(feature = "__lk-e2e-test")] +#[test_log::test(tokio::test)] +async fn test_resume_failure_escalates_to_full_reconnect() -> Result<()> { + let (room, events) = test_rooms(1).await?.pop().unwrap(); + assert_recovers(room, events, SimulateScenario::DisconnectSignalOnResume).await +} + +/// A minimal TCP proxy in front of the signalling server that can be killed. +/// Sending `true` on the returned channel closes the in-flight connection and +/// stops accepting, so the client's reconnect attempts all fail (connection +/// refused). Used to drive the engine's reconnect loop to exhaustion. +#[cfg(feature = "__lk-e2e-test")] +async fn start_killable_proxy(target_host_port: String) -> (SocketAddr, watch::Sender<bool>) { + let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind proxy"); + let addr = listener.local_addr().expect("proxy addr"); + let (kill_tx, kill_rx) = watch::channel(false); + + tokio::spawn(async move { + loop { + let mut kr = kill_rx.clone(); + tokio::select! 
{ + _ = kr.changed() => break, // kill: stop accepting; dropping `listener` refuses new connects + accepted = listener.accept() => { + let Ok((mut inbound, _)) = accepted else { break }; + let target = target_host_port.clone(); + let mut kr2 = kill_rx.clone(); + tokio::spawn(async move { + if let Ok(mut outbound) = TcpStream::connect(&target).await { + tokio::select! { + _ = kr2.changed() => {} // kill: drop both streams, severing the client link + _ = tokio::io::copy_bidirectional(&mut inbound, &mut outbound) => {} + } + } + }); + } + } + } + }); + + (addr, kill_tx) +} + +// When every reconnect attempt fails, the engine must exhaust its bounded +// retries and emit Disconnected — it must NOT stay stuck in Reconnecting +// forever. We connect through a killable proxy, kill it, and assert the room +// reaches Disconnected after a reconnection was attempted. (The reason is +// UnknownReason here because a dropped signal link carries no richer cause; the +// #2b improvement surfaces a meaningful cause only when the triggering event +// has one.) Slow by design: it waits out the full bounded backoff sequence. +#[cfg(feature = "__lk-e2e-test")] +#[test_log::test(tokio::test)] +async fn test_reconnect_exhaustion_disconnects() -> Result<()> { + let api_key = env::var("LIVEKIT_API_KEY").unwrap_or_else(|_| "devkey".into()); + let api_secret = env::var("LIVEKIT_API_SECRET").unwrap_or_else(|_| "secret".into()); + let server_url = env::var("LIVEKIT_URL").unwrap_or_else(|_| "ws://localhost:7880".into()); + + // Derive host:port for raw TCP forwarding (the WS upgrade rides over it). 
+ let target = server_url + .split("://") + .last() + .and_then(|rest| rest.split('/').next()) + .unwrap_or("localhost:7880") + .to_string(); + + let (proxy_addr, kill) = start_killable_proxy(target).await; + let proxy_url = format!("ws://{proxy_addr}"); + + let room_name = format!("test_room_{}", create_random_uuid()); + let token = AccessToken::with_api_key(&api_key, &api_secret) + .with_ttl(Duration::from_secs(30 * 60)) + .with_grants(VideoGrants { room_join: true, room: room_name, ..Default::default() }) + .with_identity("p0") + .with_name("Participant 0") + .to_jwt()?; + + let (room, mut events) = Room::connect(&proxy_url, &token, RoomOptions::default()).await?; + assert_eq!(room.connection_state(), ConnectionState::Connected); + + // Sever the link and refuse all reconnects. + kill.send(true).ok(); + + let observe = async { + let mut saw_reconnecting = false; + while let Some(event) = events.recv().await { + match event { + RoomEvent::Reconnecting => saw_reconnecting = true, + RoomEvent::Disconnected { reason } => { + if !saw_reconnecting { + bail!("disconnected without attempting reconnection first"); + } + return Ok(reason); + } + _ => {} + } + } + bail!("event stream ended before the room reported Disconnected"); + }; + + // Generous timeout: the engine works through its full bounded backoff before + // giving up. + let _reason = timeout(Duration::from_secs(90), observe).await??; + + assert_eq!( + room.connection_state(), + ConnectionState::Disconnected, + "room must reach Disconnected after reconnection is exhausted, not hang in Reconnecting" + ); + Ok(()) +}