Skip to content

Commit a581dbf

Browse files
authored
PipeWire: fix stopped stream ejection (#115)
## Description Fixes #105, at least for me. To be reviewed by commits: - first commit adds a new example `eject_stream_pipewire.rs`, which hangs at this point - this would be even better as an integration test, but it requires running pipewire, output device, the `pw-link` command - the second commit is a small cleanup/refactor to make the following change easier - the third commit is the actual bugfix - instead of handling `StreamCommand`s in the `process()` callback (which runs in the RT thread), we now handle them in the PipeWire main loop. This also prompted a transition from `rtrb` to `pipewire::channel`; this makes their handling independent - a `Mutex` is introduced in the audio callback **but we never wait on it** in the audio process() callback. I tried hard to avoid it, but I believe it is necessary (short of unsafe code), as we need `&mut` access to the `Option<Callback>` in two separate threads. ## Type of Change Please delete options that are not relevant. - [x] Bug fix (non-breaking change which fixes an issue) - [x] Code cleanup or refactor ## How Has This Been Tested? - [x] Tested by introducing the `eject_stream_pipewire.rs` example. - [ ] Test in more realistic setting that is works at least as good as tonarino#5 @mbernat maybe? 🙏 ## Checklist: - [x] My code follows the style guidelines of this project - ~I have made corresponding changes to the documentation~ - [x] My changes generate no new warnings - [x] Wherever possible, I have added tests that prove my fix is effective or that my feature works. For changes that need to be validated manually (i.e. a new audio driver), use examples that can be run to easily validate them. - [x] New and existing unit tests pass locally with my changes - [x] I have checked my code and corrected any misspellings CC @mbernat. Supersedes tonarino#5.
2 parents 84e5f5d + 2f7bd85 commit a581dbf

4 files changed

Lines changed: 249 additions & 58 deletions

File tree

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ windows = { version = "0.61.3", features = [
5454
"Win32_UI_Shell_PropertiesSystem",
5555
] }
5656

57+
[[example]]
58+
name = "eject_stream_pipewire"
59+
path = "examples/eject_stream_pipewire.rs"
60+
required-features = ["pipewire"]
61+
5762
[[example]]
5863
name = "enumerate_alsa"
5964
path = "examples/enumerate_alsa.rs"

examples/eject_stream_pipewire.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
//! This example showcases the problem with ejecting stopped pipewire streams described in
2+
//! https://github.com/SolarLiner/interflow/issues/105
3+
//!
4+
//! It would be best as an integration test, but it has nontrivial prerequisites on the environment:
5+
//! - running PipeWire daemon
6+
//! - at least one PipeWire audio output device
7+
//! - the `pw-link` program installed (bundled with pipewire)
8+
9+
use interflow::prelude::*;
10+
use std::ops::Deref;
11+
use std::thread;
12+
use util::sine::SineWave;
13+
14+
mod util;
15+
16+
#[cfg(all(os_pipewire, feature = "pipewire"))]
17+
fn main() -> Result<(), Box<dyn std::error::Error>> {
18+
use interflow::prelude::pipewire::driver::PipewireDriver;
19+
use std::{process, time::Duration};
20+
21+
env_logger::init();
22+
23+
let driver = PipewireDriver::new()?;
24+
25+
// Select the highest-priority output device. Use this rather than `driver.default_device()`
26+
// because we need its real name for disconnecting it below.
27+
let devices = driver.list_devices()?;
28+
let mut device = devices
29+
.into_iter()
30+
.filter(|d| d.device_type().is_output())
31+
.max_by_key(device_session_priority)
32+
.expect("No output PipeWire devices?");
33+
println!("Using device {}", device.name());
34+
35+
let config = device.default_output_config()?;
36+
device.with_stream_name("Interflow eject test 1");
37+
let stream_1 = device.create_output_stream(config, SineWave::new(440.0))?;
38+
39+
println!("Playing sine wave for 1 second, then ejecting");
40+
thread::sleep(Duration::from_secs(1));
41+
let callback = stream_1.eject().unwrap();
42+
43+
println!("Playing sine wave for another second in a new stream but old callback");
44+
let stream_2 = device.create_output_stream(config, callback)?;
45+
thread::sleep(Duration::from_secs(1));
46+
47+
// Disconnect our node from the device node. Call external program, doing this programmatically
48+
// using pipewire-rs would be much more involved.
49+
let mut command = process::Command::new("pw-link");
50+
command
51+
.arg("--disconnect")
52+
.arg("eject_stream_pipewire")
53+
.arg(device.name().deref());
54+
println!("Disconnecting playback pipewire node from its device using {command:?}");
55+
let status = command.status()?;
56+
assert!(status.success());
57+
58+
println!("Ejecting the callback from the new stream");
59+
// The hang occurred right in this call
60+
stream_2.eject()?;
61+
62+
println!("Exiting cleanly");
63+
Ok(())
64+
}
65+
66+
#[cfg(all(os_pipewire, feature = "pipewire"))]
67+
fn device_session_priority(device: &pipewire::device::PipewireDevice) -> Option<i32> {
68+
let properties = device
69+
.properties()
70+
.expect("Cannot get pipewire device properties")?;
71+
72+
let priority_property = properties.get("priority.session")?;
73+
let priority = priority_property
74+
.parse()
75+
.expect("Cannot parse priority.session as i32");
76+
Some(priority)
77+
}

src/backends/pipewire/stream.rs

Lines changed: 98 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use crate::audio_buffer::{AudioMut, AudioRef};
44
use crate::backends::pipewire::error::PipewireError;
55
use crate::channel_map::Bitset;
6+
use crate::prelude::pipewire::utils::{BlackHole, CallbackHolder};
67
use crate::timestamp::Timestamp;
78
use crate::{
89
AudioCallbackContext, AudioInput, AudioInputCallback, AudioOutput, AudioOutputCallback,
@@ -15,65 +16,33 @@ use libspa::utils::Direction;
1516
use libspa_sys::{SPA_PARAM_EnumFormat, SPA_TYPE_OBJECT_Format};
1617
use pipewire::context::Context;
1718
use pipewire::keys;
18-
use pipewire::main_loop::{MainLoop, WeakMainLoop};
19+
use pipewire::main_loop::MainLoop;
1920
use pipewire::properties::Properties;
2021
use pipewire::stream::{Stream, StreamFlags};
22+
use std::cell::Cell;
2123
use std::collections::HashMap;
2224
use std::fmt;
2325
use std::fmt::Formatter;
26+
use std::sync::{Arc, Weak};
2427
use std::thread::JoinHandle;
2528

2629
enum StreamCommands<Callback> {
27-
ReceiveCallback(Callback),
2830
Eject(oneshot::Sender<Callback>),
2931
}
3032

3133
impl<Callback> fmt::Debug for StreamCommands<Callback> {
3234
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3335
match self {
34-
Self::ReceiveCallback(_) => write!(f, "ReceiveCallback"),
3536
Self::Eject(_) => write!(f, "Eject"),
3637
}
3738
}
3839
}
3940

4041
struct StreamInner<Callback> {
41-
commands: rtrb::Consumer<StreamCommands<Callback>>,
4242
scratch_buffer: Box<[f32]>,
43-
callback: Option<Callback>,
43+
callback: Weak<CallbackHolder<Callback>>,
4444
config: StreamConfig,
4545
timestamp: Timestamp,
46-
loop_ref: WeakMainLoop,
47-
}
48-
49-
impl<Callback> StreamInner<Callback> {
50-
fn handle_command(&mut self, command: StreamCommands<Callback>) {
51-
log::debug!("Handling command: {command:?}");
52-
match command {
53-
StreamCommands::ReceiveCallback(callback) => {
54-
debug_assert!(self.callback.is_none());
55-
self.callback = Some(callback);
56-
}
57-
StreamCommands::Eject(reply) => {
58-
if let Some(callback) = self.callback.take() {
59-
reply.send(callback).unwrap();
60-
if let Some(loop_ref) = self.loop_ref.upgrade() {
61-
loop_ref.quit();
62-
}
63-
}
64-
}
65-
}
66-
}
67-
68-
fn handle_commands(&mut self) {
69-
while let Ok(command) = self.commands.pop() {
70-
self.handle_command(command);
71-
}
72-
}
73-
74-
fn ejected(&self) -> bool {
75-
self.callback.is_none()
76-
}
7746
}
7847

7948
impl<Callback: AudioOutputCallback> StreamInner<Callback> {
@@ -83,7 +52,7 @@ impl<Callback: AudioOutputCallback> StreamInner<Callback> {
8352
channels,
8453
)
8554
.unwrap();
86-
if let Some(callback) = self.callback.as_mut() {
55+
if let Some(mut callback) = self.callback.upgrade() {
8756
let context = AudioCallbackContext {
8857
stream_config: self.config,
8958
timestamp: self.timestamp,
@@ -93,7 +62,12 @@ impl<Callback: AudioOutputCallback> StreamInner<Callback> {
9362
buffer,
9463
timestamp: self.timestamp,
9564
};
65+
66+
// SAFETY: there is max one other owner of the callback Arc, and it never dereferences
67+
// it thanks to `BlackHole`, fulfilling safety requirements of `arc_get_mut_unchecked()`.
68+
let callback = unsafe { arc_get_mut_unchecked(&mut callback) };
9669
callback.on_output_data(context, output);
70+
9771
self.timestamp += num_frames as u64;
9872
num_frames
9973
} else {
@@ -107,7 +81,7 @@ impl<Callback: AudioInputCallback> StreamInner<Callback> {
10781
let buffer =
10882
AudioRef::from_interleaved(&self.scratch_buffer[..channels * frames], channels)
10983
.unwrap();
110-
if let Some(callback) = self.callback.as_mut() {
84+
if let Some(mut callback) = self.callback.upgrade() {
11185
let context = AudioCallbackContext {
11286
stream_config: self.config,
11387
timestamp: self.timestamp,
@@ -117,7 +91,12 @@ impl<Callback: AudioInputCallback> StreamInner<Callback> {
11791
buffer,
11892
timestamp: self.timestamp,
11993
};
94+
95+
// SAFETY: there is max one other owner of the callback Arc, and it never dereferences
96+
// it thanks to `BlackHole`, fulfilling safety requirements of `arc_get_mut_unchecked()`.
97+
let callback = unsafe { arc_get_mut_unchecked(&mut callback) };
12098
callback.on_input_data(context, input);
99+
121100
self.timestamp += num_frames as u64;
122101
num_frames
123102
} else {
@@ -128,19 +107,19 @@ impl<Callback: AudioInputCallback> StreamInner<Callback> {
128107

129108
/// PipeWire stream handle.
130109
pub struct StreamHandle<Callback> {
131-
commands: rtrb::Producer<StreamCommands<Callback>>,
110+
commands: pipewire::channel::Sender<StreamCommands<Callback>>,
132111
handle: JoinHandle<Result<(), PipewireError>>,
133112
}
134113

135114
impl<Callback> AudioStreamHandle<Callback> for StreamHandle<Callback> {
136115
type Error = PipewireError;
137116

138-
fn eject(mut self) -> Result<Callback, Self::Error> {
117+
fn eject(self) -> Result<Callback, Self::Error> {
139118
log::info!("Ejecting stream");
140119
let (tx, rx) = oneshot::channel();
141120
self.commands
142-
.push(StreamCommands::Eject(tx))
143-
.expect("Command buffer overflow");
121+
.send(StreamCommands::Eject(tx))
122+
.expect("Should be able to send a message through PipeWire channel");
144123
self.handle.join().unwrap()?;
145124
Ok(rx.recv().unwrap())
146125
}
@@ -158,7 +137,10 @@ impl<Callback: 'static + Send> StreamHandle<Callback> {
158137
+ Send
159138
+ 'static,
160139
) -> Result<Self, PipewireError> {
161-
let (mut tx, rx) = rtrb::RingBuffer::new(16);
140+
// Create a channel for sending command into PipeWire main loop.
141+
let (pipewire_sender, pipewire_receiver) =
142+
pipewire::channel::channel::<StreamCommands<Callback>>();
143+
162144
let handle = std::thread::spawn(move || {
163145
let main_loop = MainLoop::new(None)?;
164146
let context = Context::new(&main_loop)?;
@@ -183,23 +165,27 @@ impl<Callback: 'static + Send> StreamHandle<Callback> {
183165
properties.insert(*keys::TARGET_OBJECT, device_object_serial);
184166
}
185167

168+
let (callback_holder, callback_rx) = CallbackHolder::new(callback);
169+
let callback_holder = Arc::new(callback_holder);
170+
171+
let stream_inner = StreamInner {
172+
callback: Arc::downgrade(&callback_holder),
173+
scratch_buffer: vec![0.0; MAX_FRAMES * channels].into_boxed_slice(),
174+
config,
175+
timestamp: Timestamp::new(config.samplerate),
176+
};
177+
178+
// SAFETY of StreamInner::process_input(), StreamInner::process_output() depends on us
179+
// never _dereferencing_ `callback_holder` outside of `StreamInner`. Achieve that at
180+
// type level by wrapping it in a black hole.
181+
let callback_holder = BlackHole::new(callback_holder);
182+
186183
let stream = Stream::new(&core, &name, properties)?;
187184
config.samplerate = config.samplerate.round();
188185
let _listener = stream
189-
.add_local_listener_with_user_data(StreamInner {
190-
callback: None,
191-
commands: rx,
192-
scratch_buffer: vec![0.0; MAX_FRAMES * channels].into_boxed_slice(),
193-
loop_ref: main_loop.downgrade(),
194-
config,
195-
timestamp: Timestamp::new(config.samplerate),
196-
})
186+
.add_local_listener_with_user_data(stream_inner)
197187
.process(move |stream, inner| {
198188
log::debug!("Processing stream");
199-
inner.handle_commands();
200-
if inner.ejected() {
201-
return;
202-
}
203189
if let Some(mut buffer) = stream.dequeue_buffer() {
204190
let datas = buffer.datas_mut();
205191
log::debug!("Datas: len={}", datas.len());
@@ -238,14 +224,48 @@ impl<Callback: 'static + Send> StreamHandle<Callback> {
238224
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
239225
&mut params,
240226
)?;
227+
228+
// Handle commands (stream ejection). Runs in the PipeWire main loop.
229+
let loop_ref = main_loop.downgrade();
230+
// pipewire::channel::receiver::attach() only accepts `Fn()` (instead of expected
231+
// `FnMut()`), so we need interior mutability. Cell is sufficient.
232+
let callback_holder = Cell::new(Some(callback_holder));
233+
let _attached_receiver = pipewire_receiver.attach(main_loop.loop_(), move |command| {
234+
log::debug!("Handling command: {command:?}");
235+
match command {
236+
StreamCommands::Eject(reply) => {
237+
// Take the callback holder our of its `Cell`, leaving `None` in place.
238+
let callback_holder = callback_holder.take();
239+
240+
if callback_holder.is_none() {
241+
// We've already ejected the callback, nothing to do.
242+
return;
243+
}
244+
245+
// Drop our reference to the Arc, which is its only persistent strong
246+
// reference. The `CallbackHolder` will go out of scope (usually right away,
247+
// but if the callback is running right now in the rt thread, then after it
248+
// releases it), and its Drop impl will send it through `callback_tx`.
249+
drop(callback_holder);
250+
251+
let callback = callback_rx.recv().expect(
252+
"channel from StreamInner to receiver in pipewire main thread should \
253+
not be closed",
254+
);
255+
reply.send(callback).unwrap();
256+
if let Some(loop_ref) = loop_ref.upgrade() {
257+
loop_ref.quit();
258+
}
259+
}
260+
}
261+
});
262+
241263
log::debug!("Starting Pipewire main loop");
242264
main_loop.run();
243265
Ok::<_, PipewireError>(())
244266
});
245-
log::debug!("Sending callback to stream");
246-
tx.push(StreamCommands::ReceiveCallback(callback)).unwrap();
247267
Ok(Self {
248-
commands: tx,
268+
commands: pipewire_sender,
249269
handle,
250270
})
251271
}
@@ -350,3 +370,23 @@ const DEFAULT_EXPECTED_FRAMES: usize = 512;
350370
fn stream_buffer_size(range: (Option<usize>, Option<usize>)) -> usize {
351371
range.0.or(range.1).unwrap_or(DEFAULT_EXPECTED_FRAMES)
352372
}
373+
374+
/// Returns a mutable reference into the given `Arc`, without any check.
375+
///
376+
/// This does the same thing as unstable [`Arc::get_mut_unchecked()`], but on stable Rust.
377+
/// The documentation including Safety prerequisites are copied from Rust stdlib.
378+
/// This helper can be removed once `get_mut_unchecked()` is stabilized and hits our MSRV.
379+
///
380+
/// Unsafe variant of [`Arc::get_mut()`], which is safe and does appropriate checks.
381+
///
382+
/// # Safety
383+
///
384+
/// If any other `Arc` or [`Weak`] pointers to the same allocation exist, then
385+
/// they must not be dereferenced or have active borrows for the duration
386+
/// of the returned borrow, and their inner type must be exactly the same as the
387+
/// inner type of this Rc (including lifetimes). This is trivially the case if no
388+
/// such pointers exist, for example immediately after `Arc::new`.
389+
unsafe fn arc_get_mut_unchecked<T>(arc: &mut Arc<T>) -> &mut T {
390+
let raw_pointer = Arc::as_ptr(arc) as *mut T;
391+
unsafe { &mut *raw_pointer }
392+
}

0 commit comments

Comments
 (0)