diff --git a/.changeset/bound-audio-filter-init-timeout.md b/.changeset/bound-audio-filter-init-timeout.md new file mode 100644 index 000000000..8b8c39334 --- /dev/null +++ b/.changeset/bound-audio-filter-init-timeout.md @@ -0,0 +1,6 @@ +--- +livekit-ffi: patch +--- + +Add timeout for audio filter plugin initialization + diff --git a/livekit-ffi/src/server/audio_plugin.rs b/livekit-ffi/src/server/audio_plugin.rs index f448a0794..e7da928db 100644 --- a/livekit-ffi/src/server/audio_plugin.rs +++ b/livekit-ffi/src/server/audio_plugin.rs @@ -15,10 +15,13 @@ use std::{ pin::Pin, task::{Context, Poll}, + time::Duration, }; +use super::FfiServer; use futures_util::Stream; use livekit::{ + registered_audio_filter_plugins, webrtc::{audio_stream::native::NativeAudioStream, prelude::AudioFrame}, AudioFilterAudioStream, }; @@ -42,3 +45,33 @@ impl Stream for AudioStreamKind { } } } + +const AUDIO_FILTER_INIT_TIMEOUT: Duration = Duration::from_secs(5); + +/// Initializes all registered audio-filter plugins for a freshly connected room. +/// +/// Best-effort: on timeout or failure the room stays connected with the filter +/// disabled. The [`tokio::task::spawn_blocking`] task cannot be cancelled, so on +/// timeout it is detached and finishes in the background. +/// +pub async fn initialize_audio_filters(server: &'static FfiServer, url: String, token: String) { + let started = std::time::Instant::now(); + let init = server.async_runtime.spawn_blocking(move || { + for filter in registered_audio_filter_plugins().into_iter() { + filter.on_load(&url, &token).map_err(|e| e.to_string())?; + } + Ok::<(), String>(()) + }); + + let outcome = match tokio::time::timeout(AUDIO_FILTER_INIT_TIMEOUT, init).await { + Ok(join_result) => join_result.map_err(|e| e.to_string()).and_then(|r| r), + Err(_) => Err(format!("timed out after {:?}", AUDIO_FILTER_INIT_TIMEOUT)), + }; + + if let Err(e) = outcome { + log::debug!("error while initializing audio filter after {:?}: {}", started.elapsed(), e); + log::error!( + "audio filter cannot be enabled: ensure you are connecting to LiveKit Cloud and that the filter is properly configured" + ); + } +} diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs index 7e321a26a..d26c10fb1 100644 --- a/livekit-ffi/src/server/room.rs +++ b/livekit-ffi/src/server/room.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::time::Duration; use std::{collections::HashSet, slice, sync::Arc}; -use livekit::{prelude::*, registered_audio_filter_plugins}; +use livekit::prelude::*; use livekit::{ChatMessage, StreamReader}; use livekit_protocol as lk_proto; use parking_lot::Mutex; @@ -148,28 +148,10 @@ impl FfiRoom { let connect = async move { match Room::connect(&connect.url, &connect.token, options.clone()).await { Ok((room, mut events)) => { - // initialize audio filters - let result = server - .async_runtime - .spawn_blocking(move || { - for filter in registered_audio_filter_plugins().into_iter() { - filter.on_load(&req.url, &req.token).map_err(|e| e.to_string())?; - } - Ok::<(), String>(()) - }) - .await - .map_err(|e| e.to_string()); - match result { - Err(e) | Ok(Err(e)) => { - log::warn!("error while initializing audio filter: {}", e); - log::error!( - "audio filter cannot be enabled: ensure you are connecting to LiveKit Cloud and that the filter is properly configured" - ); - // Skip returning an error here to keep the rtc session alive - // But in this case, the filter isn't enabled in the session. - } - Ok(Ok(_)) => (), - }; + crate::server::audio_plugin::initialize_audio_filters( + server, req.url, req.token, + ) + .await; // Successfully connected to the room // Forward the initial state for the FfiClient