Skip to content

Commit 7393a00

Browse files
committed
Use RWLock instead of single Mutext in MemoryAwaitedActionDb.
1 parent 8cfeade commit 7393a00

2 files changed

Lines changed: 24 additions & 12 deletions

File tree

nativelink-metric/src/lib.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,18 @@ impl<T: MetricsComponent> MetricsComponent for async_lock::Mutex<T> {
458458
}
459459
}
460460

461+
impl<T: MetricsComponent> MetricsComponent for async_lock::RwLock<T> {
462+
fn publish(
463+
&self,
464+
kind: MetricKind,
465+
field_metadata: MetricFieldData,
466+
) -> Result<MetricPublishKnownKindData, Error> {
467+
// It is safe to block in the publishing thread.
468+
let lock = self.read_blocking();
469+
lock.publish(kind, field_metadata)
470+
}
471+
}
472+
461473
impl<T: MetricsComponent> MetricsComponent for parking_lot::Mutex<T> {
462474
fn publish(
463475
&self,

nativelink-scheduler/src/memory_awaited_action_db.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap};
1919
use std::iter::Map;
2020
use std::sync::Arc;
2121

22-
use async_lock::Mutex;
22+
use async_lock::{Mutex, RwLock};
2323
use futures::{FutureExt, Stream};
2424
use nativelink_config::stores::EvictionPolicy;
2525
use nativelink_error::{Code, Error, ResultExt, error_if, make_err};
@@ -908,7 +908,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
908908
#[derive(Debug, MetricsComponent)]
909909
pub struct MemoryAwaitedActionDb<I: InstantWrapper, NowFn: Fn() -> I> {
910910
#[metric]
911-
inner: Arc<Mutex<AwaitedActionDbImpl<I, NowFn>>>,
911+
inner: Arc<RwLock<AwaitedActionDbImpl<I, NowFn>>>,
912912
tasks_change_notify: Arc<Notify>,
913913
_handle_awaited_action_events: JoinHandleDropGuard<()>,
914914
}
@@ -922,7 +922,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static>
922922
now_fn: NowFn,
923923
) -> Self {
924924
let (action_event_tx, mut action_event_rx) = mpsc::unbounded_channel();
925-
let inner = Arc::new(Mutex::new(AwaitedActionDbImpl {
925+
let inner = Arc::new(RwLock::new(AwaitedActionDbImpl {
926926
client_operation_to_awaited_action: EvictingMap::new(eviction_config, (now_fn)()),
927927
operation_id_to_awaited_action: BTreeMap::new(),
928928
action_info_hash_key_to_awaited_action: HashMap::new(),
@@ -945,7 +945,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static>
945945
let Some(inner) = weak_inner.upgrade() else {
946946
return; // Nothing to cleanup, our struct is dropped.
947947
};
948-
let mut inner = inner.lock().await;
948+
let mut inner = inner.write().await;
949949
inner
950950
.handle_action_events(dropped_operation_ids.drain(..))
951951
.await;
@@ -965,7 +965,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> Awaite
965965
client_operation_id: &OperationId,
966966
) -> Result<Option<Self::Subscriber>, Error> {
967967
self.inner
968-
.lock()
968+
.read()
969969
.await
970970
.get_awaited_action_by_id(client_operation_id)
971971
.await
@@ -978,7 +978,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> Awaite
978978
Bound::Unbounded,
979979
Bound::Unbounded,
980980
move |start, end, mut output| async move {
981-
let inner = self.inner.lock().await;
981+
let inner = self.inner.read().await;
982982
let mut maybe_new_start = None;
983983

984984
for (operation_id, item) in
@@ -998,11 +998,11 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> Awaite
998998
&self,
999999
operation_id: &OperationId,
10001000
) -> Result<Option<Self::Subscriber>, Error> {
1001-
Ok(self.inner.lock().await.get_by_operation_id(operation_id))
1001+
Ok(self.inner.read().await.get_by_operation_id(operation_id))
10021002
}
10031003

10041004
async fn get_queued_actions(&self) -> Result<Vec<Arc<AwaitedAction>>, Error> {
1005-
let inner = self.inner.lock().await;
1005+
let inner = self.inner.read().await;
10061006

10071007
Ok(inner
10081008
.sorted_action_info_hash_keys
@@ -1028,7 +1028,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> Awaite
10281028
start,
10291029
end,
10301030
move |start, end, mut output| async move {
1031-
let inner = self.inner.lock().await;
1031+
let inner = self.inner.read().await;
10321032
let mut done = true;
10331033
let mut new_start = start.as_ref();
10341034
let mut new_end = end.as_ref();
@@ -1071,14 +1071,14 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> Awaite
10711071
let mut results: HashMap<CountableActionStage, usize> =
10721072
HashMap::with_capacity(stages.len());
10731073
for stage in stages {
1074-
results.insert(stage, self.inner.lock().await.count_actions(stage));
1074+
results.insert(stage, self.inner.write().await.count_actions(stage));
10751075
}
10761076
Ok(results)
10771077
}
10781078

10791079
async fn update_awaited_action(&self, new_awaited_action: AwaitedAction) -> Result<(), Error> {
10801080
self.inner
1081-
.lock()
1081+
.write()
10821082
.await
10831083
.update_awaited_action(new_awaited_action)?;
10841084
self.tasks_change_notify.notify_one();
@@ -1093,7 +1093,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> Awaite
10931093
) -> Result<Self::Subscriber, Error> {
10941094
let subscriber = self
10951095
.inner
1096-
.lock()
1096+
.write()
10971097
.await
10981098
.add_action(client_operation_id, action_info)
10991099
.await?;

0 commit comments

Comments
 (0)