diff --git a/Cargo.lock b/Cargo.lock index 4ac39f5..6a8cae5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14,6 +14,17 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-lock" +version = "3.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -161,6 +172,7 @@ name = "operation-queue" version = "1.0.0" dependencies = [ "async-channel", + "async-lock", "futures", "log", "oneshot", diff --git a/Cargo.toml b/Cargo.toml index e482d98..7afcb95 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ line_token = [] futures = "0.3.28" oneshot = "0.1.11" async-channel = "2.5.0" +async-lock = "3.4.2" log = "0.4.21" thiserror = "1.0.56" diff --git a/src/lib.rs b/src/lib.rs index b8b637c..a26ecef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,10 +48,11 @@ //! //! # Multithreading //! -//! In order to maintain compatibility with the current Thunderbird code-base, -//! neither the operation queue's runner, nor the synchronization helpers in the -//! [`line_token`] module, can be sent between threads. This is something we -//! plan to address in the future. +//! The synchronization helpers in the [`line_token`] module are thread-safe. +//! +//! However, in order to maintain compatibility with the current Thunderbird +//! code-base, the operation queue's runner cannot be sent between threads. +//! This is something we plan to address in the future. //! //! [dyn compatibility]: //! diff --git a/src/line_token.rs b/src/line_token.rs index 3d75fe6..25281db 100644 --- a/src/line_token.rs +++ b/src/line_token.rs @@ -45,8 +45,7 @@ //! [`Rc`]: std::rc::Rc //! [`Arc`]: std::sync::Arc -use std::cell::RefCell; - +use async_lock::Mutex; use futures::{FutureExt, future::Shared}; use oneshot::{Receiver, Sender}; @@ -61,12 +60,23 @@ struct ReleaseChannel { } /// A [`Line`] from which a [`Token`] can be acquired. +/// +/// # Thread safety +/// +/// `Line` is thread-safe and can be shared across threads via [`Arc`]: +/// +/// ```rust +/// use std::sync::Arc; +/// use operation_queue::line_token::Line; +/// +/// fn assert_send_sync() {} +/// assert_send_sync::(); +/// ``` +/// +/// [`Arc`]: std::sync::Arc #[derive(Default)] pub struct Line { - // TODO: We should look into replacing this `RefCell` with a `Mutex` from - // `async_lock` to make `Line` thread-safe. - // https://github.com/thunderbird/operation-queue-rs/issues/2 - channel: RefCell>, + channel: Mutex>, } impl Line { @@ -85,8 +95,10 @@ impl Line { /// If a [`Token`] has already been acquired for this line, a future to /// `await` is returned instead. It resolves when the current token holder /// has finished handling the current error and releases the line. - pub fn try_acquire_token<'l>(&'l self) -> AcquireOutcome<'l> { - if let Some(channel) = self.channel.borrow().as_ref() { + pub async fn try_acquire_token<'l>(&'l self) -> AcquireOutcome<'l> { + let mut channel = self.channel.lock().await; + + if let Some(channel) = channel.as_ref() { // Since the oneshot `Receiver` is wrapped in a `Shared`, cloning it // will return a new handle on the `Shared` which will resolve at // the same time as the others. @@ -96,10 +108,10 @@ impl Line { // The line is currently available, create a new channel and give the // consumer their token. let (sender, receiver) = oneshot::channel(); - self.channel.replace(Some(ReleaseChannel { + *channel = Some(ReleaseChannel { sender, receiver: receiver.shared(), - })); + }); AcquireOutcome::Success(Token { line: self }) } @@ -107,10 +119,10 @@ impl Line { /// Releases the line, and resolves the [`Shared`] future other consumers /// might be awaiting. pub(self) fn release(&self) { - // "Take" the channel out of the `RefCell`; on top of letting us access + // "Take" the channel out of the `Mutex`; on top of letting us access // its content, we're also making sure that even if something bad // happens then the line can be acquired again. - match self.channel.take() { + match self.channel.lock_blocking().take() { Some(channel) => match channel.sender.send(()) { Ok(_) => (), Err(_) => log::error!("trying to release using a closed channel"), @@ -183,20 +195,20 @@ mod tests { use super::*; - fn get_token(line: &Line) -> Token<'_> { - match line.try_acquire_token() { + async fn get_token(line: &Line) -> Token<'_> { + match line.try_acquire_token().await { AcquireOutcome::Success(token) => token, AcquireOutcome::Failure(_) => panic!("expected a token from try_acquire_token()"), } } - #[test] - fn acquire_token() { + #[tokio::test(flavor = "current_thread")] + async fn acquire_token() { let line = Line::new(); - let _token = get_token(&line); + let _token = get_token(&line).await; - match line.try_acquire_token() { + match line.try_acquire_token().await { AcquireOutcome::Success(_) => { panic!("should not be able to acquire the line while the token is in scope") } @@ -204,14 +216,14 @@ mod tests { } } - #[test] - fn token_out_of_scope() { + #[tokio::test(flavor = "current_thread")] + async fn token_out_of_scope() { let line = Line::new(); { - let _token = get_token(&line); + let _token = get_token(&line).await; - match line.try_acquire_token() { + match line.try_acquire_token().await { AcquireOutcome::Success(_) => { panic!("should not be able to acquire the line while the token is in scope") } @@ -219,7 +231,7 @@ mod tests { } } - match line.try_acquire_token() { + match line.try_acquire_token().await { AcquireOutcome::Success(_) => (), AcquireOutcome::Failure(_) => { panic!("expected a token now that the previous token has been dropped") @@ -227,13 +239,13 @@ mod tests { } } - #[test] - fn or_token() { + #[tokio::test(flavor = "current_thread")] + async fn or_token() { let line = Line::new(); - let token = get_token(&line); + let token = get_token(&line).await; - match line.try_acquire_token().or_token(Some(token)) { + match line.try_acquire_token().await.or_token(Some(token)) { AcquireOutcome::Success(_) => (), AcquireOutcome::Failure(_) => panic!("we should have kept our token"), } @@ -251,14 +263,14 @@ mod tests { // The reason we sleep here is to give some time to `wait_for_line` to // try (and fail) to acquire the line's token before we drop it. async fn acquire_sleep_and_drop(line: &Line) { - let _token = get_token(&line); + let _token = get_token(&line).await; tokio::time::sleep(Duration::from_millis(10)).await; } // Try (and fail) to acquire the token, then wait for the line to become // available again. This function sets the success flag. async fn wait_for_line(line: &Line, success: &mut bool) { - let shared = match line.try_acquire_token() { + let shared = match line.try_acquire_token().await { AcquireOutcome::Success(_) => { panic!("should not be able to acquire the line while the token is in scope") }