Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ line_token = []
futures = "0.3.28"
oneshot = "0.1.11"
async-channel = "2.5.0"
async-lock = "3.4.2"
log = "0.4.21"
thiserror = "1.0.56"

Expand Down
9 changes: 5 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@
//!
//! # Multithreading
//!
//! In order to maintain compatibility with the current Thunderbird code-base,
//! neither the operation queue's runner, nor the synchronization helpers in the
//! [`line_token`] module, can be sent between threads. This is something we
//! plan to address in the future.
//! The synchronization helpers in the [`line_token`] module are thread-safe.
//!
//! However, in order to maintain compatibility with the current Thunderbird
//! code-base, the operation queue's runner cannot be sent between threads.
//! This is something we plan to address in the future.
//!
//! [dyn compatibility]:
//! <https://doc.rust-lang.org/reference/items/traits.html#dyn-compatibility>
Expand Down
70 changes: 41 additions & 29 deletions src/line_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@
//! [`Rc`]: std::rc::Rc
//! [`Arc`]: std::sync::Arc

use std::cell::RefCell;

use async_lock::Mutex;
use futures::{FutureExt, future::Shared};
use oneshot::{Receiver, Sender};

Expand All @@ -61,12 +60,23 @@ struct ReleaseChannel {
}

/// A [`Line`] from which a [`Token`] can be acquired.
///
/// # Thread safety
///
/// `Line` is thread-safe and can be shared across threads via [`Arc`]:
///
/// ```rust
/// use std::sync::Arc;
/// use operation_queue::line_token::Line;
///
/// fn assert_send_sync<T: Send + Sync>() {}
/// assert_send_sync::<Line>();
/// ```
///
/// [`Arc`]: std::sync::Arc
#[derive(Default)]
pub struct Line {
// TODO: We should look into replacing this `RefCell` with a `Mutex` from
// `async_lock` to make `Line` thread-safe.
// https://github.com/thunderbird/operation-queue-rs/issues/2
channel: RefCell<Option<ReleaseChannel>>,
channel: Mutex<Option<ReleaseChannel>>,
}

impl Line {
Expand All @@ -85,8 +95,10 @@ impl Line {
/// If a [`Token`] has already been acquired for this line, a future to
/// `await` is returned instead. It resolves when the current token holder
/// has finished handling the current error and releases the line.
pub fn try_acquire_token<'l>(&'l self) -> AcquireOutcome<'l> {
if let Some(channel) = self.channel.borrow().as_ref() {
pub async fn try_acquire_token<'l>(&'l self) -> AcquireOutcome<'l> {
let mut channel = self.channel.lock().await;

if let Some(channel) = channel.as_ref() {
// Since the oneshot `Receiver` is wrapped in a `Shared`, cloning it
// will return a new handle on the `Shared` which will resolve at
// the same time as the others.
Expand All @@ -96,21 +108,21 @@ impl Line {
// The line is currently available, create a new channel and give the
// consumer their token.
let (sender, receiver) = oneshot::channel();
self.channel.replace(Some(ReleaseChannel {
*channel = Some(ReleaseChannel {
sender,
receiver: receiver.shared(),
}));
});

AcquireOutcome::Success(Token { line: self })
}

/// Releases the line, and resolves the [`Shared`] future other consumers
/// might be awaiting.
pub(self) fn release(&self) {
// "Take" the channel out of the `RefCell`; on top of letting us access
// "Take" the channel out of the `Mutex`; on top of letting us access
// its content, we're also making sure that even if something bad
// happens then the line can be acquired again.
match self.channel.take() {
match self.channel.lock_blocking().take() {
Some(channel) => match channel.sender.send(()) {
Ok(_) => (),
Err(_) => log::error!("trying to release using a closed channel"),
Expand Down Expand Up @@ -183,57 +195,57 @@ mod tests {

use super::*;

fn get_token(line: &Line) -> Token<'_> {
match line.try_acquire_token() {
async fn get_token(line: &Line) -> Token<'_> {
match line.try_acquire_token().await {
AcquireOutcome::Success(token) => token,
AcquireOutcome::Failure(_) => panic!("expected a token from try_acquire_token()"),
}
}

#[test]
fn acquire_token() {
#[tokio::test(flavor = "current_thread")]
async fn acquire_token() {
let line = Line::new();

let _token = get_token(&line);
let _token = get_token(&line).await;

match line.try_acquire_token() {
match line.try_acquire_token().await {
AcquireOutcome::Success(_) => {
panic!("should not be able to acquire the line while the token is in scope")
}
AcquireOutcome::Failure(_) => (),
}
}

#[test]
fn token_out_of_scope() {
#[tokio::test(flavor = "current_thread")]
async fn token_out_of_scope() {
let line = Line::new();

{
let _token = get_token(&line);
let _token = get_token(&line).await;

match line.try_acquire_token() {
match line.try_acquire_token().await {
AcquireOutcome::Success(_) => {
panic!("should not be able to acquire the line while the token is in scope")
}
AcquireOutcome::Failure(_) => (),
}
}

match line.try_acquire_token() {
match line.try_acquire_token().await {
AcquireOutcome::Success(_) => (),
AcquireOutcome::Failure(_) => {
panic!("expected a token now that the previous token has been dropped")
}
}
}

#[test]
fn or_token() {
#[tokio::test(flavor = "current_thread")]
async fn or_token() {
let line = Line::new();

let token = get_token(&line);
let token = get_token(&line).await;

match line.try_acquire_token().or_token(Some(token)) {
match line.try_acquire_token().await.or_token(Some(token)) {
AcquireOutcome::Success(_) => (),
AcquireOutcome::Failure(_) => panic!("we should have kept our token"),
}
Expand All @@ -251,14 +263,14 @@ mod tests {
// The reason we sleep here is to give some time to `wait_for_line` to
// try (and fail) to acquire the line's token before we drop it.
async fn acquire_sleep_and_drop(line: &Line) {
let _token = get_token(&line);
let _token = get_token(&line).await;
tokio::time::sleep(Duration::from_millis(10)).await;
}

// Try (and fail) to acquire the token, then wait for the line to become
// available again. This function sets the success flag.
async fn wait_for_line(line: &Line, success: &mut bool) {
let shared = match line.try_acquire_token() {
let shared = match line.try_acquire_token().await {
AcquireOutcome::Success(_) => {
panic!("should not be able to acquire the line while the token is in scope")
}
Expand Down
Loading