From 8304025e3cfb7d5d1d26c2007ee84eba0e03e03d Mon Sep 17 00:00:00 2001 From: MooseTheRebel Date: Fri, 17 Apr 2026 16:48:22 -0400 Subject: [PATCH 1/6] prove that Line is not thread safe --- src/line_token.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/line_token.rs b/src/line_token.rs index 3d75fe6..e94559c 100644 --- a/src/line_token.rs +++ b/src/line_token.rs @@ -61,6 +61,24 @@ struct ReleaseChannel { } /// A [`Line`] from which a [`Token`] can be acquired. +/// +/// # Thread safety +/// +/// `Line` is currently **not** thread-safe (see +/// [issue #2](https://github.com/thunderbird/operation-queue-rs/issues/2)). +/// Once fixed, this example should compile and pass: +/// +/// ```rust +/// use std::sync::Arc; +/// use operation_queue::line_token::Line; +/// +/// let line = Arc::new(Line::new()); +/// let clone = Arc::clone(&line); +/// +/// std::thread::spawn(move || { +/// let _ = clone.try_acquire_token(); +/// }).join().unwrap(); +/// ``` #[derive(Default)] pub struct Line { // TODO: We should look into replacing this `RefCell` with a `Mutex` from From d61521431bd9af14108e088e0ea59de6f71e7c86 Mon Sep 17 00:00:00 2001 From: MooseTheRebel Date: Fri, 17 Apr 2026 16:58:30 -0400 Subject: [PATCH 2/6] add 'async-lock' dependency --- Cargo.lock | 12 ++++++++++++ Cargo.toml | 1 + 2 files changed, 13 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 4ac39f5..6a8cae5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14,6 +14,17 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-lock" +version = "3.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -161,6 +172,7 @@ name = "operation-queue" version = "1.0.0" dependencies = [ "async-channel", + "async-lock", "futures", "log", "oneshot", diff --git a/Cargo.toml b/Cargo.toml index e482d98..891ec54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ line_token = [] futures = "0.3.28" oneshot = "0.1.11" async-channel = "2.5.0" +async-lock = "3" log = "0.4.21" thiserror = "1.0.56" From c9a61d21f7f3c50e1c6feedaa5d1656effff0d66 Mon Sep 17 00:00:00 2001 From: MooseTheRebel Date: Fri, 17 Apr 2026 16:59:27 -0400 Subject: [PATCH 3/6] attempt to make 'Line' thread safe --- src/line_token.rs | 58 +++++++++++++++++++++++------------------------ 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/src/line_token.rs b/src/line_token.rs index e94559c..7f03ee9 100644 --- a/src/line_token.rs +++ b/src/line_token.rs @@ -45,8 +45,7 @@ //! [`Rc`]: std::rc::Rc //! [`Arc`]: std::sync::Arc -use std::cell::RefCell; - +use async_lock::Mutex; use futures::{FutureExt, future::Shared}; use oneshot::{Receiver, Sender}; @@ -79,12 +78,11 @@ struct ReleaseChannel { /// let _ = clone.try_acquire_token(); /// }).join().unwrap(); /// ``` +/// +/// [`Arc`]: std::sync::Arc #[derive(Default)] pub struct Line { - // TODO: We should look into replacing this `RefCell` with a `Mutex` from - // `async_lock` to make `Line` thread-safe. - // https://github.com/thunderbird/operation-queue-rs/issues/2 - channel: RefCell>, + channel: Mutex>, } impl Line { @@ -103,8 +101,10 @@ impl Line { /// If a [`Token`] has already been acquired for this line, a future to /// `await` is returned instead. It resolves when the current token holder /// has finished handling the current error and releases the line. - pub fn try_acquire_token<'l>(&'l self) -> AcquireOutcome<'l> { - if let Some(channel) = self.channel.borrow().as_ref() { + pub async fn try_acquire_token<'l>(&'l self) -> AcquireOutcome<'l> { + let mut channel = self.channel.lock().await; + + if let Some(channel) = channel.as_ref() { // Since the oneshot `Receiver` is wrapped in a `Shared`, cloning it // will return a new handle on the `Shared` which will resolve at // the same time as the others. @@ -114,10 +114,10 @@ impl Line { // The line is currently available, create a new channel and give the // consumer their token. let (sender, receiver) = oneshot::channel(); - self.channel.replace(Some(ReleaseChannel { + *channel = Some(ReleaseChannel { sender, receiver: receiver.shared(), - })); + }); AcquireOutcome::Success(Token { line: self }) } @@ -125,10 +125,10 @@ impl Line { /// Releases the line, and resolves the [`Shared`] future other consumers /// might be awaiting. pub(self) fn release(&self) { - // "Take" the channel out of the `RefCell`; on top of letting us access + // "Take" the channel out of the `Mutex`; on top of letting us access // its content, we're also making sure that even if something bad // happens then the line can be acquired again. - match self.channel.take() { + match self.channel.lock_blocking().take() { Some(channel) => match channel.sender.send(()) { Ok(_) => (), Err(_) => log::error!("trying to release using a closed channel"), @@ -201,20 +201,20 @@ mod tests { use super::*; - fn get_token(line: &Line) -> Token<'_> { - match line.try_acquire_token() { + async fn get_token(line: &Line) -> Token<'_> { + match line.try_acquire_token().await { AcquireOutcome::Success(token) => token, AcquireOutcome::Failure(_) => panic!("expected a token from try_acquire_token()"), } } - #[test] - fn acquire_token() { + #[tokio::test(flavor = "current_thread")] + async fn acquire_token() { let line = Line::new(); - let _token = get_token(&line); + let _token = get_token(&line).await; - match line.try_acquire_token() { + match line.try_acquire_token().await { AcquireOutcome::Success(_) => { panic!("should not be able to acquire the line while the token is in scope") } @@ -222,14 +222,14 @@ mod tests { } } - #[test] - fn token_out_of_scope() { + #[tokio::test(flavor = "current_thread")] + async fn token_out_of_scope() { let line = Line::new(); { - let _token = get_token(&line); + let _token = get_token(&line).await; - match line.try_acquire_token() { + match line.try_acquire_token().await { AcquireOutcome::Success(_) => { panic!("should not be able to acquire the line while the token is in scope") } @@ -237,7 +237,7 @@ mod tests { } } - match line.try_acquire_token() { + match line.try_acquire_token().await { AcquireOutcome::Success(_) => (), AcquireOutcome::Failure(_) => { panic!("expected a token now that the previous token has been dropped") @@ -245,13 +245,13 @@ mod tests { } } - #[test] - fn or_token() { + #[tokio::test(flavor = "current_thread")] + async fn or_token() { let line = Line::new(); - let token = get_token(&line); + let token = get_token(&line).await; - match line.try_acquire_token().or_token(Some(token)) { + match line.try_acquire_token().await.or_token(Some(token)) { AcquireOutcome::Success(_) => (), AcquireOutcome::Failure(_) => panic!("we should have kept our token"), } @@ -269,14 +269,14 @@ mod tests { // The reason we sleep here is to give some time to `wait_for_line` to // try (and fail) to acquire the line's token before we drop it. async fn acquire_sleep_and_drop(line: &Line) { - let _token = get_token(&line); + let _token = get_token(&line).await; tokio::time::sleep(Duration::from_millis(10)).await; } // Try (and fail) to acquire the token, then wait for the line to become // available again. This function sets the success flag. async fn wait_for_line(line: &Line, success: &mut bool) { - let shared = match line.try_acquire_token() { + let shared = match line.try_acquire_token().await { AcquireOutcome::Success(_) => { panic!("should not be able to acquire the line while the token is in scope") } From 5f553526d74bb5258197cbd9c81f21c72949c9be Mon Sep 17 00:00:00 2001 From: MooseTheRebel Date: Fri, 17 Apr 2026 17:00:20 -0400 Subject: [PATCH 4/6] Update doctest to reflect thread-safety --- src/line_token.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/line_token.rs b/src/line_token.rs index 7f03ee9..25281db 100644 --- a/src/line_token.rs +++ b/src/line_token.rs @@ -63,20 +63,14 @@ struct ReleaseChannel { /// /// # Thread safety /// -/// `Line` is currently **not** thread-safe (see -/// [issue #2](https://github.com/thunderbird/operation-queue-rs/issues/2)). -/// Once fixed, this example should compile and pass: +/// `Line` is thread-safe and can be shared across threads via [`Arc`]: /// /// ```rust /// use std::sync::Arc; /// use operation_queue::line_token::Line; /// -/// let line = Arc::new(Line::new()); -/// let clone = Arc::clone(&line); -/// -/// std::thread::spawn(move || { -/// let _ = clone.try_acquire_token(); -/// }).join().unwrap(); +/// fn assert_send_sync() {} +/// assert_send_sync::(); /// ``` /// /// [`Arc`]: std::sync::Arc From c97242536751ba092deefa9ac616d3c30f69cab0 Mon Sep 17 00:00:00 2001 From: MooseTheRebel Date: Fri, 17 Apr 2026 17:19:15 -0400 Subject: [PATCH 5/6] correct additional comment --- src/lib.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b8b637c..a26ecef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,10 +48,11 @@ //! //! # Multithreading //! -//! In order to maintain compatibility with the current Thunderbird code-base, -//! neither the operation queue's runner, nor the synchronization helpers in the -//! [`line_token`] module, can be sent between threads. This is something we -//! plan to address in the future. +//! The synchronization helpers in the [`line_token`] module are thread-safe. +//! +//! However, in order to maintain compatibility with the current Thunderbird +//! code-base, the operation queue's runner cannot be sent between threads. +//! This is something we plan to address in the future. //! //! [dyn compatibility]: //! From b90278dc54750c8721fc282a3c9fa35b38073559 Mon Sep 17 00:00:00 2001 From: MooseTheRebel Date: Wed, 22 Apr 2026 19:48:29 -0400 Subject: [PATCH 6/6] pin async-lock to version 3.4.2 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 891ec54..7afcb95 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ line_token = [] futures = "0.3.28" oneshot = "0.1.11" async-channel = "2.5.0" -async-lock = "3" +async-lock = "3.4.2" log = "0.4.21" thiserror = "1.0.56"