Skip to content

Commit 490e53c

Browse files
kixelatedclaude
andcommitted
Fix bandwidth estimation: use Signal directly, fix version matching and lifecycle bugs
- Replace Bandwidth wrapper class with Signal<number | undefined> type alias - Fix Rust version matching to default forward for newest drafts (per convention) - Guard tokio::select! recv_bandwidth arm to prevent early exit on Lite01/Lite02 - Store BandwidthProducer in Session instead of BandwidthConsumer (enables lazy polling) - Add closed flag to JS Subscriber to stop probe loop on close() - Move error handling inside send bandwidth polling loop (per-iteration, not outer) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 771e1c2 commit 490e53c

9 files changed

Lines changed: 44 additions & 66 deletions

File tree

js/lite/src/bandwidth.ts

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,14 @@
1-
import { type Getter, Signal } from "@moq/signals";
1+
import { Signal } from "@moq/signals";
22

33
/**
4-
* A bandwidth estimate that can be read synchronously or observed reactively.
4+
* A bandwidth estimate in bits per second, or undefined if unknown.
55
*
6-
* Created internally by the connection. Consumers read from it via the signal.
6+
* This is a Signal that can be read synchronously via `peek()`,
7+
* observed reactively via `effect.get()`, or updated via `set()`.
78
*/
8-
export class Bandwidth {
9-
#bitrate = new Signal<number | undefined>(undefined);
9+
export type Bandwidth = Signal<number | undefined>;
1010

11-
/** Reactive signal for the current bandwidth estimate in bits per second. */
12-
readonly signal: Getter<number | undefined> = this.#bitrate;
13-
14-
/**
15-
* Update the bandwidth estimate. Called internally by the connection/subscriber.
16-
* @internal
17-
*/
18-
set(bitrate: number | undefined): void {
19-
this.#bitrate.set(bitrate);
20-
}
21-
22-
/** Get the current bandwidth estimate synchronously. */
23-
get(): number | undefined {
24-
return this.#bitrate.peek();
25-
}
26-
27-
/** Wait for the bandwidth estimate to change. */
28-
async changed(): Promise<number | undefined> {
29-
return new Promise<number | undefined>((resolve) => {
30-
this.#bitrate.changed(resolve);
31-
});
32-
}
11+
/** Create a new bandwidth signal. */
12+
export function createBandwidth(): Bandwidth {
13+
return new Signal<number | undefined>(undefined);
3314
}

js/lite/src/lite/connection.ts

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { Announced } from "../announced.ts";
2-
import { Bandwidth } from "../bandwidth.ts";
2+
import { type Bandwidth, createBandwidth } from "../bandwidth.ts";
33
import type { Broadcast } from "../broadcast.ts";
44
import type { Established } from "../connection/established.ts";
55
import * as Path from "../path.ts";
@@ -68,8 +68,8 @@ export class Connection implements Established {
6868

6969
// Set up bandwidth estimation for Lite03+.
7070
if (version === Version.DRAFT_03) {
71-
this.sendBandwidth = new Bandwidth();
72-
this.recvBandwidth = new Bandwidth();
71+
this.sendBandwidth = createBandwidth();
72+
this.recvBandwidth = createBandwidth();
7373
}
7474

7575
this.#publisher = new Publisher(this.#quic, this.#version);
@@ -244,18 +244,16 @@ export class Connection implements Established {
244244
if (!getStats) return;
245245

246246
const run = async () => {
247-
try {
248-
while (!this.#closed) {
249-
const timeout = new Promise<void>((resolve) => setTimeout(resolve, SEND_BW_POLL_INTERVAL));
250-
await timeout;
251-
252-
if (this.#closed) break;
247+
while (!this.#closed) {
248+
await new Promise<void>((resolve) => setTimeout(resolve, SEND_BW_POLL_INTERVAL));
249+
if (this.#closed) break;
253250

251+
try {
254252
const stats = await getStats();
255253
bandwidth.set(stats.estimatedSendRate ?? undefined);
254+
} catch {
255+
if (this.#closed) break;
256256
}
257-
} catch {
258-
// Connection closed.
259257
}
260258
};
261259

js/lite/src/lite/subscriber.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,13 +207,15 @@ export class Subscriber {
207207
}
208208
}
209209

210+
#closed = false;
211+
210212
/**
211213
* Opens a PROBE bidi stream to receive bandwidth estimates from the publisher.
212214
* Retries on error.
213215
*/
214216
#runProbe() {
215217
const run = async () => {
216-
for (;;) {
218+
while (!this.#closed) {
217219
try {
218220
const stream = await Stream.open(this.#quic);
219221
await stream.writer.u53(StreamId.Probe);
@@ -231,6 +233,8 @@ export class Subscriber {
231233
// Clear the bitrate on disconnect.
232234
this.#recvBandwidth?.set(undefined);
233235

236+
if (this.#closed) break;
237+
234238
// Wait before retrying.
235239
await new Promise((resolve) => setTimeout(resolve, 1000));
236240
}
@@ -240,6 +244,7 @@ export class Subscriber {
240244
}
241245

242246
close() {
247+
this.#closed = true;
243248
for (const track of this.#subscribes.values()) {
244249
track.close();
245250
}

js/publish/src/broadcast.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ export class Broadcast {
6363
return;
6464
}
6565

66-
const estimate = effect.get(connection.sendBandwidth.signal);
66+
const estimate = effect.get(connection.sendBandwidth);
6767
effect.set(this.#sendBandwidth, estimate);
6868
}
6969

js/watch/src/video/source.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ export class Source {
219219
const connection = broadcast ? effect.get(broadcast.connection) : undefined;
220220
const recvBw = connection?.recvBandwidth;
221221
if (recvBw) {
222-
const estimate = effect.get(recvBw.signal);
222+
const estimate = effect.get(recvBw);
223223
if (estimate != null) {
224224
// Apply a safety margin (80%) to avoid oscillation.
225225
const safeBitrate = Math.round(estimate * 0.8);

rs/moq-lite/src/ietf/session.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{
2-
BandwidthConsumer, Error, OriginConsumer, OriginProducer,
2+
BandwidthProducer, Error, OriginConsumer, OriginProducer,
33
coding::{Encode, Reader, Stream, Writer},
44
ietf::{self, FetchHeader, GroupFlags, RequestId},
55
setup,
@@ -8,7 +8,7 @@ use crate::{
88
use super::{Control, Message, Publisher, Subscriber, Version, adapter::ControlStreamAdapter};
99

1010
/// Returned by `start()`: (send_bandwidth, recv_bandwidth)
11-
pub type Bandwidth = (Option<BandwidthConsumer>, Option<BandwidthConsumer>);
11+
pub type Bandwidth = (Option<BandwidthProducer>, Option<BandwidthProducer>);
1212

1313
pub fn start<S: web_transport_trait::Session>(
1414
session: S,

rs/moq-lite/src/lite/session.rs

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,12 @@ use std::time::Duration;
22

33
use web_transport_trait::Stats;
44

5-
use crate::{
6-
BandwidthConsumer, BandwidthProducer, Error, OriginConsumer, OriginProducer, coding::Stream, lite::SessionInfo,
7-
};
5+
use crate::{BandwidthProducer, Error, OriginConsumer, OriginProducer, coding::Stream, lite::SessionInfo};
86

97
use super::{Publisher, Subscriber, Version};
108

119
/// Returned by `start()`: (send_bandwidth, recv_bandwidth)
12-
pub type Bandwidth = (Option<BandwidthConsumer>, Option<BandwidthConsumer>);
10+
pub type Bandwidth = (Option<BandwidthProducer>, Option<BandwidthProducer>);
1311

1412
pub fn start<S: web_transport_trait::Session>(
1513
session: S,
@@ -24,17 +22,12 @@ pub fn start<S: web_transport_trait::Session>(
2422
version: Version,
2523
) -> Result<Bandwidth, Error> {
2624
let send_bw = BandwidthProducer::new();
27-
let send_bw_consumer = send_bw.consume();
25+
let send_bw_task = send_bw.clone();
2826

2927
let recv_bw = BandwidthProducer::new();
30-
let recv_bw_consumer = match version {
31-
Version::Lite03 => Some(recv_bw.consume()),
32-
_ => None,
33-
};
34-
35-
let recv_bw_for_sub = match version {
36-
Version::Lite03 => Some(recv_bw),
37-
_ => None,
28+
let (recv_bw_ret, recv_bw_for_sub) = match version {
29+
Version::Lite01 | Version::Lite02 => (None, None),
30+
_ => (Some(recv_bw.clone()), Some(recv_bw)),
3831
};
3932

4033
let publisher = Publisher::new(session.clone(), publish, version);
@@ -45,7 +38,7 @@ pub fn start<S: web_transport_trait::Session>(
4538
Err(res) = run_session(setup) => Err(res),
4639
res = publisher.run() => res,
4740
res = subscriber.run() => res,
48-
_ = run_send_bandwidth(&session, send_bw) => Ok(()),
41+
_ = run_send_bandwidth(&session, send_bw_task) => Ok(()),
4942
};
5043

5144
match res {
@@ -64,7 +57,7 @@ pub fn start<S: web_transport_trait::Session>(
6457
}
6558
});
6659

67-
Ok((Some(send_bw_consumer), recv_bw_consumer))
60+
Ok((Some(send_bw), recv_bw_ret))
6861
}
6962

7063
/// Polls the QUIC congestion controller for estimated send rate.

rs/moq-lite/src/lite/subscriber.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,12 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
4444
}
4545

4646
pub async fn run(self) -> Result<(), Error> {
47+
let recv_bandwidth_enabled = self.recv_bandwidth.is_some();
4748
let bw = self.clone();
4849
tokio::select! {
4950
Err(err) = self.clone().run_announce() => Err(err),
5051
res = self.run_uni() => res,
51-
_ = bw.run_recv_bandwidth() => Ok(()),
52+
_ = bw.run_recv_bandwidth(), if recv_bandwidth_enabled => Ok(()),
5253
}
5354
}
5455

rs/moq-lite/src/session.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{future::Future, pin::Pin, sync::Arc};
22

3-
use crate::{BandwidthConsumer, Error, Version};
3+
use crate::{BandwidthConsumer, BandwidthProducer, Error, Version};
44

55
/// A MoQ transport session, wrapping a WebTransport connection.
66
///
@@ -11,17 +11,17 @@ use crate::{BandwidthConsumer, Error, Version};
1111
pub struct Session {
1212
session: Arc<dyn SessionInner>,
1313
version: Version,
14-
send_bandwidth: Option<BandwidthConsumer>,
15-
recv_bandwidth: Option<BandwidthConsumer>,
14+
send_bandwidth: Option<BandwidthProducer>,
15+
recv_bandwidth: Option<BandwidthProducer>,
1616
closed: bool,
1717
}
1818

1919
impl Session {
2020
pub(super) fn new<S: web_transport_trait::Session>(
2121
session: S,
2222
version: Version,
23-
send_bandwidth: Option<BandwidthConsumer>,
24-
recv_bandwidth: Option<BandwidthConsumer>,
23+
send_bandwidth: Option<BandwidthProducer>,
24+
recv_bandwidth: Option<BandwidthProducer>,
2525
) -> Self {
2626
Self {
2727
session: Arc::new(session),
@@ -41,14 +41,14 @@ impl Session {
4141
///
4242
/// Returns `None` if the QUIC backend or MoQ version doesn't support bandwidth estimation.
4343
pub fn send_bandwidth(&self) -> Option<BandwidthConsumer> {
44-
self.send_bandwidth.clone()
44+
self.send_bandwidth.as_ref().map(|p| p.consume())
4545
}
4646

4747
/// Returns a consumer for the estimated receive bitrate (from PROBE).
4848
///
4949
/// Returns `None` if the MoQ version doesn't support PROBE (requires moq-lite-03+).
5050
pub fn recv_bandwidth(&self) -> Option<BandwidthConsumer> {
51-
self.recv_bandwidth.clone()
51+
self.recv_bandwidth.as_ref().map(|p| p.consume())
5252
}
5353

5454
/// Close the underlying transport session.

0 commit comments

Comments
 (0)