From 84a84c8e86cbb19577b0d564337ab8c5449345aa Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 9 Jun 2026 14:33:10 +0200 Subject: [PATCH 1/8] error early when not authenticated --- Cargo.lock | 1 + livekit/Cargo.toml | 1 + livekit/specs/signalling-reconnection.allium | 43 ++++++---- livekit/src/rtc_engine/mod.rs | 90 ++++++++++++++++++++ 4 files changed, 119 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b85400203..5a0cffb65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3999,6 +3999,7 @@ dependencies = [ "bytes", "chrono", "futures-util", + "http 1.4.0", "lazy_static", "libloading 0.8.9", "libwebrtc", diff --git a/livekit/Cargo.toml b/livekit/Cargo.toml index 36fe1af9b..fa222762d 100644 --- a/livekit/Cargo.toml +++ b/livekit/Cargo.toml @@ -53,3 +53,4 @@ anyhow = "1.0.99" test-log = "0.2.18" test-case = "3.3" serial_test = "3.0" +http = "1.1" diff --git a/livekit/specs/signalling-reconnection.allium b/livekit/specs/signalling-reconnection.allium index 809557c24..8f8d8e887 100644 --- a/livekit/specs/signalling-reconnection.allium +++ b/livekit/specs/signalling-reconnection.allium @@ -86,6 +86,7 @@ enum DisconnectCause { | server_leave | signal_severed_during_resume | reconnect_attempts_exhausted + | unauthorized | client_initiated | unknown } @@ -183,7 +184,10 @@ entity EngineConnection { ------------------------------------------------------------ config { - -- Initial connect: extra attempts after the first (Room default). + -- Initial connect: extra attempts after the first (Room default). A + -- validated auth failure (HTTP 401/403, DisconnectCause unauthorized) is + -- surfaced immediately and does NOT consume these retries (R4.2), matching + -- the reconnect loop's terminal-failure handling. join_retries: Integer = 3 -- Per signalling-link connect attempt timeout (SIGNAL_CONNECT_TIMEOUT). 
connect_timeout: Duration = 5.seconds @@ -489,7 +493,7 @@ rule ResumeRechecksLink { if not engine.signal.is_alive: ensures: ResumeAttemptFailed( engine, - server_disconnect: false, + terminal: false, cause: signal_severed_during_resume ) else: @@ -519,28 +523,32 @@ rule ResumeAttemptSucceeds { ensures: EngineResumed(engine) } -rule ResumeAttemptDisconnects { - when: ResumeAttemptFailed(engine, server_disconnect, cause) +rule ResumeAttemptFailsTerminally { + when: ResumeAttemptFailed(engine, terminal, cause) requires: engine.status = reconnecting - requires: server_disconnect + requires: terminal ensures: engine.reconnect_permission = revoked ensures: engine.status = closed ensures: EngineDisconnected(engine, cause: cause) @guidance - -- A resume that failed because the server sent Leave{Disconnect}: stop, - -- do not escalate or retry. + -- A resume that failed in a NON-RETRYABLE way: stop, do not escalate or + -- retry. Terminal failures are (a) a server Leave{Disconnect} + -- (cause = server_leave) and (b) DELTA R4.2: an authentication failure + -- (cause = unauthorized, a server-validated HTTP 401/403) — the same + -- token will not succeed on retry, so retrying would just hammer the + -- server. `cause` carries which one for the Room. } rule ResumeAttemptEscalates { - when: ResumeAttemptFailed(engine, server_disconnect, cause) + when: ResumeAttemptFailed(engine, terminal, cause) requires: engine.status = reconnecting - requires: not server_disconnect + requires: not terminal ensures: engine.mode = full ensures: EngineRestarting(engine) ensures: RetryReconnect(engine) @guidance - -- DELTA 2: a failed resume escalates to full reconnect AND emits - -- Restarting on the escalation (previously silent), then retries. + -- DELTA 2: a (retryable) failed resume escalates to full reconnect AND + -- emits Restarting on the escalation (previously silent), then retries. 
} rule RestartAttemptSucceeds { @@ -555,19 +563,22 @@ rule RestartAttemptSucceeds { -- the old only on success, so a failed attempt leaves the old usable. } -rule RestartAttemptDisconnects { - when: RestartAttemptFailed(engine, server_disconnect, cause) +rule RestartAttemptFailsTerminally { + when: RestartAttemptFailed(engine, terminal, cause) requires: engine.status = reconnecting - requires: server_disconnect + requires: terminal ensures: engine.reconnect_permission = revoked ensures: engine.status = closed ensures: EngineDisconnected(engine, cause: cause) + @guidance + -- Non-retryable full-reconnect failure: a server Leave{Disconnect} or + -- (DELTA R4.2) a validated auth failure (cause = unauthorized). Stop. } rule RestartAttemptRetries { - when: RestartAttemptFailed(engine, server_disconnect, cause) + when: RestartAttemptFailed(engine, terminal, cause) requires: engine.status = reconnecting - requires: not server_disconnect + requires: not terminal ensures: RetryReconnect(engine) } diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index ce193c1fa..449248973 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -495,6 +495,14 @@ impl EngineInner { match try_connect().await { Ok(res) => return Ok(res), Err(e) => { + // A validated auth failure (401/403) will not succeed on + // retry with the same token — surface it immediately instead + // of burning the remaining join attempts. Same classification + // as the reconnect loop (see `auth_failure_reason`). 
+ if auth_failure_reason(&e).is_some() { + log::warn!("authentication rejected during connect ({e}); not retrying"); + return Err(e); + } let attempt_i = i + 1; if i < max_retries { log::warn!( @@ -911,6 +919,14 @@ impl EngineInner { "server requested disconnect during restart".into(), )); } + if let Some(reason) = auth_failure_reason(&err) { + log::warn!("authentication rejected during restart ({err}); not retrying"); + self.running_handle.write().can_reconnect = false; + self.close(reason).await; + return Err(EngineError::Connection( + "authentication failed during reconnect".into(), + )); + } log::error!("restarting connection failed: {}", err); } } @@ -939,6 +955,14 @@ impl EngineInner { "server requested disconnect during resume".into(), )); } + if let Some(reason) = auth_failure_reason(&err) { + log::warn!("authentication rejected during resume ({err}); not retrying"); + self.running_handle.write().can_reconnect = false; + self.close(reason).await; + return Err(EngineError::Connection( + "authentication failed during reconnect".into(), + )); + } log::error!("resuming connection failed: {}", err); let mut running_handle = self.running_handle.write(); running_handle.full_reconnect = true; @@ -1091,6 +1115,28 @@ fn leave_disconnect_reason(err: &EngineError) -> Option { None } +/// Inspect a reconnect-attempt error for a genuine authentication/authorization +/// failure (HTTP 401/403). Such a failure will not succeed on retry with the +/// same token, so the reconnect loop should bail out immediately rather than +/// burning every attempt (and hammering the server) with credentials it already +/// knows are rejected. +/// +/// We key on `SignalError::Client(401|403)`, which is produced by the server's +/// `rtc/validate` probe (see [`super`]'s `SignalInner::validate`) — an +/// authoritative classification. We deliberately do NOT key on the raw +/// `WsError::Http` upgrade status, because that can be a fabricated 401 masking a +/// transient server error (e.g. 
a 503 from a saturated node), which IS +/// retryable. A resume that hits a raw 401 simply escalates to a full reconnect, +/// whose connect path runs `validate()` and surfaces the authoritative status. +fn auth_failure_reason(err: &EngineError) -> Option { + if let EngineError::Signal(SignalError::Client(status, _)) = err { + if matches!(status.as_u16(), 401 | 403) { + return Some(DisconnectReason::JoinFailure); + } + } + None +} + #[cfg(test)] mod tests { use super::*; @@ -1139,6 +1185,50 @@ mod tests { } } + #[test] + fn auth_failure_reason_flags_validated_401_and_403() { + // The server's rtc/validate probe surfaces auth failures as Client(4xx). + for status in [401u16, 403] { + let err = EngineError::Signal(SignalError::Client( + http::StatusCode::from_u16(status).unwrap(), + "invalid token".into(), + )); + assert_eq!( + auth_failure_reason(&err), + Some(DisconnectReason::JoinFailure), + "Client({status}) must be treated as a non-retryable auth failure" + ); + } + } + + #[test] + fn auth_failure_reason_ignores_other_client_and_server_errors() { + let not_auth = [ + // Other client errors are not auth failures. + EngineError::Signal(SignalError::Client(http::StatusCode::NOT_FOUND, "".into())), + EngineError::Signal(SignalError::Client( + http::StatusCode::TOO_MANY_REQUESTS, + "".into(), + )), + // Server errors (e.g. a saturated node) are retryable. + EngineError::Signal(SignalError::Server( + http::StatusCode::SERVICE_UNAVAILABLE, + "".into(), + )), + // Generic connectivity/internal errors are retryable. 
+ EngineError::Connection("network".into()), + EngineError::Internal("bug".into()), + EngineError::Signal(SignalError::SendError), + EngineError::Signal(SignalError::Timeout("waiting".into())), + ]; + for err in &not_auth { + assert!( + auth_failure_reason(err).is_none(), + "{err:?} must NOT be treated as an auth failure" + ); + } + } + #[test] fn backoff_nominal_grows_geometrically_then_caps() { // attempt 1 == base, then x2 each step, until it saturates at the cap. From ae4a1f9e467e6bf45f1effb7f4a413d52a8737e5 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 9 Jun 2026 14:49:24 +0200 Subject: [PATCH 2/8] region cache --- livekit-api/src/signal_client/region.rs | 96 +++++++++++++++++++- livekit/specs/signalling-reconnection.allium | 30 ++++++ 2 files changed, 121 insertions(+), 5 deletions(-) diff --git a/livekit-api/src/signal_client/region.rs b/livekit-api/src/signal_client/region.rs index 89db74e46..73198624e 100644 --- a/livekit-api/src/signal_client/region.rs +++ b/livekit-api/src/signal_client/region.rs @@ -12,13 +12,60 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::{ + collections::HashMap, + sync::OnceLock, + time::{Duration, Instant}, +}; + use http::header::{HeaderMap, HeaderValue, AUTHORIZATION}; +use parking_lot::Mutex; use serde::Deserialize; use crate::http_client; use super::{SignalError, SignalResult, REGION_FETCH_TIMEOUT}; +/// How long a fetched region list is reused before being re-fetched. Matches +/// client-sdk-js's `DEFAULT_MAX_AGE_MS`. (The server's `Cache-Control: max-age` +/// is not yet honoured — a fixed TTL keeps this backend-agnostic; honouring the +/// header is a possible refinement.) +const REGION_CACHE_TTL: Duration = Duration::from_secs(5); + +struct CachedRegions { + urls: Vec, + fetched_at: Instant, +} + +/// Process-wide region-list cache keyed by host, mirroring client-sdk-js's +/// static `RegionUrlProvider.cache`.
Persisting it here (rather than on a +/// per-connection object) means it survives across reconnect attempts — each of +/// which rebuilds the SignalClient — so the reconnect loop does not re-pay the +/// region fetch on every attempt. +fn region_cache() -> &'static Mutex> { + static CACHE: OnceLock>> = OnceLock::new(); + CACHE.get_or_init(|| Mutex::new(HashMap::new())) +} + +/// Cached region URLs for `host` if the entry is still within +/// [`REGION_CACHE_TTL`], else `None` (the caller should re-fetch). +fn cached_region_urls(host: &str) -> Option> { + let cache = region_cache().lock(); + cache.get(host).filter(|e| e.fetched_at.elapsed() < REGION_CACHE_TTL).map(|e| e.urls.clone()) +} + +fn store_region_urls(host: String, urls: Vec) { + region_cache().lock().insert(host, CachedRegions { urls, fetched_at: Instant::now() }); +} + +fn region_host(url: &str) -> SignalResult { + let parsed = url::Url::parse(url).map_err(|err| SignalError::UrlParse(err.to_string()))?; + parsed + .host_str() + .map(|h| h.to_string()) + .ok_or_else(|| SignalError::UrlParse("invalid hostname".into())) +} + pub struct RegionUrlProvider; #[derive(Deserialize)] @@ -34,13 +81,24 @@ pub struct RegionUrlInfo { } impl RegionUrlProvider { + /// Fetch the ordered list of region signalling URLs for a LiveKit Cloud + /// host. Non-cloud (direct / self-hosted) URLs have no regions, so this + /// returns an empty list. Successful results are cached per host for + /// [`REGION_CACHE_TTL`]; failures are never cached. pub async fn fetch_region_urls(url: &str, token: &str) -> SignalResult> { - if is_cloud_url(url)? { - let endpoint = region_endpoint(url)?; - fetch_from_endpoint(&endpoint, token).await - } else { - Ok(vec![]) + if !is_cloud_url(url)? 
{ + return Ok(vec![]); + } + + let host = region_host(url)?; + if let Some(urls) = cached_region_urls(&host) { + return Ok(urls); } + + let endpoint = region_endpoint(url)?; + let urls = fetch_from_endpoint(&endpoint, token).await?; + store_region_urls(host, urls.clone()); + Ok(urls) } } @@ -113,6 +171,34 @@ mod tests { assert!(!is_cloud_url("wss://livekit.cloud.example.com").unwrap()); } + #[test] + fn test_region_host() { + assert_eq!(region_host("wss://myapp.livekit.cloud").unwrap(), "myapp.livekit.cloud"); + assert_eq!(region_host("https://myapp.livekit.cloud/rtc").unwrap(), "myapp.livekit.cloud"); + assert!(region_host("not a url").is_err()); + } + + #[test] + fn region_cache_hits_fresh_and_misses_unknown_or_stale() { + // Unique hosts so the process-wide cache doesn't collide with other tests. + let host = "cache-fresh.livekit.cloud"; + assert!(cached_region_urls(host).is_none(), "unknown host is a miss"); + + let urls = vec!["wss://r1.livekit.cloud".to_string(), "wss://r2.livekit.cloud".to_string()]; + store_region_urls(host.to_string(), urls.clone()); + assert_eq!(cached_region_urls(host), Some(urls), "fresh entry is a hit"); + + // A stale entry (fetched older than the TTL) is treated as a miss. 
+ let stale_host = "cache-stale.livekit.cloud"; + if let Some(past) = Instant::now().checked_sub(REGION_CACHE_TTL * 2) { + region_cache().lock().insert( + stale_host.to_string(), + CachedRegions { urls: vec!["wss://old.livekit.cloud".into()], fetched_at: past }, + ); + assert!(cached_region_urls(stale_host).is_none(), "stale entry is a miss"); + } + } + #[test] fn test_region_endpoint() { assert_eq!( diff --git a/livekit/specs/signalling-reconnection.allium b/livekit/specs/signalling-reconnection.allium index 8f8d8e887..73bc455c7 100644 --- a/livekit/specs/signalling-reconnection.allium +++ b/livekit/specs/signalling-reconnection.allium @@ -22,6 +22,25 @@ -- Boundaries: peer-connection / media recovery (RtcSession) and the Room are -- external; modelled via the MediaLayer / Application surfaces below. -- +-- Structure (two layers, kept in one spec — they share the SignalConnection +-- entity, the connect/region/validate path, the auth classification and +-- DisconnectCause, so splitting would force cross-spec entity sharing for little +-- gain; the modular seams are expressed as boundary surfaces instead): +-- +-- LAYER 1 — Signalling link (mirrors livekit-api/signal_client) +-- entity SignalConnection; rules under "Layer 1" below: +-- * Establishment — initial connect, region fallback, auth rejection. +-- (The engine-side join-retries loop is config-level, see join_retries; +-- its only modelled outcome is auth fast-fail, noted there.) +-- * Liveness & lifecycle — ping/timeout, resume, send/queue, token, close. +-- boundary surfaces: SignalCommands (app), ServerSignalling (SFU). +-- +-- LAYER 2 — Engine reconnect orchestration (mirrors rtc_engine) +-- entity EngineConnection; rules under "Layer 2" below: initial connection, +-- reconnection causes, start/escalate, per-attempt dispatch, the decoupled +-- resume chain, and attempt outcomes (incl. terminal/non-retryable failures). +-- boundary surfaces: MediaRecovery (RtcSession), RoomConnectionLifecycle (Room). 
+-- -- Checker note (allium 3.2.4): `allium check` reports residual `status` -- warnings that are CHECKER ARTIFACTS, not spec defects -- every state flagged -- "never assigned" is plainly assigned by a visible `ensures`. Three confirmed @@ -209,6 +228,10 @@ config { -- Rules ------------------------------------------------------------ +-- ########################################################################### +-- ## LAYER 1 — Signalling link (mirrors livekit-api/signal_client) +-- ########################################################################### + -- === Establishment (DELTA 5: region fallback) =============================== rule SignalConnectionEstablished { @@ -230,6 +253,9 @@ rule PrimaryConnectFailsOnCloud { ensures: RegionFetchRequested(error) @guidance -- DELTA 5: only LiveKit Cloud hosts attempt region fallback. + -- DELTA R3.2: the region list is cached per host with a TTL (default + -- 5s), so a RegionFetchRequested within the TTL is served from cache + -- instead of re-paying the network fetch on every reconnect attempt. 
} rule PrimaryConnectFailsOnDirect { @@ -341,6 +367,10 @@ rule CloseSignalLink { ensures: connection.status = closed } +-- ########################################################################### +-- ## LAYER 2 — Engine reconnect orchestration (mirrors rtc_engine) +-- ########################################################################### + -- === Engine: initial connection ============================================= rule EngineConnects { From 286e64338f2b9eae94f08b226be04172415e27ed Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 9 Jun 2026 14:59:09 +0200 Subject: [PATCH 3/8] update spec --- livekit/specs/signalling-reconnection.allium | 33 ++++++++++++++------ 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/livekit/specs/signalling-reconnection.allium b/livekit/specs/signalling-reconnection.allium index 73bc455c7..9ff110582 100644 --- a/livekit/specs/signalling-reconnection.allium +++ b/livekit/specs/signalling-reconnection.allium @@ -104,7 +104,6 @@ enum DisconnectCause { | peer_connection_failed | server_leave | signal_severed_during_resume - | reconnect_attempts_exhausted | unauthorized | client_initiated | unknown @@ -141,9 +140,11 @@ entity SignalConnection { -- The engine that drives this link's recovery. engine: EngineConnection with signal = this - -- The link is dead if nothing arrived within ping_timeout. Used by the - -- resume's mid-flight liveness re-check (ResumeRechecksLink). - is_alive: last_message_at != null and last_message_at + ping_timeout > now + -- The link is present and usable while connected or mid-resume; `lost` or + -- `closed` mean the stream is gone. Used by the resume's mid-flight re-check + -- (ResumeRechecksLink) and mirrors the implementation's `is_connected()` + -- (stream-present) check rather than ping-timeout liveness. 
+ is_connected: status in {connected, reconnecting} transitions status { connected -> lost -- ping timeout / stream closed @@ -177,6 +178,10 @@ entity EngineConnection { -- (A `permitted -> revoked` terminal transition graph would express this -- structurally, but the entity's `status` graph below takes the single graph -- slot the current checker honours; see the spec header note.) + -- NOTE (spec<->code): the implementation realises this as a `can_reconnect` + -- bool that is only ever set false (never back to true) within a session, + -- always alongside a close() — i.e. it already satisfies this latch and the + -- RevokedImpliesClosed invariant; the enum is the spec-level model. reconnect_permission: permitted | revoked -- 1-based attempt index within the current episode; 0 when connected. @@ -468,6 +473,10 @@ rule EscalateReconnect { -- event matching the mode it is in (previously the Room saw Resuming then -- Restarted with no Restarting between). retry_now restarts the backoff so -- the in-flight loop's next attempt fires immediately. + -- NOTE (spec<->code): the implementation emits Restarting lazily, on the + -- next attempt the loop runs in full mode (a `restarting_emitted` latch), + -- rather than eagerly at this escalation event. Net effect is identical + -- (Restarting always precedes the full reconnect). 
} -- === Engine: per-attempt dispatch =========================================== @@ -520,7 +529,7 @@ rule ResumeAwaitsPeerConnections { rule ResumeRechecksLink { when: PeerConnectionsReconnected(engine) requires: engine.status = reconnecting - if not engine.signal.is_alive: + if not engine.signal.is_connected: ensures: ResumeAttemptFailed( engine, terminal: false, @@ -628,12 +637,16 @@ rule ReconnectExhausted { requires: engine.status = reconnecting requires: engine.attempt >= config.max_reconnect_attempts ensures: engine.status = closed - ensures: EngineDisconnected(engine, cause: reconnect_attempts_exhausted) + ensures: EngineDisconnected(engine, cause: engine.reconnect_cause) @guidance - -- DELTA 2: on giving up, EngineDisconnected reports - -- reconnect_attempts_exhausted; the engine's reconnect_cause also remains - -- available to the Room for the underlying reason, instead of `unknown`. - -- Always emitted so the Room leaves the Reconnecting state. + -- DELTA 2: on giving up, EngineDisconnected reports the cause that STARTED + -- this episode (engine.reconnect_cause), threaded through instead of a + -- generic `unknown`. It is always emitted so the Room leaves Reconnecting + -- rather than hanging there. + -- NOTE (spec<->code): there is no dedicated "attempts exhausted" disconnect + -- reason in the wire protocol (proto DisconnectReason), so the original + -- cause is what surfaces. A distinct exhausted reason would require a + -- protocol addition. 
} ------------------------------------------------------------ From af502d3e5ccaf5a895c7497cc40a9a7b758bfcda Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 9 Jun 2026 15:04:46 +0200 Subject: [PATCH 4/8] sync signalling-reconnection spec to latest --- livekit/specs/signalling-reconnection.allium | 63 ++++++++++++++++---- 1 file changed, 53 insertions(+), 10 deletions(-) diff --git a/livekit/specs/signalling-reconnection.allium b/livekit/specs/signalling-reconnection.allium index 8f8d8e887..9ff110582 100644 --- a/livekit/specs/signalling-reconnection.allium +++ b/livekit/specs/signalling-reconnection.allium @@ -22,6 +22,25 @@ -- Boundaries: peer-connection / media recovery (RtcSession) and the Room are -- external; modelled via the MediaLayer / Application surfaces below. -- +-- Structure (two layers, kept in one spec — they share the SignalConnection +-- entity, the connect/region/validate path, the auth classification and +-- DisconnectCause, so splitting would force cross-spec entity sharing for little +-- gain; the modular seams are expressed as boundary surfaces instead): +-- +-- LAYER 1 — Signalling link (mirrors livekit-api/signal_client) +-- entity SignalConnection; rules under "Layer 1" below: +-- * Establishment — initial connect, region fallback, auth rejection. +-- (The engine-side join-retries loop is config-level, see join_retries; +-- its only modelled outcome is auth fast-fail, noted there.) +-- * Liveness & lifecycle — ping/timeout, resume, send/queue, token, close. +-- boundary surfaces: SignalCommands (app), ServerSignalling (SFU). +-- +-- LAYER 2 — Engine reconnect orchestration (mirrors rtc_engine) +-- entity EngineConnection; rules under "Layer 2" below: initial connection, +-- reconnection causes, start/escalate, per-attempt dispatch, the decoupled +-- resume chain, and attempt outcomes (incl. terminal/non-retryable failures). +-- boundary surfaces: MediaRecovery (RtcSession), RoomConnectionLifecycle (Room). 
+-- -- Checker note (allium 3.2.4): `allium check` reports residual `status` -- warnings that are CHECKER ARTIFACTS, not spec defects -- every state flagged -- "never assigned" is plainly assigned by a visible `ensures`. Three confirmed @@ -85,7 +104,6 @@ enum DisconnectCause { | peer_connection_failed | server_leave | signal_severed_during_resume - | reconnect_attempts_exhausted | unauthorized | client_initiated | unknown @@ -122,9 +140,11 @@ entity SignalConnection { -- The engine that drives this link's recovery. engine: EngineConnection with signal = this - -- The link is dead if nothing arrived within ping_timeout. Used by the - -- resume's mid-flight liveness re-check (ResumeRechecksLink). - is_alive: last_message_at != null and last_message_at + ping_timeout > now + -- The link is present and usable while connected or mid-resume; `lost` or + -- `closed` mean the stream is gone. Used by the resume's mid-flight re-check + -- (ResumeRechecksLink) and mirrors the implementation's `is_connected()` + -- (stream-present) check rather than ping-timeout liveness. + is_connected: status in {connected, reconnecting} transitions status { connected -> lost -- ping timeout / stream closed @@ -158,6 +178,10 @@ entity EngineConnection { -- (A `permitted -> revoked` terminal transition graph would express this -- structurally, but the entity's `status` graph below takes the single graph -- slot the current checker honours; see the spec header note.) + -- NOTE (spec<->code): the implementation realises this as a `can_reconnect` + -- bool that is only ever set false (never back to true) within a session, + -- always alongside a close() — i.e. it already satisfies this latch and the + -- RevokedImpliesClosed invariant; the enum is the spec-level model. reconnect_permission: permitted | revoked -- 1-based attempt index within the current episode; 0 when connected. 
@@ -209,6 +233,10 @@ config { -- Rules ------------------------------------------------------------ +-- ########################################################################### +-- ## LAYER 1 — Signalling link (mirrors livekit-api/signal_client) +-- ########################################################################### + -- === Establishment (DELTA 5: region fallback) =============================== rule SignalConnectionEstablished { @@ -230,6 +258,9 @@ rule PrimaryConnectFailsOnCloud { ensures: RegionFetchRequested(error) @guidance -- DELTA 5: only LiveKit Cloud hosts attempt region fallback. + -- DELTA R3.2: the region list is cached per host with a TTL (default + -- 5s), so a RegionFetchRequested within the TTL is served from cache + -- instead of re-paying the network fetch on every reconnect attempt. } rule PrimaryConnectFailsOnDirect { @@ -341,6 +372,10 @@ rule CloseSignalLink { ensures: connection.status = closed } +-- ########################################################################### +-- ## LAYER 2 — Engine reconnect orchestration (mirrors rtc_engine) +-- ########################################################################### + -- === Engine: initial connection ============================================= rule EngineConnects { @@ -438,6 +473,10 @@ rule EscalateReconnect { -- event matching the mode it is in (previously the Room saw Resuming then -- Restarted with no Restarting between). retry_now restarts the backoff so -- the in-flight loop's next attempt fires immediately. + -- NOTE (spec<->code): the implementation emits Restarting lazily, on the + -- next attempt the loop runs in full mode (a `restarting_emitted` latch), + -- rather than eagerly at this escalation event. Net effect is identical + -- (Restarting always precedes the full reconnect). 
} -- === Engine: per-attempt dispatch =========================================== @@ -490,7 +529,7 @@ rule ResumeAwaitsPeerConnections { rule ResumeRechecksLink { when: PeerConnectionsReconnected(engine) requires: engine.status = reconnecting - if not engine.signal.is_alive: + if not engine.signal.is_connected: ensures: ResumeAttemptFailed( engine, terminal: false, @@ -598,12 +637,16 @@ rule ReconnectExhausted { requires: engine.status = reconnecting requires: engine.attempt >= config.max_reconnect_attempts ensures: engine.status = closed - ensures: EngineDisconnected(engine, cause: reconnect_attempts_exhausted) + ensures: EngineDisconnected(engine, cause: engine.reconnect_cause) @guidance - -- DELTA 2: on giving up, EngineDisconnected reports - -- reconnect_attempts_exhausted; the engine's reconnect_cause also remains - -- available to the Room for the underlying reason, instead of `unknown`. - -- Always emitted so the Room leaves the Reconnecting state. + -- DELTA 2: on giving up, EngineDisconnected reports the cause that STARTED + -- this episode (engine.reconnect_cause), threaded through instead of a + -- generic `unknown`. It is always emitted so the Room leaves Reconnecting + -- rather than hanging there. + -- NOTE (spec<->code): there is no dedicated "attempts exhausted" disconnect + -- reason in the wire protocol (proto DisconnectReason), so the original + -- cause is what surfaces. A distinct exhausted reason would require a + -- protocol addition. 
} ------------------------------------------------------------ From 7746d6754f2aa92fdca17fae576e17681994e9b5 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 23 Jun 2026 13:12:53 +0200 Subject: [PATCH 5/8] encapsulate in struct --- livekit-api/src/signal_client/region.rs | 66 +++++++++++++++---------- 1 file changed, 39 insertions(+), 27 deletions(-) diff --git a/livekit-api/src/signal_client/region.rs b/livekit-api/src/signal_client/region.rs index 9b46c8781..1c1c36c97 100644 --- a/livekit-api/src/signal_client/region.rs +++ b/livekit-api/src/signal_client/region.rs @@ -27,12 +27,6 @@ use crate::http_client; use super::{SignalError, SignalResult, REGION_FETCH_TIMEOUT}; -/// How long a fetched region list is reused before being re-fetched. Matches -/// client-sdk-js's `DEFAULT_MAX_AGE_MS`. (The server's `Cache-Control: max-age` -/// is not yet honoured — a fixed TTL keeps this backend-agnostic; honouring the -/// header is a possible refinement.) -const REGION_CACHE_TTL: Duration = Duration::from_secs(5); - struct CachedRegions { urls: Vec, fetched_at: Instant, @@ -43,20 +37,37 @@ struct CachedRegions { /// per-connection object) means it survives across reconnect attempts — each of /// which rebuilds the SignalClient — so the reconnect loop does not re-pay the /// region fetch on every attempt. -fn region_cache() -> &'static Mutex> { - static CACHE: OnceLock>> = OnceLock::new(); - CACHE.get_or_init(|| Mutex::new(HashMap::new())) +struct RegionCache { + entries: Mutex>, + ttl: Duration, } -/// Cached region URLs for `host` if the entry is still within -/// [`REGION_CACHE_TTL`], else `None` (the caller should re-fetch). -fn cached_region_urls(host: &str) -> Option> { - let cache = region_cache().lock(); - cache.get(host).filter(|e| e.fetched_at.elapsed() < REGION_CACHE_TTL).map(|e| e.urls.clone()) -} +impl RegionCache { + /// How long a fetched region list is reused before being re-fetched. Matches + /// client-sdk-js's `DEFAULT_MAX_AGE_MS`. 
(The server's `Cache-Control: max-age` + /// is not yet honoured — a fixed TTL keeps this backend-agnostic; honouring + /// the header is a possible refinement.) + const TTL: Duration = Duration::from_secs(5); + + fn shared() -> &'static RegionCache { + static CACHE: OnceLock = OnceLock::new(); + CACHE.get_or_init(|| Self::new(Self::TTL)) + } + + fn new(ttl: Duration) -> Self { + Self { entries: Mutex::new(HashMap::new()), ttl } + } + + /// Cached region URLs for `host` if the entry is still within [`Self::ttl`], + /// else `None` (the caller should re-fetch). + fn get(&self, host: &str) -> Option> { + let entries = self.entries.lock(); + entries.get(host).filter(|e| e.fetched_at.elapsed() < self.ttl).map(|e| e.urls.clone()) + } -fn store_region_urls(host: String, urls: Vec) { - region_cache().lock().insert(host, CachedRegions { urls, fetched_at: Instant::now() }); + fn insert(&self, host: String, urls: Vec) { + self.entries.lock().insert(host, CachedRegions { urls, fetched_at: Instant::now() }); + } } fn region_host(url: &str) -> SignalResult { @@ -102,20 +113,20 @@ impl RegionUrlProvider { /// Fetch the ordered list of region signalling URLs for a LiveKit Cloud /// host. Non-cloud (direct / self-hosted) URLs have no regions, so this /// returns an empty list. Successful results are cached per host for - /// [`REGION_CACHE_TTL`]; failures are never cached. + /// [`RegionCache::TTL`]; failures are never cached. pub async fn fetch_region_urls(url: &str, token: &str) -> SignalResult> { if !is_cloud_url(url)? 
{ return Ok(vec![]); } let host = region_host(url)?; - if let Some(urls) = cached_region_urls(&host) { + if let Some(urls) = RegionCache::shared().get(&host) { return Ok(urls); } let endpoint = region_endpoint(url)?; let urls = fetch_from_endpoint(&endpoint, token).await?; - store_region_urls(host, urls.clone()); + RegionCache::shared().insert(host, urls.clone()); Ok(urls) } } @@ -342,22 +353,23 @@ mod tests { #[test] fn region_cache_hits_fresh_and_misses_unknown_or_stale() { - // Unique hosts so the process-wide cache doesn't collide with other tests. + let cache = RegionCache::new(RegionCache::TTL); + let host = "cache-fresh.livekit.cloud"; - assert!(cached_region_urls(host).is_none(), "unknown host is a miss"); + assert!(cache.get(host).is_none(), "unknown host is a miss"); let urls = vec!["wss://r1.livekit.cloud".to_string(), "wss://r2.livekit.cloud".to_string()]; - store_region_urls(host.to_string(), urls.clone()); - assert_eq!(cached_region_urls(host), Some(urls), "fresh entry is a hit"); + cache.insert(host.to_string(), urls.clone()); + assert_eq!(cache.get(host), Some(urls), "fresh entry is a hit"); // A stale entry (fetched older than the TTL) is treated as a miss. 
let stale_host = "cache-stale.livekit.cloud"; - if let Some(past) = Instant::now().checked_sub(REGION_CACHE_TTL * 2) { - region_cache().lock().insert( + if let Some(past) = Instant::now().checked_sub(RegionCache::TTL * 2) { + cache.entries.lock().insert( stale_host.to_string(), CachedRegions { urls: vec!["wss://old.livekit.cloud".into()], fetched_at: past }, ); - assert!(cached_region_urls(stale_host).is_none(), "stale entry is a miss"); + assert!(cache.get(stale_host).is_none(), "stale entry is a miss"); } } From a0614d2f9f109eb0bd009517009b170f9e915d01 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 23 Jun 2026 13:26:26 +0200 Subject: [PATCH 6/8] honor max-age header --- livekit-api/src/signal_client/mod.rs | 2 +- livekit-api/src/signal_client/region.rs | 164 +++++++++++++++++++----- 2 files changed, 130 insertions(+), 36 deletions(-) diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index c929dde9e..6c739531f 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -1299,7 +1299,7 @@ mod tests { let endpoint = format!("http://127.0.0.1:{}/settings/regions", addr.port()); let result = region::fetch_from_endpoint(&endpoint, "fake-token").await; - let urls = result.unwrap(); + let (urls, _max_age) = result.unwrap(); assert_eq!( urls, vec![ diff --git a/livekit-api/src/signal_client/region.rs b/livekit-api/src/signal_client/region.rs index 1c1c36c97..16feac6d3 100644 --- a/livekit-api/src/signal_client/region.rs +++ b/livekit-api/src/signal_client/region.rs @@ -19,7 +19,7 @@ use std::{ time::{Duration, Instant}, }; -use http::header::{HeaderMap, HeaderValue, AUTHORIZATION}; +use http::header::{HeaderMap, HeaderValue, AUTHORIZATION, CACHE_CONTROL}; use parking_lot::Mutex; use serde::Deserialize; @@ -30,6 +30,20 @@ use super::{SignalError, SignalResult, REGION_FETCH_TIMEOUT}; struct CachedRegions { urls: Vec, fetched_at: Instant, + /// Effective lifetime of this entry: the server's 
`Cache-Control: max-age` + /// when present, otherwise [`RegionCache::default_ttl`]. + ttl: Duration, +} + +/// Outcome of a [`RegionCache::get`] lookup. +enum Cached { + /// Entry exists and is within the TTL — safe to use without re-fetching. + Fresh(Vec), + /// Entry exists but is older than the TTL — the caller should re-fetch, but + /// may fall back to these URLs if the re-fetch fails. + Stale(Vec), + /// No entry for this host. + Miss, } /// Process-wide region-list cache keyed by host, mirroring client-sdk-js's @@ -39,34 +53,41 @@ struct CachedRegions { /// region fetch on every attempt. struct RegionCache { entries: Mutex>, - ttl: Duration, + default_ttl: Duration, } impl RegionCache { - /// How long a fetched region list is reused before being re-fetched. Matches - /// client-sdk-js's `DEFAULT_MAX_AGE_MS`. (The server's `Cache-Control: max-age` - /// is not yet honoured — a fixed TTL keeps this backend-agnostic; honouring - /// the header is a possible refinement.) - const TTL: Duration = Duration::from_secs(5); + /// Fallback entry lifetime, used when the server's region response carries + /// no `Cache-Control: max-age`. Matches client-sdk-js's `DEFAULT_MAX_AGE_MS`. + const DEFAULT_TTL: Duration = Duration::from_secs(5); fn shared() -> &'static RegionCache { static CACHE: OnceLock = OnceLock::new(); - CACHE.get_or_init(|| Self::new(Self::TTL)) + CACHE.get_or_init(|| Self::new(Self::DEFAULT_TTL)) } - fn new(ttl: Duration) -> Self { - Self { entries: Mutex::new(HashMap::new()), ttl } + fn new(default_ttl: Duration) -> Self { + Self { entries: Mutex::new(HashMap::new()), default_ttl } } - /// Cached region URLs for `host` if the entry is still within [`Self::ttl`], - /// else `None` (the caller should re-fetch). - fn get(&self, host: &str) -> Option> { + /// Looks up the cached region URLs for `host`, reporting whether the entry + /// is fresh (within its TTL), stale, or absent. 
A stale entry is retained so + /// callers can fall back to it when a re-fetch fails. + fn get(&self, host: &str) -> Cached { let entries = self.entries.lock(); - entries.get(host).filter(|e| e.fetched_at.elapsed() < self.ttl).map(|e| e.urls.clone()) + match entries.get(host) { + Some(e) if e.fetched_at.elapsed() < e.ttl => Cached::Fresh(e.urls.clone()), + Some(e) => Cached::Stale(e.urls.clone()), + None => Cached::Miss, + } } - fn insert(&self, host: String, urls: Vec) { - self.entries.lock().insert(host, CachedRegions { urls, fetched_at: Instant::now() }); + /// Stores `urls` for `host`, honouring the server's `Cache-Control: max-age` + /// (`max_age`) as the entry's TTL and falling back to [`Self::default_ttl`] + /// when the header is absent. + fn insert(&self, host: String, urls: Vec, max_age: Option) { + let ttl = max_age.unwrap_or(self.default_ttl); + self.entries.lock().insert(host, CachedRegions { urls, fetched_at: Instant::now(), ttl }); } } @@ -112,29 +133,50 @@ pub struct RegionUrlInfo { impl RegionUrlProvider { /// Fetch the ordered list of region signalling URLs for a LiveKit Cloud /// host. Non-cloud (direct / self-hosted) URLs have no regions, so this - /// returns an empty list. Successful results are cached per host for - /// [`RegionCache::TTL`]; failures are never cached. + /// returns an empty list. Successful results are cached per host for the + /// server's `Cache-Control: max-age` (or [`RegionCache::DEFAULT_TTL`] when + /// absent); failures are never cached. Once an entry goes stale a re-fetch + /// is attempted, but if it fails the stale entry is returned as a fallback + /// rather than surfacing the error. pub async fn fetch_region_urls(url: &str, token: &str) -> SignalResult> { if !is_cloud_url(url)? 
{ return Ok(vec![]); } let host = region_host(url)?; - if let Some(urls) = RegionCache::shared().get(&host) { - return Ok(urls); - } + let cache = RegionCache::shared(); + let stale = match cache.get(&host) { + Cached::Fresh(urls) => return Ok(urls), + Cached::Stale(urls) => Some(urls), + Cached::Miss => None, + }; let endpoint = region_endpoint(url)?; - let urls = fetch_from_endpoint(&endpoint, token).await?; - RegionCache::shared().insert(host, urls.clone()); - Ok(urls) + match fetch_from_endpoint(&endpoint, token).await { + Ok((urls, max_age)) => { + cache.insert(host, urls.clone(), max_age); + Ok(urls) + } + // The fresh fetch failed; fall back to the stale entry if we have + // one rather than failing outright. + Err(err) => match stale { + Some(urls) => { + log::warn!("region fetch failed ({err}); using stale cached regions for {host}"); + Ok(urls) + } + None => Err(err), + }, + } } } +/// Fetches the region list from `endpoint_url`, returning the ordered URLs +/// together with the server's `Cache-Control: max-age` (if any) so the caller +/// can use it as the cache TTL. pub(crate) async fn fetch_from_endpoint( endpoint_url: &str, token: &str, -) -> SignalResult> { +) -> SignalResult<(Vec, Option)> { let fetch_fut = async { let client = http_client::Client::new(); let mut headers = HeaderMap::new(); @@ -149,11 +191,16 @@ pub(crate) async fn fetch_from_endpoint( if !res.status().is_success() { return Err(SignalError::Client(res.status(), res.text().await.unwrap_or_default())); } + + // Read the cache lifetime before `json()` consumes the response. 
+ let max_age = + res.headers().get(CACHE_CONTROL).and_then(|v| v.to_str().ok()).and_then(parse_max_age); + let res = res .json::() .await .map_err(|e| SignalError::RegionError(error_with_chain(&e)))?; - Ok(res.regions.into_iter().map(|i| i.url).collect()) + Ok((res.regions.into_iter().map(|i| i.url).collect(), max_age)) }; livekit_runtime::timeout(REGION_FETCH_TIMEOUT, fetch_fut) @@ -161,6 +208,17 @@ pub(crate) async fn fetch_from_endpoint( .map_err(|_| SignalError::RegionError("region fetch timed out".into()))? } +/// Parses the `max-age` directive (in seconds) out of a `Cache-Control` header +/// value, e.g. `"max-age=300, public"` -> `Some(300s)`. Returns `None` when the +/// directive is absent or unparseable, leaving the caller on the default TTL. +fn parse_max_age(cache_control: &str) -> Option { + cache_control.split(',').find_map(|directive| { + let (name, value) = directive.split_once('=')?; + name.trim().eq_ignore_ascii_case("max-age").then_some(())?; + value.trim().parse::().ok().map(Duration::from_secs) + }) +} + fn is_cloud_url(url: &str) -> SignalResult { let url = url::Url::parse(url).map_err(|err| SignalError::UrlParse(err.to_string()))?; let host = match url.host() { @@ -352,27 +410,63 @@ mod tests { } #[test] - fn region_cache_hits_fresh_and_misses_unknown_or_stale() { - let cache = RegionCache::new(RegionCache::TTL); + fn region_cache_reports_fresh_stale_and_miss() { + let cache = RegionCache::new(RegionCache::DEFAULT_TTL); let host = "cache-fresh.livekit.cloud"; - assert!(cache.get(host).is_none(), "unknown host is a miss"); + assert!(matches!(cache.get(host), Cached::Miss), "unknown host is a miss"); let urls = vec!["wss://r1.livekit.cloud".to_string(), "wss://r2.livekit.cloud".to_string()]; - cache.insert(host.to_string(), urls.clone()); - assert_eq!(cache.get(host), Some(urls), "fresh entry is a hit"); + cache.insert(host.to_string(), urls.clone(), None); + assert!( + matches!(cache.get(host), Cached::Fresh(u) if u == urls), + "fresh entry 
is a fresh hit" + ); - // A stale entry (fetched older than the TTL) is treated as a miss. + // An entry older than its TTL is reported as stale (retained for fallback). let stale_host = "cache-stale.livekit.cloud"; - if let Some(past) = Instant::now().checked_sub(RegionCache::TTL * 2) { + let stale_urls = vec!["wss://old.livekit.cloud".to_string()]; + if let Some(past) = Instant::now().checked_sub(RegionCache::DEFAULT_TTL * 2) { cache.entries.lock().insert( stale_host.to_string(), - CachedRegions { urls: vec!["wss://old.livekit.cloud".into()], fetched_at: past }, + CachedRegions { + urls: stale_urls.clone(), + fetched_at: past, + ttl: RegionCache::DEFAULT_TTL, + }, + ); + assert!( + matches!(cache.get(stale_host), Cached::Stale(u) if u == stale_urls), + "expired entry is a stale hit" ); - assert!(cache.get(stale_host).is_none(), "stale entry is a miss"); } } + #[test] + fn region_cache_honors_server_max_age() { + // A short max-age expires before the (longer) default TTL would, proving + // the server's Cache-Control wins over the default. 
+ let cache = RegionCache::new(Duration::from_secs(3600)); + let host = "max-age.livekit.cloud"; + let urls = vec!["wss://r1.livekit.cloud".to_string()]; + + cache.insert(host.to_string(), urls.clone(), Some(Duration::ZERO)); + assert!( + matches!(cache.get(host), Cached::Stale(u) if u == urls), + "max-age=0 entry is immediately stale despite the long default TTL" + ); + } + + #[test] + fn test_parse_max_age() { + assert_eq!(parse_max_age("max-age=300"), Some(Duration::from_secs(300))); + assert_eq!(parse_max_age("public, max-age=300"), Some(Duration::from_secs(300))); + assert_eq!(parse_max_age("MAX-AGE=0, no-cache"), Some(Duration::ZERO)); + assert_eq!(parse_max_age("no-store"), None); + assert_eq!(parse_max_age("max-age=notanumber"), None); + assert_eq!(parse_max_age(""), None); + } + #[test] fn test_region_endpoint() { assert_eq!( From b3bcdafe2f8e8dc1e6b2ab5ca84ed16bb370b8ad Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 23 Jun 2026 13:46:18 +0200 Subject: [PATCH 7/8] method for clearing and remove region when from cache on fail --- livekit-api/src/signal_client/mod.rs | 20 +++- livekit-api/src/signal_client/region.rs | 143 +++++++++++++++++++++++- 2 files changed, 155 insertions(+), 8 deletions(-) diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index 6c739531f..e2448d1d1 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -238,15 +238,25 @@ impl SignalClient { // if every region fails the caller sees why the last region // connection failed. 
let mut last_err = err; - for url in urls.iter() { - log::info!("fallback connection to: {}", url); - match SignalInner::connect(url, token, options.clone(), publisher_offer.clone()) - .await + for region_url in urls.iter() { + log::info!("fallback connection to: {}", region_url); + match SignalInner::connect( + region_url, + token, + options.clone(), + publisher_offer.clone(), + ) + .await { Ok((inner, join_response, stream_events)) => { return Ok(handle_success(inner, join_response, stream_events)) } - Err(region_conn_err) => last_err = region_conn_err, + Err(region_conn_err) => { + // This region is unreachable; drop it from the cache + // so the next attempt doesn't hand it out again. + RegionUrlProvider::mark_failed(url, region_url); + last_err = region_conn_err; + } } } diff --git a/livekit-api/src/signal_client/region.rs b/livekit-api/src/signal_client/region.rs index 16feac6d3..6938ae7ad 100644 --- a/livekit-api/src/signal_client/region.rs +++ b/livekit-api/src/signal_client/region.rs @@ -15,13 +15,14 @@ use std::{ collections::HashMap, error::Error as StdError, - sync::OnceLock, + sync::{Arc, OnceLock}, time::{Duration, Instant}, }; use http::header::{HeaderMap, HeaderValue, AUTHORIZATION, CACHE_CONTROL}; use parking_lot::Mutex; use serde::Deserialize; +use tokio::sync::Mutex as AsyncMutex; use crate::http_client; @@ -53,6 +54,10 @@ enum Cached { /// region fetch on every attempt. struct RegionCache { entries: Mutex>, + /// Per-host locks that serialise in-flight fetches, so concurrent cache + /// misses for the same host collapse into a single network request rather + /// than each issuing their own (single-flight). 
+ fetch_locks: Mutex>>>, default_ttl: Duration, } @@ -67,7 +72,22 @@ impl RegionCache { } fn new(default_ttl: Duration) -> Self { - Self { entries: Mutex::new(HashMap::new()), default_ttl } + Self { + entries: Mutex::new(HashMap::new()), + fetch_locks: Mutex::new(HashMap::new()), + default_ttl, + } + } + + /// Returns the per-host fetch lock for `host`, creating it on first use. + /// Held across the network request so only one fetch per host runs at a + /// time; callers that wait on it then pick up the result from the cache. + fn fetch_lock(&self, host: &str) -> Arc> { + self.fetch_locks + .lock() + .entry(host.to_string()) + .or_insert_with(|| Arc::new(AsyncMutex::new(()))) + .clone() } /// Looks up the cached region URLs for `host`, reporting whether the entry @@ -89,6 +109,29 @@ impl RegionCache { let ttl = max_age.unwrap_or(self.default_ttl); self.entries.lock().insert(host, CachedRegions { urls, fetched_at: Instant::now(), ttl }); } + + /// Removes `failed_url` from the cached list for `host` so it is not handed + /// out again. If that empties the list, the entry is dropped entirely, + /// forcing a re-fetch on the next lookup. + fn mark_failed(&self, host: &str, failed_url: &str) { + let mut entries = self.entries.lock(); + if let Some(entry) = entries.get_mut(host) { + entry.urls.retain(|u| u != failed_url); + if entry.urls.is_empty() { + entries.remove(host); + } + } + } + + /// Drops the cached entry for `host`, forcing a re-fetch on the next lookup. + fn invalidate(&self, host: &str) { + self.entries.lock().remove(host); + } + + /// Drops every cached entry. + fn clear(&self) { + self.entries.lock().clear(); + } } fn region_host(url: &str) -> SignalResult { @@ -137,7 +180,9 @@ impl RegionUrlProvider { /// server's `Cache-Control: max-age` (or [`RegionCache::DEFAULT_TTL`] when /// absent); failures are never cached. 
Once an entry goes stale a re-fetch /// is attempted, but if it fails the stale entry is returned as a fallback - /// rather than surfacing the error. + /// rather than surfacing the error. Concurrent calls for the same host are + /// de-duplicated: only one fetch runs at a time and the rest reuse its + /// result. pub async fn fetch_region_urls(url: &str, token: &str) -> SignalResult> { if !is_cloud_url(url)? { return Ok(vec![]); @@ -145,12 +190,24 @@ impl RegionUrlProvider { let host = region_host(url)?; let cache = RegionCache::shared(); + + // Fast path: a fresh entry needs neither a fetch nor the fetch lock. let stale = match cache.get(&host) { Cached::Fresh(urls) => return Ok(urls), Cached::Stale(urls) => Some(urls), Cached::Miss => None, }; + // Single-flight: serialise concurrent fetches for the same host so they + // collapse into one network request. + let fetch_lock = cache.fetch_lock(&host); + let _guard = fetch_lock.lock().await; + + // Another caller may have refreshed the entry while we waited on the lock. + if let Cached::Fresh(urls) = cache.get(&host) { + return Ok(urls); + } + let endpoint = region_endpoint(url)?; match fetch_from_endpoint(&endpoint, token).await { Ok((urls, max_age)) => { @@ -168,6 +225,31 @@ impl RegionUrlProvider { }, } } + + /// Reports that `failed_url` (a region URL previously returned for `url`'s + /// host) could not be connected to, dropping it from the cache so it is not + /// handed out again. When the host's last region URL is dropped the whole + /// entry is invalidated, forcing a fresh fetch on the next attempt. + pub fn mark_failed(url: &str, failed_url: &str) { + if let Ok(host) = region_host(url) { + RegionCache::shared().mark_failed(&host, failed_url); + } + } + + /// Invalidates the cached region list for `url`'s host, forcing a fresh + /// fetch on the next [`Self::fetch_region_urls`] call. 
+ pub fn invalidate(url: &str) { + if let Ok(host) = region_host(url) { + RegionCache::shared().invalidate(&host); + } + } + + /// Clears the entire region cache. Useful when external state that affects + /// geo routing changes (e.g. the device's network connectivity), since that + /// can invalidate every cached host at once. + pub fn clear() { + RegionCache::shared().clear(); + } } /// Fetches the region list from `endpoint_url`, returning the ordered URLs @@ -457,6 +539,61 @@ mod tests { ); } + #[test] + fn region_cache_mark_failed_prunes_then_drops() { + let cache = RegionCache::new(RegionCache::DEFAULT_TTL); + let host = "mark-failed.livekit.cloud"; + let r1 = "wss://r1.livekit.cloud".to_string(); + let r2 = "wss://r2.livekit.cloud".to_string(); + cache.insert(host.to_string(), vec![r1.clone(), r2.clone()], None); + + // Pruning one URL keeps the entry with the survivors. + cache.mark_failed(host, &r1); + assert!( + matches!(cache.get(host), Cached::Fresh(u) if u == vec![r2.clone()]), + "failed URL is pruned, the rest remain" + ); + + // Removing the last URL drops the entry entirely, forcing a re-fetch. + cache.mark_failed(host, &r2); + assert!(matches!(cache.get(host), Cached::Miss), "emptied entry is dropped"); + + // Marking an unknown host is a no-op. + cache.mark_failed("unknown.livekit.cloud", &r1); + } + + #[test] + fn region_cache_invalidate_and_clear() { + let cache = RegionCache::new(RegionCache::DEFAULT_TTL); + let a = "a.livekit.cloud"; + let b = "b.livekit.cloud"; + let urls = vec!["wss://r.livekit.cloud".to_string()]; + cache.insert(a.to_string(), urls.clone(), None); + cache.insert(b.to_string(), urls.clone(), None); + + // invalidate drops only the targeted host. + cache.invalidate(a); + assert!(matches!(cache.get(a), Cached::Miss), "invalidated host is a miss"); + assert!(matches!(cache.get(b), Cached::Fresh(_)), "other host is untouched"); + + // clear drops everything. 
+ cache.clear(); + assert!(matches!(cache.get(b), Cached::Miss), "clear removes all entries"); + } + + #[test] + fn fetch_lock_is_shared_per_host() { + let cache = RegionCache::new(RegionCache::DEFAULT_TTL); + + // Same host hands back the same lock, so concurrent callers contend on a + // single fetch; distinct hosts get independent locks. + let a1 = cache.fetch_lock("a.livekit.cloud"); + let a2 = cache.fetch_lock("a.livekit.cloud"); + let b = cache.fetch_lock("b.livekit.cloud"); + assert!(Arc::ptr_eq(&a1, &a2), "same host shares one fetch lock"); + assert!(!Arc::ptr_eq(&a1, &b), "different hosts get distinct fetch locks"); + } + #[test] fn test_parse_max_age() { assert_eq!(parse_max_age("max-age=300"), Some(Duration::from_secs(300))); From 611b18be1dbba2212e7d184d0602ec1f55cfb534 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 23 Jun 2026 13:54:47 +0200 Subject: [PATCH 8/8] update spec --- livekit/specs/signalling-reconnection.allium | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/livekit/specs/signalling-reconnection.allium b/livekit/specs/signalling-reconnection.allium index 67414ec86..373b06b75 100644 --- a/livekit/specs/signalling-reconnection.allium +++ b/livekit/specs/signalling-reconnection.allium @@ -441,7 +441,6 @@ rule ServerRequestsDisconnect { rule CloseEngine { when: CloseEngine(engine) requires: engine.status in {connected, reconnecting} - ensures: engine.reconnect_permission = revoked ensures: engine.status = closed ensures: EngineDisconnected(engine, cause: client_initiated) @guidance @@ -449,7 +448,10 @@ rule CloseEngine { -- non-terminal state — including mid-reconnect, cancelling an in-flight -- reconnect. The implementation signals close_notifier, which breaks the -- reconnect loop's backoff/attempt waits immediately rather than waiting - -- them out; permission is revoked so a late stimulus can't restart it. 
+ -- them out; the `closed` state then stops any further reconnection (a + -- late stimulus spawns a loop that bails on the first is-closed check). + -- Permission is NOT revoked here — like exhaustion, close lands `closed` + -- without revoking (RevokedImpliesClosed asserts only revoked => closed). } -- === Engine: starting / escalating the loop =================================