Skip to content

Commit 571d044

Browse files
kixelatedclaude
andauthored
Change broadcast replacement strategy to queue backups instead of reannouncing (#1319)
Co-authored-by: Claude <noreply@anthropic.com>
1 parent d351360 commit 571d044

1 file changed

Lines changed: 76 additions & 24 deletions

File tree

rs/moq-lite/src/model/origin.rs

Lines changed: 76 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::{
2-
collections::HashMap,
2+
collections::{HashMap, VecDeque},
33
sync::atomic::{AtomicU64, Ordering},
44
};
55
use tokio::sync::mpsc;
@@ -19,11 +19,11 @@ impl ConsumerId {
1919
}
2020
}
2121

22-
// If there are multiple broadcasts with the same path, we use the most recent one but keep the others around.
22+
// If there are multiple broadcasts with the same path, we keep the oldest active and queue the others.
2323
struct OriginBroadcast {
2424
path: PathOwned,
2525
active: BroadcastConsumer,
26-
backup: Vec<BroadcastConsumer>,
26+
backup: VecDeque<BroadcastConsumer>,
2727
}
2828

2929
#[derive(Clone)]
@@ -145,17 +145,15 @@ impl OriginNode {
145145
self.entry(dir).lock().publish(&full, broadcast, &relative);
146146
} else if let Some(existing) = &mut self.broadcast {
147147
// This node is a leaf with an existing broadcast.
148-
let old = existing.active.clone();
149-
existing.active = broadcast.clone();
150-
existing.backup.push(old);
151-
152-
self.notify.lock().reannounce(full, broadcast);
148+
// Keep the older broadcast active; queue the new one as a backup.
149+
// This avoids reannouncing and potentially disrupting subscribers.
150+
existing.backup.push_back(broadcast.clone());
153151
} else {
154152
// This node is a leaf with no existing broadcast.
155153
self.broadcast = Some(OriginBroadcast {
156154
path: full.to_owned(),
157155
active: broadcast.clone(),
158-
backup: Vec::new(),
156+
backup: VecDeque::new(),
159157
});
160158
self.notify.lock().announce(full, broadcast);
161159
}
@@ -227,9 +225,9 @@ impl OriginNode {
227225
// Okay so it must be the active broadcast or else we fucked up.
228226
assert!(entry.active.is_clone(&broadcast));
229227

230-
// If there's a backup broadcast, then announce it.
231-
if let Some(active) = entry.backup.pop() {
232-
entry.active = active;
228+
// If there's a backup broadcast, promote the oldest one.
229+
if let Some(next) = entry.backup.pop_front() {
230+
entry.active = next;
233231
self.notify.lock().reannounce(full, &entry.active);
234232
} else {
235233
// No more backups, so remove the entry.
@@ -367,9 +365,10 @@ impl OriginProducer {
367365
/// Publish a broadcast, announcing it to all consumers.
368366
///
369367
/// The broadcast will be unannounced when it is closed.
370-
/// If there is already a broadcast with the same path, then it will be replaced and reannounced.
371-
/// If the old broadcast is closed before the new one, then nothing will happen.
372-
/// If the new broadcast is closed before the old one, then the old broadcast will be reannounced.
368+
/// If there is already a broadcast with the same path, then the older broadcast remains active
369+
/// and the new one is queued as a backup (no reannounce is triggered).
370+
/// When the active broadcast closes, the oldest queued backup is promoted and reannounced.
371+
/// A queued backup that closes before it is promoted is silently dropped with no announcement.
373372
///
374373
/// Returns false if the broadcast is not allowed to be published.
375374
pub fn publish_broadcast(&self, path: impl AsPath, broadcast: BroadcastConsumer) -> bool {
@@ -624,6 +623,8 @@ mod tests {
624623

625624
#[tokio::test]
626625
async fn test_announce() {
626+
tokio::time::pause();
627+
627628
let origin = Origin::produce();
628629
let broadcast1 = Broadcast::produce();
629630
let broadcast2 = Broadcast::produce();
@@ -688,6 +689,8 @@ mod tests {
688689

689690
#[tokio::test]
690691
async fn test_duplicate() {
692+
tokio::time::pause();
693+
691694
let origin = Origin::produce();
692695

693696
let broadcast1 = Broadcast::produce();
@@ -705,13 +708,11 @@ mod tests {
705708
origin.publish_broadcast("test", consumer3.clone());
706709
assert!(consumer.consume_broadcast("test").is_some());
707710

711+
// Only the oldest broadcast is announced; later publishes go to the backup queue.
708712
consumer.assert_next("test", &consumer1);
709-
consumer.assert_next_none("test");
710-
consumer.assert_next("test", &consumer2);
711-
consumer.assert_next_none("test");
712-
consumer.assert_next("test", &consumer3);
713+
consumer.assert_next_wait();
713714

714-
// Drop the backup, nothing should change.
715+
// Drop a backup, nothing should change.
715716
drop(broadcast2);
716717

717718
// Wait for the async task to run.
@@ -720,18 +721,18 @@ mod tests {
720721
assert!(consumer.consume_broadcast("test").is_some());
721722
consumer.assert_next_wait();
722723

723-
// Drop the active, we should reannounce.
724-
drop(broadcast3);
724+
// Drop the active, we should reannounce with the oldest remaining backup.
725+
drop(broadcast1);
725726

726727
// Wait for the async task to run.
727728
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
728729

729730
assert!(consumer.consume_broadcast("test").is_some());
730731
consumer.assert_next_none("test");
731-
consumer.assert_next("test", &consumer1);
732+
consumer.assert_next("test", &consumer3);
732733

733734
// Drop the final broadcast, we should unannounce.
734-
drop(broadcast1);
735+
drop(broadcast3);
735736

736737
// Wait for the async task to run.
737738
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
@@ -741,8 +742,55 @@ mod tests {
741742
consumer.assert_next_wait();
742743
}
743744

745+
#[tokio::test]
746+
async fn test_duplicate_fifo_order() {
747+
tokio::time::pause();
748+
749+
let origin = Origin::produce();
750+
751+
let broadcast1 = Broadcast::produce();
752+
let broadcast2 = Broadcast::produce();
753+
let broadcast3 = Broadcast::produce();
754+
755+
let consumer1 = broadcast1.consume();
756+
let consumer2 = broadcast2.consume();
757+
let consumer3 = broadcast3.consume();
758+
759+
let mut consumer = origin.consume();
760+
761+
origin.publish_broadcast("test", consumer1.clone());
762+
origin.publish_broadcast("test", consumer2.clone());
763+
origin.publish_broadcast("test", consumer3.clone());
764+
765+
// The oldest broadcast is active; the rest are queued in publish order.
766+
consumer.assert_next("test", &consumer1);
767+
consumer.assert_next_wait();
768+
769+
// Drop the active; the next-oldest (not the newest) should be promoted.
770+
drop(broadcast1);
771+
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
772+
consumer.assert_next_none("test");
773+
consumer.assert_next("test", &consumer2);
774+
consumer.assert_next_wait();
775+
776+
// Drop the now-active; the remaining backup is promoted.
777+
drop(broadcast2);
778+
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
779+
consumer.assert_next_none("test");
780+
consumer.assert_next("test", &consumer3);
781+
consumer.assert_next_wait();
782+
783+
// Drop the last broadcast; the entry is fully unannounced.
784+
drop(broadcast3);
785+
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
786+
consumer.assert_next_none("test");
787+
consumer.assert_next_wait();
788+
}
789+
744790
#[tokio::test]
745791
async fn test_duplicate_reverse() {
792+
tokio::time::pause();
793+
746794
let origin = Origin::produce();
747795
let broadcast1 = Broadcast::produce();
748796
let broadcast2 = Broadcast::produce();
@@ -767,6 +815,8 @@ mod tests {
767815

768816
#[tokio::test]
769817
async fn test_double_publish() {
818+
tokio::time::pause();
819+
770820
let origin = Origin::produce();
771821
let broadcast = Broadcast::produce();
772822

@@ -1322,6 +1372,8 @@ mod tests {
13221372
// Verify unannounce also doesn't panic with trailing slash
13231373
#[tokio::test]
13241374
async fn test_with_root_trailing_slash_unannounce() {
1375+
tokio::time::pause();
1376+
13251377
let origin = Origin::produce();
13261378

13271379
let prefix = "some_prefix/".to_string();

0 commit comments

Comments
 (0)