Skip to content

Commit 2f7bd85

Browse files
committed
pipewire: handle commands in the main loop using pw channel
Attempt to fix #105 v2: don't use a `Mutex`, do some Arc/Weak/Drop/channel/unsave tricks instead.
1 parent 0434389 commit 2f7bd85

2 files changed

Lines changed: 167 additions & 50 deletions

File tree

src/backends/pipewire/stream.rs

Lines changed: 98 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use crate::audio_buffer::{AudioMut, AudioRef};
44
use crate::backends::pipewire::error::PipewireError;
55
use crate::channel_map::Bitset;
6+
use crate::prelude::pipewire::utils::{BlackHole, CallbackHolder};
67
use crate::timestamp::Timestamp;
78
use crate::{
89
AudioCallbackContext, AudioInput, AudioInputCallback, AudioOutput, AudioOutputCallback,
@@ -15,12 +16,14 @@ use libspa::utils::Direction;
1516
use libspa_sys::{SPA_PARAM_EnumFormat, SPA_TYPE_OBJECT_Format};
1617
use pipewire::context::Context;
1718
use pipewire::keys;
18-
use pipewire::main_loop::{MainLoop, WeakMainLoop};
19+
use pipewire::main_loop::MainLoop;
1920
use pipewire::properties::Properties;
2021
use pipewire::stream::{Stream, StreamFlags};
22+
use std::cell::Cell;
2123
use std::collections::HashMap;
2224
use std::fmt;
2325
use std::fmt::Formatter;
26+
use std::sync::{Arc, Weak};
2427
use std::thread::JoinHandle;
2528

2629
enum StreamCommands<Callback> {
@@ -36,38 +39,10 @@ impl<Callback> fmt::Debug for StreamCommands<Callback> {
3639
}
3740

3841
struct StreamInner<Callback> {
39-
commands: rtrb::Consumer<StreamCommands<Callback>>,
4042
scratch_buffer: Box<[f32]>,
41-
callback: Option<Callback>,
43+
callback: Weak<CallbackHolder<Callback>>,
4244
config: StreamConfig,
4345
timestamp: Timestamp,
44-
loop_ref: WeakMainLoop,
45-
}
46-
47-
impl<Callback> StreamInner<Callback> {
48-
fn handle_command(&mut self, command: StreamCommands<Callback>) {
49-
log::debug!("Handling command: {command:?}");
50-
match command {
51-
StreamCommands::Eject(reply) => {
52-
if let Some(callback) = self.callback.take() {
53-
reply.send(callback).unwrap();
54-
if let Some(loop_ref) = self.loop_ref.upgrade() {
55-
loop_ref.quit();
56-
}
57-
}
58-
}
59-
}
60-
}
61-
62-
fn handle_commands(&mut self) {
63-
while let Ok(command) = self.commands.pop() {
64-
self.handle_command(command);
65-
}
66-
}
67-
68-
fn ejected(&self) -> bool {
69-
self.callback.is_none()
70-
}
7146
}
7247

7348
impl<Callback: AudioOutputCallback> StreamInner<Callback> {
@@ -77,7 +52,7 @@ impl<Callback: AudioOutputCallback> StreamInner<Callback> {
7752
channels,
7853
)
7954
.unwrap();
80-
if let Some(callback) = self.callback.as_mut() {
55+
if let Some(mut callback) = self.callback.upgrade() {
8156
let context = AudioCallbackContext {
8257
stream_config: self.config,
8358
timestamp: self.timestamp,
@@ -87,7 +62,12 @@ impl<Callback: AudioOutputCallback> StreamInner<Callback> {
8762
buffer,
8863
timestamp: self.timestamp,
8964
};
65+
66+
// SAFETY: there is max one other owner of the callback Arc, and it never dereferences
67+
// it thanks to `BlackHole`, fulfilling safety requirements of `arc_get_mut_unchecked()`.
68+
let callback = unsafe { arc_get_mut_unchecked(&mut callback) };
9069
callback.on_output_data(context, output);
70+
9171
self.timestamp += num_frames as u64;
9272
num_frames
9373
} else {
@@ -101,7 +81,7 @@ impl<Callback: AudioInputCallback> StreamInner<Callback> {
10181
let buffer =
10282
AudioRef::from_interleaved(&self.scratch_buffer[..channels * frames], channels)
10383
.unwrap();
104-
if let Some(callback) = self.callback.as_mut() {
84+
if let Some(mut callback) = self.callback.upgrade() {
10585
let context = AudioCallbackContext {
10686
stream_config: self.config,
10787
timestamp: self.timestamp,
@@ -111,7 +91,12 @@ impl<Callback: AudioInputCallback> StreamInner<Callback> {
11191
buffer,
11292
timestamp: self.timestamp,
11393
};
94+
95+
// SAFETY: there is max one other owner of the callback Arc, and it never dereferences
96+
// it thanks to `BlackHole`, fulfilling safety requirements of `arc_get_mut_unchecked()`.
97+
let callback = unsafe { arc_get_mut_unchecked(&mut callback) };
11498
callback.on_input_data(context, input);
99+
115100
self.timestamp += num_frames as u64;
116101
num_frames
117102
} else {
@@ -122,19 +107,19 @@ impl<Callback: AudioInputCallback> StreamInner<Callback> {
122107

123108
/// PipeWire stream handle.
124109
pub struct StreamHandle<Callback> {
125-
commands: rtrb::Producer<StreamCommands<Callback>>,
110+
commands: pipewire::channel::Sender<StreamCommands<Callback>>,
126111
handle: JoinHandle<Result<(), PipewireError>>,
127112
}
128113

129114
impl<Callback> AudioStreamHandle<Callback> for StreamHandle<Callback> {
130115
type Error = PipewireError;
131116

132-
fn eject(mut self) -> Result<Callback, Self::Error> {
117+
fn eject(self) -> Result<Callback, Self::Error> {
133118
log::info!("Ejecting stream");
134119
let (tx, rx) = oneshot::channel();
135120
self.commands
136-
.push(StreamCommands::Eject(tx))
137-
.expect("Command buffer overflow");
121+
.send(StreamCommands::Eject(tx))
122+
.expect("Should be able to send a message through PipeWire channel");
138123
self.handle.join().unwrap()?;
139124
Ok(rx.recv().unwrap())
140125
}
@@ -152,7 +137,10 @@ impl<Callback: 'static + Send> StreamHandle<Callback> {
152137
+ Send
153138
+ 'static,
154139
) -> Result<Self, PipewireError> {
155-
let (tx, rx) = rtrb::RingBuffer::new(16);
140+
// Create a channel for sending command into PipeWire main loop.
141+
let (pipewire_sender, pipewire_receiver) =
142+
pipewire::channel::channel::<StreamCommands<Callback>>();
143+
156144
let handle = std::thread::spawn(move || {
157145
let main_loop = MainLoop::new(None)?;
158146
let context = Context::new(&main_loop)?;
@@ -177,23 +165,27 @@ impl<Callback: 'static + Send> StreamHandle<Callback> {
177165
properties.insert(*keys::TARGET_OBJECT, device_object_serial);
178166
}
179167

168+
let (callback_holder, callback_rx) = CallbackHolder::new(callback);
169+
let callback_holder = Arc::new(callback_holder);
170+
171+
let stream_inner = StreamInner {
172+
callback: Arc::downgrade(&callback_holder),
173+
scratch_buffer: vec![0.0; MAX_FRAMES * channels].into_boxed_slice(),
174+
config,
175+
timestamp: Timestamp::new(config.samplerate),
176+
};
177+
178+
// SAFETY of StreamInner::process_input(), StreamInner::process_output() depends on us
179+
// never _dereferencing_ `callback_holder` outside of `StreamInner`. Achieve that at
180+
// type level by wrapping it in a black hole.
181+
let callback_holder = BlackHole::new(callback_holder);
182+
180183
let stream = Stream::new(&core, &name, properties)?;
181184
config.samplerate = config.samplerate.round();
182185
let _listener = stream
183-
.add_local_listener_with_user_data(StreamInner {
184-
callback: Some(callback),
185-
commands: rx,
186-
scratch_buffer: vec![0.0; MAX_FRAMES * channels].into_boxed_slice(),
187-
loop_ref: main_loop.downgrade(),
188-
config,
189-
timestamp: Timestamp::new(config.samplerate),
190-
})
186+
.add_local_listener_with_user_data(stream_inner)
191187
.process(move |stream, inner| {
192188
log::debug!("Processing stream");
193-
inner.handle_commands();
194-
if inner.ejected() {
195-
return;
196-
}
197189
if let Some(mut buffer) = stream.dequeue_buffer() {
198190
let datas = buffer.datas_mut();
199191
log::debug!("Datas: len={}", datas.len());
@@ -232,12 +224,48 @@ impl<Callback: 'static + Send> StreamHandle<Callback> {
232224
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
233225
&mut params,
234226
)?;
227+
228+
// Handle commands (stream ejection). Runs in the PipeWire main loop.
229+
let loop_ref = main_loop.downgrade();
230+
// pipewire::channel::receiver::attach() only accepts `Fn()` (instead of expected
231+
// `FnMut()`), so we need interior mutability. Cell is sufficient.
232+
let callback_holder = Cell::new(Some(callback_holder));
233+
let _attached_receiver = pipewire_receiver.attach(main_loop.loop_(), move |command| {
234+
log::debug!("Handling command: {command:?}");
235+
match command {
236+
StreamCommands::Eject(reply) => {
237+
// Take the callback holder our of its `Cell`, leaving `None` in place.
238+
let callback_holder = callback_holder.take();
239+
240+
if callback_holder.is_none() {
241+
// We've already ejected the callback, nothing to do.
242+
return;
243+
}
244+
245+
// Drop our reference to the Arc, which is its only persistent strong
246+
// reference. The `CallbackHolder` will go out of scope (usually right away,
247+
// but if the callback is running right now in the rt thread, then after it
248+
// releases it), and its Drop impl will send it through `callback_tx`.
249+
drop(callback_holder);
250+
251+
let callback = callback_rx.recv().expect(
252+
"channel from StreamInner to receiver in pipewire main thread should \
253+
not be closed",
254+
);
255+
reply.send(callback).unwrap();
256+
if let Some(loop_ref) = loop_ref.upgrade() {
257+
loop_ref.quit();
258+
}
259+
}
260+
}
261+
});
262+
235263
log::debug!("Starting Pipewire main loop");
236264
main_loop.run();
237265
Ok::<_, PipewireError>(())
238266
});
239267
Ok(Self {
240-
commands: tx,
268+
commands: pipewire_sender,
241269
handle,
242270
})
243271
}
@@ -342,3 +370,23 @@ const DEFAULT_EXPECTED_FRAMES: usize = 512;
342370
fn stream_buffer_size(range: (Option<usize>, Option<usize>)) -> usize {
343371
range.0.or(range.1).unwrap_or(DEFAULT_EXPECTED_FRAMES)
344372
}
373+
374+
/// Returns a mutable reference into the given `Arc`, without any check.
375+
///
376+
/// This does the same thing as unstable [`Arc::get_mut_unchecked()`], but on stable Rust.
377+
/// The documentation including Safety prerequisites are copied from Rust stdlib.
378+
/// This helper can be removed once `get_mut_unchecked()` is stabilized and hits our MSRV.
379+
///
380+
/// Unsafe variant of [`Arc::get_mut()`], which is safe and does appropriate checks.
381+
///
382+
/// # Safety
383+
///
384+
/// If any other `Arc` or [`Weak`] pointers to the same allocation exist, then
385+
/// they must not be dereferenced or have active borrows for the duration
386+
/// of the returned borrow, and their inner type must be exactly the same as the
387+
/// inner type of this Rc (including lifetimes). This is trivially the case if no
388+
/// such pointers exist, for example immediately after `Arc::new`.
389+
unsafe fn arc_get_mut_unchecked<T>(arc: &mut Arc<T>) -> &mut T {
390+
let raw_pointer = Arc::as_ptr(arc) as *mut T;
391+
unsafe { &mut *raw_pointer }
392+
}

src/backends/pipewire/utils.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use pipewire::context::Context;
55
use pipewire::main_loop::MainLoop;
66
use pipewire::registry::GlobalObject;
77
use std::cell::{Cell, RefCell};
8+
use std::ops::{Deref, DerefMut};
89
use std::rc::Rc;
10+
use std::sync::mpsc;
911

1012
fn get_device_type(object: &GlobalObject<&DictRef>) -> Option<DeviceType> {
1113
fn is_input(media_class: &str) -> bool {
@@ -89,3 +91,70 @@ pub fn get_devices() -> Result<Vec<(u32, DeviceType, String)>, PipewireError> {
8991
drop(_listener_reg);
9092
Ok(Rc::into_inner(data).unwrap().into_inner())
9193
}
94+
95+
/// A little helper that holds user's callback and sends it out using a channel when it goes out of
96+
/// scope. Dereferences to `Callback`, including mutably.
97+
pub(super) struct CallbackHolder<Callback> {
98+
/// Invariant: `callback` is always `Some`, except in the second half of the [`Drop`] impl.
99+
callback: Option<Callback>,
100+
tx: mpsc::SyncSender<Callback>,
101+
}
102+
103+
impl<Callback> CallbackHolder<Callback> {
104+
/// Returns a pair (self, rx), where `rx` should be used to fetch the callback when the holder
105+
/// goes out of scope.
106+
pub(super) fn new(callback: Callback) -> (Self, mpsc::Receiver<Callback>) {
107+
// Our first choice would be and `rtrb` channel, but that doesn't allow receiver to wait
108+
// for a message, which we need. It doesn't matter, we use a channel of capacity 1 and
109+
// we only use it exactly once, it never blocks in this case.
110+
let (tx, rx) = mpsc::sync_channel(1);
111+
let myself = Self {
112+
callback: Some(callback),
113+
tx,
114+
};
115+
(myself, rx)
116+
}
117+
}
118+
119+
impl<Callback> Deref for CallbackHolder<Callback> {
120+
type Target = Callback;
121+
122+
fn deref(&self) -> &Callback {
123+
self.callback
124+
.as_ref()
125+
.expect("never None outside destructor")
126+
}
127+
}
128+
129+
impl<Callback> DerefMut for CallbackHolder<Callback> {
130+
fn deref_mut(&mut self) -> &mut Callback {
131+
self.callback
132+
.as_mut()
133+
.expect("never None outside destructor")
134+
}
135+
}
136+
137+
impl<Callback> Drop for CallbackHolder<Callback> {
138+
fn drop(&mut self) {
139+
let callback = self.callback.take().expect("never None outside destructor");
140+
match self.tx.try_send(callback) {
141+
Ok(()) => (),
142+
Err(mpsc::TrySendError::Full(_)) => {
143+
panic!("The channel in CallbackHolder should be never full")
144+
}
145+
Err(mpsc::TrySendError::Disconnected(_)) => log::warn!(
146+
"Channel in CallbackHolder is disconnected, did PipeWire main loop already exit?"
147+
),
148+
}
149+
}
150+
}
151+
152+
/// Allows you to send to value to a black hole. It keeps at alive as long as it is in scope, but
153+
/// you cannot get the value back in any way.
154+
pub struct BlackHole<T>(T);
155+
156+
impl<T> BlackHole<T> {
157+
pub fn new(wrapped: T) -> Self {
158+
Self(wrapped)
159+
}
160+
}

0 commit comments

Comments
 (0)