diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 5c426f1..90ae039 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -31,9 +31,34 @@ jobs: submodules: recursive - name: Install Dependencies (Linux) - run: sudo apt-get update && sudo apt-get install libdbus-1-dev + run: sudo apt-get update && sudo apt-get install libdbus-1-dev libpulse-dev pulseaudio if: matrix.os == 'ubuntu-24.04' + - name: Start Sound Server (Linux) + run: pulseaudio -D --start --exit-idle-time=-1 + if: matrix.os == 'ubuntu-24.04' + + - name: Install virtual audio devices (Windows) + run: git clone https://github.com/LABSN/sound-ci-helpers && powershell sound-ci-helpers/windows/setup_sound.ps1 + if: matrix.os == 'windows-2025' + + - name: Allow microphone access to all apps (Windows) + shell: pwsh + run: | + New-Item -Path "HKLM:\SOFTWARE\Policies\Microsoft\Windows\AppPrivacy\" + New-ItemProperty -Path "HKLM:\SOFTWARE\policies\microsoft\windows\appprivacy" -Name "LetAppsAccessMicrophone" -Value "0x00000001" -PropertyType "dword" + if: matrix.os == 'windows-2025' + + - name: Install virtual audio devices (macOS) + if: matrix.os == 'macos-14' + run: | + brew install switchaudio-osx + brew install blackhole-2ch + sudo kill -9 `pgrep coreaudiod` + sleep 10 + SwitchAudioSource -s "BlackHole 2ch" -t input + SwitchAudioSource -s "BlackHole 2ch" -t output + - name: Install Rust run: | rustup toolchain install ${{ matrix.rust }} --profile minimal diff --git a/audioipc/src/messages.rs b/audioipc/src/messages.rs index b7ee206..fb97e0f 100644 --- a/audioipc/src/messages.rs +++ b/audioipc/src/messages.rs @@ -219,7 +219,7 @@ pub struct RegisterDeviceCollectionChanged { // ServerConn::process_msg doesn't have a catch-all case. #[derive(Debug, Serialize, Deserialize)] pub enum ServerMessage { - ClientConnect(u32), + ClientConnect, ClientDisconnect, ContextGetBackendId, diff --git a/client/src/context.rs b/client/src/context.rs index 102f25b..7025ee2 100644 --- a/client/src/context.rs +++ b/client/src/context.rs @@ -185,7 +185,7 @@ impl ContextOps for ClientContext { // Don't let errors bubble from here. Later calls against this context // will return errors the caller expects to handle. - let _ = send_recv!(rpc, ClientConnect(std::process::id()) => ClientConnected); + let _ = send_recv!(rpc, ClientConnect => ClientConnected); let backend_id = send_recv!(rpc, ContextGetBackendId => ContextBackendId()) .unwrap_or_else(|_| "(remote error)".to_string()); diff --git a/ipctest/Cargo.toml b/ipctest/Cargo.toml index d80337a..1a905c5 100644 --- a/ipctest/Cargo.toml +++ b/ipctest/Cargo.toml @@ -5,6 +5,10 @@ authors = ["Dan Glastonbury "] license = "ISC" edition = "2018" +[lib] +name = "ipctest" +path = "src/lib.rs" + [dependencies] audioipc = { package = "audioipc2", path = "../audioipc" } audioipc-client = { package = "audioipc2-client", path = "../client" } diff --git a/ipctest/src/lib.rs b/ipctest/src/lib.rs new file mode 100644 index 0000000..78dd30b --- /dev/null +++ b/ipctest/src/lib.rs @@ -0,0 +1,128 @@ +// Copyright © 2026 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. + +pub mod client; + +use std::os::raw::c_void; + +use audioipc::PlatformHandleType; + +/// RAII wrapper around the audioipc server lifecycle. +pub struct TestServer { + handle: *mut c_void, +} + +impl Default for TestServer { + fn default() -> Self { + Self::new() + } +} + +impl TestServer { + pub fn new() -> Self { + let init_params = audioipc_server::AudioIpcServerInitParams { + thread_create_callback: None, + thread_destroy_callback: None, + }; + let handle = unsafe { + audioipc_server::audioipc2_server_start( + std::ptr::null(), + std::ptr::null(), + &init_params, + ) + }; + assert!(!handle.is_null(), "audioipc2_server_start failed"); + TestServer { handle } + } + + pub fn new_client(&self, remote_pid: u32) -> PlatformHandleType { + let fd = audioipc_server::audioipc2_server_new_client(self.handle, remote_pid, 0); + assert!( + fd != audioipc::INVALID_HANDLE_VALUE, + "audioipc2_server_new_client failed" + ); + fd + } +} + +impl Drop for TestServer { + fn drop(&mut self) { + audioipc_server::audioipc2_server_stop(self.handle); + } +} + +#[cfg(unix)] +pub fn send_fd(socket: i32, fd: i32) { + unsafe { + let iov = libc::iovec { + iov_base: &[0u8; 1] as *const _ as *mut _, + iov_len: 1, + }; + + let cmsg_space = libc::CMSG_SPACE(std::mem::size_of::() as u32) as usize; + let mut cmsg_buf = vec![0u8; cmsg_space]; + + let mut msg: libc::msghdr = std::mem::zeroed(); + msg.msg_iov = &iov as *const _ as *mut _; + msg.msg_iovlen = 1; + msg.msg_control = cmsg_buf.as_mut_ptr() as *mut _; + msg.msg_controllen = cmsg_space as _; + + let cmsg = libc::CMSG_FIRSTHDR(&msg); + (*cmsg).cmsg_level = libc::SOL_SOCKET; + (*cmsg).cmsg_type = libc::SCM_RIGHTS; + (*cmsg).cmsg_len = libc::CMSG_LEN(std::mem::size_of::() as u32) as _; + std::ptr::copy_nonoverlapping( + &fd as *const _ as *const u8, + libc::CMSG_DATA(cmsg), + std::mem::size_of::(), + ); + + let result = libc::sendmsg(socket, &msg, 0); + assert!( + result >= 0, + "sendmsg failed: {}", + std::io::Error::last_os_error() + ); + } +} + +#[cfg(unix)] +pub fn recv_fd(socket: i32) -> i32 { + unsafe { + let mut buf = [0u8; 1]; + let iov = libc::iovec { + iov_base: buf.as_mut_ptr() as *mut _, + iov_len: 1, + }; + + let cmsg_space = libc::CMSG_SPACE(std::mem::size_of::() as u32) as usize; + let mut cmsg_buf = vec![0u8; cmsg_space]; + + let mut msg: libc::msghdr = std::mem::zeroed(); + msg.msg_iov = &iov as *const _ as *mut _; + msg.msg_iovlen = 1; + msg.msg_control = cmsg_buf.as_mut_ptr() as *mut _; + msg.msg_controllen = cmsg_space as _; + + let result = libc::recvmsg(socket, &mut msg, 0); + assert!( + result >= 0, + "recvmsg failed: {}", + std::io::Error::last_os_error() + ); + + let cmsg = libc::CMSG_FIRSTHDR(&msg); + assert!(!cmsg.is_null()); + + let mut fd: i32 = -1; + std::ptr::copy_nonoverlapping( + libc::CMSG_DATA(cmsg), + &mut fd as *mut _ as *mut u8, + std::mem::size_of::(), + ); + fd + } +} diff --git a/ipctest/src/main.rs b/ipctest/src/main.rs index f3bd73d..bf1def3 100644 --- a/ipctest/src/main.rs +++ b/ipctest/src/main.rs @@ -8,57 +8,58 @@ extern crate log; use std::process::exit; -mod client; - -use audioipc::errors::{Error, Result}; +use audioipc::errors::Result; +use ipctest::TestServer; // Run with 'RUST_LOG=run,audioipc cargo run -p ipctest' #[cfg(unix)] fn run(wait_for_debugger: bool) -> Result<()> { - use std::ffi::CString; - let init_params = audioipc_server::AudioIpcServerInitParams { - thread_create_callback: None, - thread_destroy_callback: None, - }; - let handle = unsafe { - audioipc_server::audioipc2_server_start(std::ptr::null(), std::ptr::null(), &init_params) - }; - let fd = audioipc_server::audioipc2_server_new_client(handle, 0); - let fd = unsafe { - let new_fd = libc::dup(fd); - libc::close(fd); - new_fd - }; - assert!(fd > audioipc::INVALID_HANDLE_VALUE); + use audioipc::errors::Error; - let args: Vec = std::env::args().collect(); + let server = TestServer::new(); + + // Create a socketpair for passing the client fd after fork. + let mut sock_fds = [0i32; 2]; + assert_eq!( + unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, sock_fds.as_mut_ptr()) }, + 0 + ); match unsafe { libc::fork() } { -1 => return Err(Error::Other("fork() failed".into())), 0 => { - let self_path = CString::new(&*args[0]).unwrap(); - let child_arg1 = CString::new("--client").unwrap(); - let child_arg2 = CString::new("--fd").unwrap(); - let child_arg3 = CString::new(format!("{fd}")).unwrap(); - let child_arg4 = if wait_for_debugger { - CString::new("--wait-for-debugger").unwrap() - } else { - CString::new("").unwrap() - }; - let child_args = [ - self_path.as_ptr(), - child_arg1.as_ptr(), - child_arg2.as_ptr(), - child_arg3.as_ptr(), - child_arg4.as_ptr(), - std::ptr::null(), - ]; - let r = unsafe { libc::execv(self_path.as_ptr(), &child_args as *const _) }; - assert_eq!(r, 0); + // Child: receive client fd from parent via SCM_RIGHTS. + unsafe { libc::close(sock_fds[1]) }; + let fd = ipctest::recv_fd(sock_fds[0]); + unsafe { libc::close(sock_fds[0]) }; + + eprintln!("AudioIPC client (pid {})", std::process::id()); + if wait_for_debugger { + eprintln!("Waiting for debugger to attach; hit enter to continue."); + let mut input = String::new(); + let _ = std::io::stdin().read_line(&mut input); + } + + std::process::exit(match ipctest::client::client_test(fd) { + Ok(()) => 0, + Err(e) => { + error!("error: {e}"); + 1 + } + }); } - n => unsafe { + n => { + // Parent: create server connection with child's pid, send fd to child. + unsafe { libc::close(sock_fds[0]) }; + let fd = server.new_client(n as u32); + ipctest::send_fd(sock_fds[1], fd); + unsafe { + libc::close(fd); + libc::close(sock_fds[1]); + } + let mut status: libc::c_int = 0; - libc::waitpid(n, &mut status, 0); + unsafe { libc::waitpid(n, &mut status, 0) }; if libc::WIFSIGNALED(status) { let signum = libc::WTERMSIG(status); if libc::WCOREDUMP(status) { @@ -71,48 +72,60 @@ fn run(wait_for_debugger: bool) -> Result<()> { ))); } } - }, + } }; - audioipc_server::audioipc2_server_stop(handle); - Ok(()) } -#[cfg(unix)] -fn run_client() -> Result<()> { - let args: Vec = std::env::args().collect(); - assert_eq!(args[2], "--fd"); - let target_fd: i32 = args[3].parse().unwrap(); - client::client_test(target_fd) -} - #[allow(clippy::unnecessary_wraps)] #[cfg(windows)] fn run(wait_for_debugger: bool) -> Result<()> { - let init_params = audioipc_server::AudioIpcServerInitParams { - thread_create_callback: None, - thread_destroy_callback: None, - }; - let handle = unsafe { - audioipc_server::audioipc2_server_start(std::ptr::null(), std::ptr::null(), &init_params) + use std::io::Write; + use std::process::{Command, Stdio}; + use windows_sys::Win32::{ + Foundation::{CloseHandle, DuplicateHandle, DUPLICATE_SAME_ACCESS, FALSE, HANDLE}, + System::Threading::{GetCurrentProcess, OpenProcess, PROCESS_DUP_HANDLE}, }; - let fd = audioipc_server::audioipc2_server_new_client(handle, 0); + + let server = TestServer::new(); let args: Vec = std::env::args().collect(); - let mut cmd = std::process::Command::new(&args[0]); - cmd.env("AUDIOIPC_PID", format!("{}", std::process::id())) - .env("AUDIOIPC_HANDLE", format!("{}", fd as usize)) - .arg("--client"); + let mut cmd = Command::new(&args[0]); + cmd.arg("--client").stdin(Stdio::piped()); if wait_for_debugger { cmd.arg("--wait-for-debugger"); } let mut child = cmd.spawn().expect("child process failed"); + let child_pid = child.id(); - child.wait().expect("child process wait failed"); + let fd = server.new_client(child_pid); - audioipc_server::audioipc2_server_stop(handle); + // Duplicate the handle into the child process and send the value via stdin. + let client_handle = unsafe { + let child_process = OpenProcess(PROCESS_DUP_HANDLE, FALSE, child_pid); + assert!(child_process != 0, "OpenProcess failed"); + + let mut target_handle: HANDLE = 0; + let ok = DuplicateHandle( + GetCurrentProcess(), + fd as HANDLE, + child_process, + &mut target_handle, + 0, + FALSE, + DUPLICATE_SAME_ACCESS, + ); + CloseHandle(child_process); + assert!(ok != FALSE, "DuplicateHandle failed"); + target_handle + }; + + writeln!(child.stdin.take().unwrap(), "{}", client_handle as usize) + .expect("failed to send handle to child"); + + child.wait().expect("child process wait failed"); Ok(()) } @@ -120,56 +133,33 @@ fn run(wait_for_debugger: bool) -> Result<()> { #[cfg(windows)] fn run_client() -> Result<()> { use audioipc::PlatformHandleType; - use windows_sys::Win32::{ - Foundation::{ - CloseHandle, DuplicateHandle, DUPLICATE_SAME_ACCESS, FALSE, HANDLE, - INVALID_HANDLE_VALUE, - }, - System::Threading::{GetCurrentProcess, OpenProcess, PROCESS_DUP_HANDLE}, - }; - let pid: u32 = std::env::var("AUDIOIPC_PID").unwrap().parse().unwrap(); - let handle: usize = std::env::var("AUDIOIPC_HANDLE").unwrap().parse().unwrap(); + // Read the pre-duplicated handle value from stdin. + let mut line = String::new(); + std::io::stdin() + .read_line(&mut line) + .expect("failed to read handle from stdin"); + let handle: usize = line.trim().parse().expect("invalid handle value"); - let mut target_handle = INVALID_HANDLE_VALUE; - unsafe { - let source = OpenProcess(PROCESS_DUP_HANDLE, FALSE, pid); - let target = GetCurrentProcess(); - - let ok = DuplicateHandle( - source, - handle as HANDLE, - target, - &mut target_handle, - 0, - FALSE, - DUPLICATE_SAME_ACCESS, - ); - CloseHandle(source); - if ok == FALSE { - return Err(Error::Other("DuplicateHandle failed".into())); - } - } - - client::client_test(target_handle as PlatformHandleType) + ipctest::client::client_test(handle as PlatformHandleType) } fn main() { env_logger::init(); - let mut client = false; + let mut is_client = false; let mut wait_for_debugger = false; for arg in std::env::args() { if arg == "--client" { - client = true; + is_client = true; } if arg == "--wait-for-debugger" { wait_for_debugger = true; } } - let result = if !client { + let result = if !is_client { eprintln!("AudioIPC server (pid {})", std::process::id()); run(wait_for_debugger) } else { @@ -179,7 +169,14 @@ fn main() { let mut input = String::new(); let _ = std::io::stdin().read_line(&mut input); } - run_client() + #[cfg(windows)] + { + run_client() + } + #[cfg(not(windows))] + { + unreachable!("Unix client runs directly in forked child"); + } }; if let Err(ref e) = result { diff --git a/ipctest/tests/e2e.rs b/ipctest/tests/e2e.rs new file mode 100644 index 0000000..1eb03a1 --- /dev/null +++ b/ipctest/tests/e2e.rs @@ -0,0 +1,139 @@ +// Copyright © 2026 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. + +/// Multi-process end-to-end test: start server, spawn self as a child client +/// process, pass the IPC handle via SCM_RIGHTS over a socketpair, and run +/// client_test in the child. +#[cfg(unix)] +#[test] +fn multi_process_client_test() { + use std::os::unix::process::CommandExt; + use std::process::Command; + + // Child path: we've been re-spawned; the socketpair fd is in env var. + if let Ok(fd_str) = std::env::var("AUDIOIPC_TEST_CLIENT_FD") { + let sock_fd: i32 = fd_str.parse().expect("invalid fd"); + let fd = ipctest::recv_fd(sock_fd); + unsafe { libc::close(sock_fd) }; + std::process::exit(match ipctest::client::client_test(fd) { + Ok(()) => 0, + Err(e) => { + eprintln!("client_test failed: {e}"); + 1 + } + }); + } + + let _ = env_logger::try_init(); + + let server = ipctest::TestServer::new(); + + let mut sock_fds = [0i32; 2]; + assert_eq!( + unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, sock_fds.as_mut_ptr()) }, + 0 + ); + let (parent_fd, child_fd) = (sock_fds[0], sock_fds[1]); + + // Spawn self as a child, running just this test with the fd env var set. + let mut cmd = Command::new(std::env::current_exe().unwrap()); + cmd.env("AUDIOIPC_TEST_CLIENT_FD", child_fd.to_string()) + .arg("--exact") + .arg("multi_process_client_test") + .arg("--nocapture"); + // Clear FD_CLOEXEC on child_fd in the post-fork/pre-exec child so the + // socketpair fd survives exec. pre_exec only mutates the child's FD table. + unsafe { + cmd.pre_exec(move || { + let flags = libc::fcntl(child_fd, libc::F_GETFD); + if flags < 0 || libc::fcntl(child_fd, libc::F_SETFD, flags & !libc::FD_CLOEXEC) < 0 { + return Err(std::io::Error::last_os_error()); + } + Ok(()) + }); + } + let mut child = cmd.spawn().expect("child process failed"); + let child_pid = child.id(); + + // Parent doesn't need its copy of the child-side fd anymore. + unsafe { libc::close(child_fd) }; + + let fd = server.new_client(child_pid); + ipctest::send_fd(parent_fd, fd); + unsafe { + libc::close(fd); + libc::close(parent_fd); + } + + let status = child.wait().expect("child process wait failed"); + assert!(status.success(), "child exited with: {}", status); +} + +/// Multi-process end-to-end test for Windows: spawn self as a child process, +/// duplicate the IPC handle into the child, and send the handle value via stdin. +#[cfg(windows)] +#[test] +fn multi_process_client_test() { + use audioipc::PlatformHandleType; + use std::io::Write; + use std::process::{Command, Stdio}; + use windows_sys::Win32::{ + Foundation::{CloseHandle, DuplicateHandle, DUPLICATE_SAME_ACCESS, FALSE, HANDLE}, + System::Threading::{GetCurrentProcess, OpenProcess, PROCESS_DUP_HANDLE}, + }; + + // Child path: if this env var is set, we're the spawned child. + if std::env::var("AUDIOIPC_TEST_CLIENT").is_ok() { + let mut line = String::new(); + std::io::stdin() + .read_line(&mut line) + .expect("failed to read handle from stdin"); + let handle: usize = line.trim().parse().expect("invalid handle value"); + ipctest::client::client_test(handle as PlatformHandleType) + .expect("client_test failed in child"); + return; + } + + let _ = env_logger::try_init(); + + let server = ipctest::TestServer::new(); + + // Spawn self as a child, running just this test with the client env var set. + let mut cmd = Command::new(std::env::current_exe().unwrap()); + cmd.env("AUDIOIPC_TEST_CLIENT", "1") + .arg("--exact") + .arg("multi_process_client_test") + .arg("--nocapture") + .stdin(Stdio::piped()); + let mut child = cmd.spawn().expect("child process failed"); + let child_pid = child.id(); + + let fd = server.new_client(child_pid); + + let client_handle = unsafe { + let child_process = OpenProcess(PROCESS_DUP_HANDLE, FALSE, child_pid); + assert!(child_process != 0, "OpenProcess failed"); + + let mut target_handle: HANDLE = 0; + let ok = DuplicateHandle( + GetCurrentProcess(), + fd as HANDLE, + child_process, + &mut target_handle, + 0, + FALSE, + DUPLICATE_SAME_ACCESS, + ); + CloseHandle(child_process); + assert!(ok != FALSE, "DuplicateHandle failed"); + target_handle + }; + + writeln!(child.stdin.take().unwrap(), "{}", client_handle as usize) + .expect("failed to send handle to child"); + + let status = child.wait().expect("child process wait failed"); + assert!(status.success(), "child exited with: {}", status); +} diff --git a/server/src/lib.rs b/server/src/lib.rs index ef0aeb8..9622c33 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -160,6 +160,7 @@ pub unsafe extern "C" fn audioipc2_server_start( #[no_mangle] pub extern "C" fn audioipc2_server_new_client( p: *mut c_void, + remote_pid: u32, shm_area_size: usize, ) -> PlatformHandleType { let wrapper: &ServerWrapper = unsafe { &*(p as *mut _) }; @@ -182,6 +183,7 @@ pub extern "C" fn audioipc2_server_new_client( let server = server::CubebServer::new( callback_thread.clone(), device_collection_thread.clone(), + remote_pid, shm_area_size, ); if let Err(e) = rpc_thread.bind_server(server, server_pipe) { diff --git a/server/src/server.rs b/server/src/server.rs index b52a95f..a9971d0 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -430,9 +430,6 @@ impl rpccore::Server for CubebServer { type ClientMessage = ClientMessage; fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage { - if let ServerMessage::ClientConnect(pid) = req { - self.remote_pid = Some(pid); - } with_local_context(|context, manager| match *context { Err(_) => error(cubeb::Error::Error), Ok(ref context) => self.process_msg(context, manager, &req), @@ -465,13 +462,14 @@ impl CubebServer { pub fn new( callback_thread: ipccore::EventLoopHandle, device_collection_thread: ipccore::EventLoopHandle, + remote_pid: u32, shm_area_size: usize, ) -> Self { CubebServer { callback_thread, device_collection_thread, streams: slab::Slab::::new(), - remote_pid: None, + remote_pid: Some(remote_pid), device_collection_change_callbacks: None, devidmap: DevIdMap::new(), shm_area_size, @@ -486,8 +484,7 @@ impl CubebServer { msg: &ServerMessage, ) -> ClientMessage { let resp: ClientMessage = match *msg { - ServerMessage::ClientConnect(_) => { - // remote_pid is set before cubeb initialization, just verify here. + ServerMessage::ClientConnect => { assert!(self.remote_pid.is_some()); ClientMessage::ClientConnected }