Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/bound-audio-filter-init-timeout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
livekit-ffi: patch
---

Add timeout for audio filter plugin initialization

33 changes: 33 additions & 0 deletions livekit-ffi/src/server/audio_plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
use std::{
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use super::FfiServer;
use futures_util::Stream;
use livekit::{
registered_audio_filter_plugins,
webrtc::{audio_stream::native::NativeAudioStream, prelude::AudioFrame},
AudioFilterAudioStream,
};
Expand All @@ -42,3 +45,33 @@ impl Stream for AudioStreamKind {
}
}
}

const AUDIO_FILTER_INIT_TIMEOUT: Duration = Duration::from_secs(5);

/// Initializes all registered audio-filter plugins for a freshly connected room.
///
/// Best-effort: on timeout or failure the room stays connected with the filter
/// disabled. The [`tokio::task::spawn_blocking`] task cannot be cancelled, so on
/// timeout it is detached and finishes in the background.
///
pub async fn initialize_audio_filters(server: &'static FfiServer, url: String, token: String) {
let started = std::time::Instant::now();
let init = server.async_runtime.spawn_blocking(move || {
for filter in registered_audio_filter_plugins().into_iter() {
filter.on_load(&url, &token).map_err(|e| e.to_string())?;
}
Ok::<(), String>(())
});

let outcome = match tokio::time::timeout(AUDIO_FILTER_INIT_TIMEOUT, init).await {
Ok(join_result) => join_result.map_err(|e| e.to_string()).and_then(|r| r),
Err(_) => Err(format!("timed out after {:?}", AUDIO_FILTER_INIT_TIMEOUT)),
};

if let Err(e) = outcome {
log::debug!("error while initializing audio filter after {:?}: {}", started.elapsed(), e);
log::error!(
"audio filter cannot be enabled: ensure you are connecting to LiveKit Cloud and that the filter is properly configured"
);
}
}
28 changes: 5 additions & 23 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::time::Duration;
use std::{collections::HashSet, slice, sync::Arc};

use livekit::{prelude::*, registered_audio_filter_plugins};
use livekit::prelude::*;
use livekit::{ChatMessage, StreamReader};
use livekit_protocol as lk_proto;
use parking_lot::Mutex;
Expand Down Expand Up @@ -148,28 +148,10 @@ impl FfiRoom {
let connect = async move {
match Room::connect(&connect.url, &connect.token, options.clone()).await {
Ok((room, mut events)) => {
// initialize audio filters
let result = server
.async_runtime
.spawn_blocking(move || {
for filter in registered_audio_filter_plugins().into_iter() {
filter.on_load(&req.url, &req.token).map_err(|e| e.to_string())?;
}
Ok::<(), String>(())
})
.await
.map_err(|e| e.to_string());
match result {
Err(e) | Ok(Err(e)) => {
log::warn!("error while initializing audio filter: {}", e);
log::error!(
"audio filter cannot be enabled: ensure you are connecting to LiveKit Cloud and that the filter is properly configured"
);
// Skip returning an error here to keep the rtc session alive
// But in this case, the filter isn't enabled in the session.
}
Ok(Ok(_)) => (),
};
crate::server::audio_plugin::initialize_audio_filters(
server, req.url, req.token,
)
.await;

// Successfully connected to the room
// Forward the initial state for the FfiClient
Expand Down
Loading