Skip to content

Commit 558b5b5

Browse files
author
Michael Ingley
committed
pd: fix tso sender lost-wake race
1 parent bfdaa88 commit 558b5b5

1 file changed

Lines changed: 67 additions & 61 deletions

File tree

src/pd/timestamp.rs

Lines changed: 67 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use std::sync::atomic::AtomicBool;
1717
use std::sync::atomic::Ordering;
1818
use std::sync::Arc;
1919

20-
use futures::pin_mut;
2120
use futures::prelude::*;
2221
use futures::task::AtomicWaker;
2322
use futures::task::Context;
@@ -158,25 +157,22 @@ impl Stream for TsoRequestStream {
158157
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
159158
let mut this = self.project();
160159

161-
let pending_requests = this.pending_requests.lock();
162-
pin_mut!(pending_requests);
163-
let mut pending_requests = if let Poll::Ready(pending_requests) = pending_requests.poll(cx)
164-
{
160+
let mut pending_requests = if let Ok(pending_requests) = this.pending_requests.try_lock() {
165161
this.sender_waiting_on_lock.store(false, Ordering::SeqCst);
166162
pending_requests
167163
} else {
168-
// Lock is held by the response path. Register waker first so any
169-
// subsequent wake() targets the correct waker, then advertise that
170-
// we are waiting.
171-
this.self_waker.register(cx.waker());
172-
this.sender_waiting_on_lock.store(true, Ordering::SeqCst);
173-
// If the response path cleared the flag between our register and
174-
// store, its wake may have targeted a stale waker. Self-wake to
175-
// guarantee we get re-polled.
176-
if !this.sender_waiting_on_lock.load(Ordering::SeqCst) {
177-
cx.waker().wake_by_ref();
164+
let pending_requests = register_sender_wait_for_pending_lock(
165+
cx,
166+
this.self_waker.as_ref(),
167+
this.sender_waiting_on_lock.as_ref(),
168+
|| this.pending_requests.try_lock().ok(),
169+
);
170+
171+
if let Some(pending_requests) = pending_requests {
172+
pending_requests
173+
} else {
174+
return Poll::Pending;
178175
}
179-
return Poll::Pending;
180176
};
181177
let mut requests = Vec::new();
182178

@@ -219,6 +215,29 @@ impl Stream for TsoRequestStream {
219215
}
220216
}
221217

218+
fn register_sender_wait_for_pending_lock<F, G>(
219+
cx: &mut Context<'_>,
220+
self_waker: &AtomicWaker,
221+
sender_waiting_on_lock: &AtomicBool,
222+
mut try_lock_after_register: F,
223+
) -> Option<G>
224+
where
225+
F: FnMut() -> Option<G>,
226+
{
227+
// Register first so a wake from the response path targets the current task.
228+
self_waker.register(cx.waker());
229+
sender_waiting_on_lock.store(true, Ordering::SeqCst);
230+
231+
// Retry once after advertising waiting to close the race where the response path
232+
// already checked/cleared the flag before this store and therefore will not wake.
233+
if let Some(guard) = try_lock_after_register() {
234+
sender_waiting_on_lock.store(false, Ordering::SeqCst);
235+
Some(guard)
236+
} else {
237+
None
238+
}
239+
}
240+
222241
fn allocate_timestamps(
223242
resp: &TsoResponse,
224243
pending_requests: &mut VecDeque<RequestGroup>,
@@ -445,7 +464,7 @@ mod tests {
445464
}
446465

447466
#[test]
448-
fn poll_next_marks_waiting_flag_when_lock_is_contended() {
467+
fn poll_next_marks_waiting_flag_when_lock_is_contended_and_response_wakes() {
449468
let (stream, _request_tx, pending_requests, self_waker, sender_waiting_on_lock) =
450469
new_test_stream();
451470
let lock_guard = block_on(pending_requests.lock());
@@ -469,68 +488,55 @@ mod tests {
469488
assert!(wake_counter.wakes.load(Ordering::SeqCst) >= 1);
470489
}
471490

472-
/// Simulate the race where the response path clears the flag *before*
473-
/// poll_next sets it. The self-wake guard must fire.
474491
#[test]
475-
fn poll_next_self_wakes_when_flag_cleared_before_store() {
476-
let (stream, _request_tx, pending_requests, _self_waker, sender_waiting_on_lock) =
477-
new_test_stream();
478-
// Hold lock so poll returns Pending.
479-
let lock_guard = block_on(pending_requests.lock());
480-
492+
fn register_sender_wait_sets_waiting_flag_and_registers_waker_on_retry_failure() {
493+
let self_waker = AtomicWaker::new();
494+
let sender_waiting_on_lock = AtomicBool::new(false);
481495
let wake_counter = Arc::new(WakeCounter {
482496
wakes: AtomicUsize::new(0),
483497
});
484498
let test_waker = waker(wake_counter.clone());
485499
let mut cx = Context::from_waker(&test_waker);
486-
let mut stream = Box::pin(stream);
487-
488-
// Pre-clear the flag (simulates response path racing ahead).
489-
sender_waiting_on_lock.store(false, Ordering::SeqCst);
490-
491-
let polled = stream.as_mut().poll_next(&mut cx);
492-
assert!(matches!(polled, Poll::Pending));
493500

494-
// The flag should have been set to true by poll_next.
495-
// Because the flag was not externally cleared *after* the store,
496-
// no self-wake is needed — the flag stays true for the response path
497-
// to observe normally.
501+
let reacquired = register_sender_wait_for_pending_lock(
502+
&mut cx,
503+
&self_waker,
504+
&sender_waiting_on_lock,
505+
|| None::<()>,
506+
);
507+
assert!(reacquired.is_none());
498508
assert!(sender_waiting_on_lock.load(Ordering::SeqCst));
499509

500-
drop(lock_guard);
510+
self_waker.wake();
511+
assert_eq!(wake_counter.wakes.load(Ordering::SeqCst), 1);
501512
}
502513

503-
/// Verify that after the response path clears the flag *and* we simulate
504-
/// that clearing happening between register and store, the sender detects
505-
/// it and self-wakes.
506514
#[test]
507-
fn poll_next_detects_flag_cleared_after_store_and_self_wakes() {
508-
let (stream, _request_tx, pending_requests, _self_waker, sender_waiting_on_lock) =
509-
new_test_stream();
510-
let lock_guard = block_on(pending_requests.lock());
511-
515+
fn register_sender_wait_retries_once_and_clears_waiting_flag_when_lock_reacquires() {
516+
let self_waker = AtomicWaker::new();
517+
let sender_waiting_on_lock = AtomicBool::new(false);
518+
let mut retry_count = 0;
512519
let wake_counter = Arc::new(WakeCounter {
513520
wakes: AtomicUsize::new(0),
514521
});
515522
let test_waker = waker(wake_counter.clone());
516523
let mut cx = Context::from_waker(&test_waker);
517-
let mut stream = Box::pin(stream);
518524

519-
// poll_next will: register waker, store(true), then load to re-check.
520-
// We can't interleave mid-poll, but we can verify the steady-state:
521-
// after poll returns Pending, simulate response clearing the flag
522-
// and confirm wake propagation via self_waker.
523-
let polled = stream.as_mut().poll_next(&mut cx);
524-
assert!(matches!(polled, Poll::Pending));
525-
assert!(sender_waiting_on_lock.load(Ordering::SeqCst));
526-
527-
// Simulate response path: clear the flag (as if between store and load).
528-
sender_waiting_on_lock.store(false, Ordering::SeqCst);
529-
// The registered waker is current, so waking self_waker delivers correctly.
530-
_self_waker.wake();
531-
assert_eq!(wake_counter.wakes.load(Ordering::SeqCst), 1);
532-
533-
drop(lock_guard);
525+
// Simulates the lost-wake interleaving: initial lock contention, then lock
526+
// becomes available before any response-side wake.
527+
let reacquired = register_sender_wait_for_pending_lock(
528+
&mut cx,
529+
&self_waker,
530+
&sender_waiting_on_lock,
531+
|| {
532+
retry_count += 1;
533+
Some(())
534+
},
535+
);
536+
assert_eq!(retry_count, 1);
537+
assert!(reacquired.is_some());
538+
assert!(!sender_waiting_on_lock.load(Ordering::SeqCst));
539+
assert_eq!(wake_counter.wakes.load(Ordering::SeqCst), 0);
534540
}
535541

536542
/// After acquiring the lock, the waiting flag must be cleared.

0 commit comments

Comments
 (0)