Add bandwidth estimation for adaptive bitrate control #1208
Conversation
Expose BandwidthProducer/Consumer pairs on Session (Rust) and Connection (JS) for estimated send rate (from congestion controller, polled every 100ms) and receive rate (from PROBE mechanism, moq-lite-03+ only). The subscriber now initiates PROBE streams lazily when consumers exist. @moq/watch uses recv bandwidth for automatic ABR rendition selection, and @moq/publish caps encoder bitrate to the estimated send bandwidth. https://claude.ai/code/session_01C68pEkoUhQqtQALYxRajhx
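For orientation, the consumer side of this can be sketched as plain ABR selection over the recv-bandwidth estimate. Everything below is illustrative: `pickRendition`, the `Rendition` shape, and the safety margin mirror the PR's description, not the actual @moq/watch API.

```typescript
// Hypothetical sketch of ABR rendition selection from a bandwidth estimate.
// The Rendition shape and pickRendition are illustrative, not the @moq/watch API.
type Rendition = { name: string; bitrate: number };

// Mirrors the 80% safety margin discussed later in this review for recv bandwidth.
const SAFETY_MARGIN = 0.8;

export function pickRendition(
	renditions: Rendition[],
	recvBandwidth: number | undefined,
): Rendition | undefined {
	if (renditions.length === 0) return undefined;
	const sorted = [...renditions].sort((a, b) => a.bitrate - b.bitrate);
	// No estimate yet: start conservatively with the lowest rendition.
	if (recvBandwidth === undefined) return sorted[0];
	const budget = recvBandwidth * SAFETY_MARGIN;
	// Highest rendition that fits the budget, else fall back to the lowest.
	const fitting = sorted.filter((r) => r.bitrate <= budget);
	return fitting.length > 0 ? fitting[fitting.length - 1] : sorted[0];
}
```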
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings.
No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID:
📒 Files selected for processing (2)
✅ Files skipped from review due to trivial changes (2)

Walkthrough: Adds end-to-end bandwidth estimate propagation.

🚥 Pre-merge checks | ✅ Passed checks (3 passed)
Actionable comments posted: 7
🧹 Nitpick comments (3)
js/lite/src/lite/subscriber.ts (1)
234-235: Consider using a named constant for the retry delay.
The 1000 ms retry delay is a magic number. Per coding guidelines, consider defining it as a named constant for clarity.
♻️ Suggested refactor
```diff
+/** Delay before retrying probe stream connection. */
+const PROBE_RETRY_DELAY_MS = 1000;
+
 // Wait before retrying.
-await new Promise((resolve) => setTimeout(resolve, 1000));
+await new Promise((resolve) => setTimeout(resolve, PROBE_RETRY_DELAY_MS));
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/lite/src/lite/subscriber.ts` around lines 234 - 235, Replace the magic number 1000 used for the retry delay in subscriber.ts with a named constant (e.g., RETRY_DELAY_MS) defined at module scope; define RETRY_DELAY_MS = 1000 (or pull from existing config if one exists) and replace the inline await new Promise((resolve) => setTimeout(resolve, 1000)) in the retry logic with await new Promise((resolve) => setTimeout(resolve, RETRY_DELAY_MS)) so the delay is clear and easy to adjust, referencing the constant in the retry loop inside the function where the wait occurs.

js/publish/src/video/encoder.ts (1)
214-224: Consider extracting magic numbers as named constants.
The 0.9 safety margin and 100_000 minimum bitrate are magic numbers. Per coding guidelines, consider defining these as named constants at module level for clarity and easier maintenance.
♻️ Suggested refactor
```diff
+/** Safety margin for send bandwidth to reserve capacity for audio and protocol overhead. */
+const SEND_BANDWIDTH_SAFETY_MARGIN = 0.9;
+
+/** Minimum encoder bitrate to keep encoding usable (100 kbps). */
+const MIN_ENCODER_BITRATE = 100_000;
+
 // ... in #runConfig method:
 // If no explicit maxBitrate, cap to the estimated send bandwidth (with 90% safety margin).
 if (!user.maxBitrate) {
 	const sendBw = effect.get(this.sendBandwidth);
 	if (sendBw != null) {
-		// Reserve ~10% for audio and protocol overhead.
-		const cap = Math.round(sendBw * 0.9);
-		// Don't go below 100kbps to keep encoding usable.
-		const MIN_BITRATE = 100_000;
-		bitrate = Math.max(MIN_BITRATE, Math.min(bitrate, cap));
+		const cap = Math.round(sendBw * SEND_BANDWIDTH_SAFETY_MARGIN);
+		bitrate = Math.max(MIN_ENCODER_BITRATE, Math.min(bitrate, cap));
 	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/publish/src/video/encoder.ts` around lines 214 - 224, Extract the magic numbers in the bitrate cap logic into descriptive module-level constants: replace the literal 0.9 used when computing cap (sendBw * 0.9) with a constant like SEND_BANDWIDTH_SAFETY_MARGIN and replace the local MIN_BITRATE = 100_000 with a module-level MIN_VIDEO_BITRATE (or similar) and use those in the block that references user.maxBitrate, effect.get(this.sendBandwidth), and bitrate; remove the locally-defined MIN_BITRATE and ensure the new constants are exported or documented at the top of the file for clarity and maintainability.

js/watch/src/video/source.ts (1)
215-229: Consider extracting the safety margin as a named constant.
The 0.8 multiplier is a magic number. Per coding guidelines, consider using a named constant to clarify its purpose and make it easier to adjust if needed.
♻️ Suggested refactor
```diff
+/** Safety margin for receive bandwidth estimation to avoid oscillation. */
+const RECV_BANDWIDTH_SAFETY_MARGIN = 0.8;
+
 // If no explicit bitrate target, use the recv bandwidth estimate from the connection.
 let effectiveTarget = target;
 if (!target?.bitrate && !target?.name) {
 	const broadcast = effect.get(this.broadcast);
 	const connection = broadcast ? effect.get(broadcast.connection) : undefined;
 	const recvBw = connection?.recvBandwidth;
 	if (recvBw) {
 		const estimate = effect.get(recvBw.signal);
 		if (estimate != null) {
-			// Apply a safety margin (80%) to avoid oscillation.
-			const safeBitrate = Math.round(estimate * 0.8);
+			const safeBitrate = Math.round(estimate * RECV_BANDWIDTH_SAFETY_MARGIN);
 			effectiveTarget = { ...target, bitrate: safeBitrate };
 		}
 	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/watch/src/video/source.ts` around lines 215 - 229, Extract the magic constant 0.8 into a clearly named constant (e.g., SAFETY_MARGIN or RECV_BW_SAFETY_MARGIN) and use it where you compute safeBitrate in the block that sets effectiveTarget from target.recvBandwidth estimate; update the code referencing effectiveTarget, target, recvBw, estimate, and safeBitrate so the multiplier is replaced by the constant and add a brief comment describing its purpose (apply an 80% safety margin to avoid oscillation).
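Both bitrate nitpicks reduce to the same pure computation: clamp a requested bitrate between a floor and a margin-scaled estimate. A self-contained sketch under the review's proposed names (`capBitrate` and the constants are illustrative, not the codebase's identifiers):

```typescript
// Illustrative constants taken from the review suggestions above; not the codebase's names.
const SEND_BANDWIDTH_SAFETY_MARGIN = 0.9;
const MIN_ENCODER_BITRATE = 100_000;

/** Cap a requested encoder bitrate to the send-bandwidth estimate, never below the floor. */
export function capBitrate(requested: number, sendBandwidth: number | undefined): number {
	// No estimate yet: leave the requested bitrate untouched.
	if (sendBandwidth === undefined) return requested;
	const cap = Math.round(sendBandwidth * SEND_BANDWIDTH_SAFETY_MARGIN);
	return Math.max(MIN_ENCODER_BITRATE, Math.min(requested, cap));
}
```

Note that with this ordering the floor wins over the cap once the estimate drops low enough, so the encoder can be asked for more than the link sustains; a later round of this review revisits exactly that floor/cap interaction.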
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@js/lite/src/bandwidth.ts`:
- Around line 28-32: The changed() method creates a Promise but never calls the
dispose function returned by this.#bitrate.changed, leaking the subscription;
modify changed() to capture the disposer (e.g., const dispose =
this.#bitrate.changed(callback)), call dispose() immediately after resolving the
promise (including in any error/finally path), and ensure the resolver is
invoked only once so the callback is removed from the Signal's internal Set
(refer to changed() and this.#bitrate.changed to locate the code).
In `@js/lite/src/lite/connection.ts`:
- Around line 69-73: The code eagerly creates Bandwidth instances for draft-03
in the constructor, which starts the 100ms poller even when nobody observes
estimates; instead, make sendBandwidth and recvBandwidth lazy and only start
polling when there are active subscribers: remove immediate instantiation in the
constructor and either (a) change sendBandwidth/recvBandwidth to getters that
instantiate new Bandwidth() on first access, or (b) keep optional fields and
have the Bandwidth implementation start its poller only when its subscribe
method is called (checking subscriber count > 0); ensure the polling start/stop
logic is tied to Bandwidth.subscribe/unsubscribe so no background timer runs
until a consumer subscribes, and reference sendBandwidth, recvBandwidth,
Bandwidth, and the subscribe/unsubscribe methods when implementing the change.
- Around line 246-259: The run() loop currently has a single outer try/catch
that aborts the whole polling when any iteration throws; change this so each
poll iteration handles its own errors and the loop continues until this.#closed
is true. Move the try/catch inside the while loop (or wrap the await getStats()
and bandwidth.set(...) calls) so transient errors are caught per-iteration
(optionally log the error) and then continue; still break out if this.#closed
becomes true. Ensure SEND_BW_POLL_INTERVAL sleep/timeout logic and the
getStats()/bandwidth.set(...) calls remain unchanged except for the new
per-iteration error handling so the estimator isn't permanently terminated by a
single failure.
In `@js/lite/src/lite/subscriber.ts`:
- Around line 210-240: The probe loop started by `#runProbe` currently spawns an
untracked async task (run) that never stops; add a cancellation mechanism (e.g.,
a private `#closed` boolean or an AbortController stored on the Subscriber
instance) and set it in Subscriber.close(), then modify the run function inside
`#runProbe` to check that cancellation flag/controller before each retry and
before waiting on the timeout and to break out of the outer for(;;) when closed;
also ensure any in-flight Stream.open/reads respect the abort signal or are
closed when closed is set, and keep the existing bitrate-clear behavior but only
retry while not closed.
In `@rs/moq-lite/src/lite/session.rs`:
- Around line 29-38: The current matching defaults newer Lite versions to None
for recv-bandwidth (recv_bw_consumer and recv_bw_for_sub) causing PROBE to be
disabled for future drafts; change the matches to explicitly list older drafts
(e.g., Version::Lite01 | Version::Lite02) returning None and use the catch-all
arm (_) to enable Some(...) for the recv_bw consumer and recv_bw_for_sub so the
newest/default draft behavior remains enabled; adjust the match arms around
recv_bw_consumer and recv_bw_for_sub accordingly, referencing the recv_bw,
recv_bw_consumer and recv_bw_for_sub symbols and the Version enum variants.
In `@rs/moq-lite/src/lite/subscriber.rs`:
- Around line 46-52: Subscriber::run() currently lets the run_recv_bandwidth()
arm always participate in tokio::select!, so when recv_bandwidth is None that
immediate-ready future wins and causes an early Ok(()) return. Fix by guarding
that select arm so it only participates when recv_bandwidth is present: change
the select branch for bw.run_recv_bandwidth() to be conditional (e.g., using an
if guard checking self.recv_bandwidth.is_some() or by constructing an
Option/future that is only awaited when Some) so that run_recv_bandwidth()
cannot win when recv_bandwidth == None; adjust references to Subscriber::run,
run_recv_bandwidth, and the recv_bandwidth field accordingly.
In `@rs/moq-lite/src/session.rs`:
- Around line 14-15: Session currently stores BandwidthConsumer instances
(fields send_bandwidth and recv_bandwidth) which keeps consumer count >0 and
prevents producer.unused() from completing; replace those fields to store
BandwidthProducer instead, and update the accessor methods (e.g.,
get_send_bandwidth/get_recv_bandwidth or equivalent) to mint/clone a new
BandwidthConsumer from the stored BandwidthProducer on demand; ensure
run_send_bandwidth and run_recv_bandwidth continue to use the producer and rely
on producer.unused() to quiesce when no consumers remain.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 5c7ed8ec-4faa-405a-a539-30c234635711
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (17)
- js/lite/src/bandwidth.ts
- js/lite/src/connection/established.ts
- js/lite/src/index.ts
- js/lite/src/lite/connection.ts
- js/lite/src/lite/subscriber.ts
- js/publish/src/broadcast.ts
- js/publish/src/video/encoder.ts
- js/publish/src/video/index.ts
- js/watch/src/video/source.ts
- rs/moq-lite/src/client.rs
- rs/moq-lite/src/ietf/session.rs
- rs/moq-lite/src/lite/session.rs
- rs/moq-lite/src/lite/subscriber.rs
- rs/moq-lite/src/model/bandwidth.rs
- rs/moq-lite/src/model/mod.rs
- rs/moq-lite/src/server.rs
- rs/moq-lite/src/session.rs
```ts
async changed(): Promise<number | undefined> {
	return new Promise<number | undefined>((resolve) => {
		this.#bitrate.changed(resolve);
	});
}
```
Subscription leak in changed() — the dispose function is not called.
The changed() callback returned by Signal.changed() is not disposed after the promise resolves. Per the pattern in js/lite/src/lite/publisher.ts:102-112, you must capture and call the dispose function to prevent the callback from persisting in the signal's internal Set.
🐛 Proposed fix
```diff
 /** Wait for the bandwidth estimate to change. */
 async changed(): Promise<number | undefined> {
-	return new Promise<number | undefined>((resolve) => {
-		this.#bitrate.changed(resolve);
-	});
+	return new Promise<number | undefined>((resolve) => {
+		const dispose = this.#bitrate.changed((value) => {
+			dispose();
+			resolve(value);
+		});
+	});
 }
```
```ts
// Set up bandwidth estimation for Lite03+.
if (version === Version.DRAFT_03) {
	this.sendBandwidth = new Bandwidth();
	this.recvBandwidth = new Bandwidth();
}
```
This condition doesn't implement the promised laziness.
Lines 69-72 create sendBandwidth for every draft-03 connection, so Line 105 is effectively just a version check. That starts the 100 ms polling loop even when nothing observes the estimate, which defeats the stated "only when consumers exist" behavior and adds a permanent background wakeup per connection. Please gate the poller on actual Bandwidth subscribers instead of property existence.
Also applies to: 104-107
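What this comment asks for — reference-counted start/stop rather than a version check — can be sketched generically. `LazyPoller` and its names are illustrative, not the codebase's Bandwidth class:

```typescript
// Illustrative sketch: a poller that only runs while at least one consumer subscribes.
type Dispose = () => void;

export class LazyPoller {
	#subscribers = 0;
	#timer: ReturnType<typeof setInterval> | undefined;

	constructor(
		private readonly poll: () => void,
		private readonly intervalMs: number,
	) {}

	/** True while the background timer is running. */
	get polling(): boolean {
		return this.#timer !== undefined;
	}

	subscribe(): Dispose {
		this.#subscribers += 1;
		if (this.#subscribers === 1) {
			// First consumer: start the background timer.
			this.#timer = setInterval(this.poll, this.intervalMs);
		}
		let disposed = false;
		return () => {
			if (disposed) return; // double-dispose is a no-op
			disposed = true;
			this.#subscribers -= 1;
			if (this.#subscribers === 0 && this.#timer !== undefined) {
				// Last consumer gone: stop the timer so no idle wakeups run.
				clearInterval(this.#timer);
				this.#timer = undefined;
			}
		};
	}
}
```

With this shape, a connection can expose the poller unconditionally and still pay zero background wakeups until the first consumer subscribes.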
…timation-api-heK3r
…nd lifecycle bugs
- Replace Bandwidth wrapper class with Signal<number | undefined> type alias
- Fix Rust version matching to default forward for newest drafts (per convention)
- Guard tokio::select! recv_bandwidth arm to prevent early exit on Lite01/Lite02
- Store BandwidthProducer in Session instead of BandwidthConsumer (enables lazy polling)
- Add closed flag to JS Subscriber to stop probe loop on close()
- Move error handling inside send bandwidth polling loop (per-iteration, not outer)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
🧹 Nitpick comments (1)
js/lite/src/lite/subscriber.ts (1)
210-244: Probe loop cancellation fix looks good, but the Stream.open() call is not cancellable.
The #closed flag properly addresses the previous review feedback. However, if close() is called while Stream.open() (line 220) or Probe.decodeMaybe() (line 224) is awaiting, the loop will still complete that iteration before checking #closed. This is acceptable behavior given the short-lived nature of these operations, but worth noting.
Minor: consider extracting the retry delay to a named constant for clarity.
```diff
+const PROBE_RETRY_DELAY = 1000; // ms
+
 #runProbe() {
 	const run = async () => {
 		while (!this.#closed) {
 			// ...
 			// Wait before retrying.
-			await new Promise((resolve) => setTimeout(resolve, 1000));
+			await new Promise((resolve) => setTimeout(resolve, PROBE_RETRY_DELAY));
 		}
 	};
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/lite/src/lite/subscriber.ts` around lines 210 - 244, The probe loop currently awaits non-cancellable operations (Stream.open and Probe.decodeMaybe) so close() can’t stop an in-flight await; to address this, after awaiting Stream.open(...) in `#runProbe` and after the inner Probe.decodeMaybe loop add an immediate check of this.#closed and break out if set, and also extract the hard-coded retry delay (1000) into a named constant (e.g., RETRY_DELAY_MS) and use it in the setTimeout to improve clarity; keep references to Stream.open, Probe.decodeMaybe, `#runProbe`, `#closed`, and the retry delay constant when making the changes.
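One way to make the retry wait itself cancellable, rather than only checking #closed between awaits, is an abortable sleep. A hedged sketch using the standard AbortController (the helper name and wiring are illustrative, not the codebase's):

```typescript
const RETRY_DELAY_MS = 1000;

/** Sleep that resolves early (without throwing) when the signal aborts. */
export function sleep(ms: number = RETRY_DELAY_MS, signal?: AbortSignal): Promise<void> {
	return new Promise((resolve) => {
		if (signal?.aborted) return resolve();
		const id = setTimeout(done, ms);
		function done() {
			// Clean up both the timer and the abort listener, whichever fired.
			signal?.removeEventListener("abort", done);
			clearTimeout(id);
			resolve();
		}
		signal?.addEventListener("abort", done, { once: true });
	});
}
```

close() would then call controller.abort(), and the loop checks signal.aborted after each await instead of finishing a full backoff.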
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 9e90e23f-a353-42d6-baf3-9a717c346bd8
📒 Files selected for processing (9)
- js/lite/src/bandwidth.ts
- js/lite/src/lite/connection.ts
- js/lite/src/lite/subscriber.ts
- js/publish/src/broadcast.ts
- js/watch/src/video/source.ts
- rs/moq-lite/src/ietf/session.rs
- rs/moq-lite/src/lite/session.rs
- rs/moq-lite/src/lite/subscriber.rs
- rs/moq-lite/src/session.rs
✅ Files skipped from review due to trivial changes (1)
- js/watch/src/video/source.ts
🚧 Files skipped from review as they are similar to previous changes (2)
- js/lite/src/bandwidth.ts
- rs/moq-lite/src/session.rs
- BandwidthProducer: add close(err)/closed() for cancellation, rename get() to peek()
- Move send bandwidth polling to Session (version-agnostic, depends on QUIC backend support not protocol version)
- Remove send bandwidth from lite/ietf start() return values
- Make recv bandwidth errors fatal (no retry loop)
- Use forward-compatible version matching (list older versions explicitly)
- JS: make send bandwidth version-agnostic (check getStats existence), use setInterval with closed promise instead of fire-and-forget
- JS: make probe async and fatal, called from connection Promise.all
- JS watch: handle target.name early before ABR, remove MIN_BITRATE
- JS publish: read sendBandwidth from connection directly instead of piping through props
- CLAUDE.md: add JS async patterns convention (Effect.interval/timer)

https://claude.ai/code/session_01C68pEkoUhQqtQALYxRajhx
Remove unused #closed field in subscriber.ts and fix Bandwidth type usage in encoder.ts (Bandwidth is Signal directly, not a wrapper). https://claude.ai/code/session_01C68pEkoUhQqtQALYxRajhx
Force-pushed from d76687b to 94a5f10 (Compare)
Actionable comments posted: 3
♻️ Duplicate comments (3)
rs/moq-lite/src/session.rs (1)
16-17: ⚠️ Potential issue | 🟠 Major — Don't keep live BandwidthConsumer handles inside Session.
These fields keep the producer count permanently non-zero, so the new lazy gating never shuts off. It also leaves the send-bandwidth producer owned only by the background task, which makes the producer.closed() branch unreachable and prevents session shutdown from closing the estimator cleanly. Keep producers in Session, close them from close()/Drop, and mint fresh consumers from the accessors instead. This will also need the lite::start()/Session::new() handoff to carry a producer for recv bandwidth instead of a consumer.
Also applies to: 22-46, 59-68
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/session.rs` around lines 16 - 17, Session currently stores BandwidthConsumer handles (send_bandwidth, recv_bandwidth) which keep producer counts non-zero; change Session to own BandwidthProducer fields instead and have accessor methods create/mint fresh BandwidthConsumer instances on demand, close the producers from Session::close() and Drop to allow lazy gating to shut off and let producer.closed() branches run, and update lite::start() / Session::new() to hand off a recv BandwidthProducer (not a consumer) into the Session constructor/initialization so ownership is correct.

js/lite/src/lite/connection.ts (2)
66-75: ⚠️ Potential issue | 🟠 Major — This still starts bandwidth work eagerly.
The constructor creates sendBandwidth/recvBandwidth up front, and #run() starts the poller and PROBE task whenever those fields exist. That makes every supported connection pay the 100 ms wakeup and PROBE-stream cost even when nothing observes the estimate. Tie startup/shutdown to actual Bandwidth consumers instead of property existence.
Also applies to: 99-107
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/lite/src/lite/connection.ts` around lines 66 - 75, The constructor currently eagerly constructs sendBandwidth and recvBandwidth (via createBandwidth), which causes the poller/PROBE tasks to start for every connection; change this to lazy creation and lifecycle tied to actual consumers: remove immediate assignments in the constructor, add lazy accessors (e.g., getOrCreateSendBandwidth and getOrCreateRecvBandwidth that call createBandwidth on first use) and maintain a consumer count or subscription API on the Bandwidth instances so consumers call acquire/release; update run() and the PROBE/poller startup logic to only start when a Bandwidth instance has active consumers and to stop when consumer count drops to zero; adjust related code paths that currently check property existence to use the accessors or active-consumer checks (affects sendBandwidth, recvBandwidth, createBandwidth, and run — also apply same lazy pattern to the code referenced around lines 99-107).
210-223: ⚠️ Potential issue | 🟠 Major — Don't permanently stop sampling after one getStats() failure.
Any thrown stats read resolves this task forever, so downstream ABR keeps using the last send estimate for the rest of the connection. Treat failed samples as unavailable and let closed be the condition that actually stops the poller.
🛠️ Proposed fix
```diff
 const id = setInterval(async () => {
 	try {
 		const stats = await quic.getStats();
 		bandwidth.set(stats.estimatedSendRate ?? undefined);
 	} catch {
-		clearInterval(id);
-		resolve();
+		bandwidth.set(undefined);
 	}
 }, SEND_BW_POLL_INTERVAL);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/lite/src/lite/connection.ts` around lines 210 - 223, The poller currently stops on the first quic.getStats() failure because the catch block clears the interval and resolves the Promise; change it so a failed sample is treated as unavailable instead of stopping the poller: in the setInterval handler (where quic.getStats() is called) remove the clearInterval(id) and resolve() from the catch, and instead call bandwidth.set(undefined) or otherwise ignore the sample error so polling continues; keep the clearInterval(id) and resolve() only in the this.closed.then(...) handler so SEND_BW_POLL_INTERVAL-driven polling continues until this.closed.
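The per-iteration shape this fix describes can be isolated into a tiny pure helper: a throwing sample clears the estimate instead of terminating the poller. `pollOnce` and the `Stats` shape below are illustrative, not the codebase's API:

```typescript
// Illustrative stand-in for the QUIC stats shape the review discusses.
type Stats = { estimatedSendRate?: number };

/** One poll iteration: a throwing stats read clears the estimate but never kills polling. */
export function pollOnce(
	getStats: () => Stats,
	set: (bitrate: number | undefined) => void,
): void {
	try {
		const stats = getStats();
		set(stats.estimatedSendRate ?? undefined);
	} catch {
		// Transient failure: the estimate is unavailable, but polling continues.
		set(undefined);
	}
}
```

Driving this from setInterval, only connection close clears the interval; errors merely report "no estimate" for that sample.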
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@js/lite/src/lite/subscriber.ts`:
- Around line 217-223: The loop treating an undefined probe from
Probe.decodeMaybe as a harmless EOF should instead treat it as fatal so ABR
updates don't silently stop; in the subscriber where you open the probe stream
(Stream.open and stream.writer.u53 with StreamId.Probe) replace the permissive
decodeMaybe usage with a strict decode (or explicitly throw when
Probe.decodeMaybe returns undefined) so that when
Probe.decodeMaybe(stream.reader, this.version) yields undefined you throw an
Error (or propagate) to terminate the session and avoid freezing
this.#recvBandwidth; ensure Connection.#run() will handle the thrown error as a
connection-level failure.
In `@js/publish/src/video/encoder.ts`:
- Around line 215-224: The adaptive bitrate cap can drive bitrate below the 100
kbps floor; define a named minimum constant (e.g., MIN_BITRATE_BPS = 100_000)
and ensure the computed cap respects it before applying to bitrate — replace the
direct use of cap in the Math.min call with Math.max(cap, MIN_BITRATE_BPS)
(references: user.maxBitrate check, conn = effect.get(this.connection), sendBw,
estimate, cap, and bitrate in encoder.ts) so the adaptive path never sets
bitrate under the documented minimum.
In `@rs/moq-lite/src/lite/subscriber.rs`:
- Around line 135-163: The run_recv_bandwidth task currently returns after
bandwidth.unused(), preventing future consumers from restarting probing; change
run_recv_bandwidth to loop so it re-enters probing when a new consumer appears:
wrap the Stream::open/Probe send and the tokio::select! in an inner loop, call
bandwidth.used().await? at the start of each outer iteration, and on
bandwidth.unused() call stream.writer.finish() and await stream.writer.closed()?
then break the inner loop (not return) so the outer loop continues and waits for
bandwidth.used().await? again; keep the existing closed() branch returning as
before. Use the existing symbols: run_recv_bandwidth, bandwidth.unused(),
bandwidth.used(), Stream::open, stream.writer.finish(), stream.writer.closed(),
and stream.reader.decode to locate and modify the code.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: cb0c731b-8735-427e-a396-2e9cb52639ae
📒 Files selected for processing (13)
- CLAUDE.md
- js/lite/src/lite/connection.ts
- js/lite/src/lite/subscriber.ts
- js/publish/src/broadcast.ts
- js/publish/src/video/encoder.ts
- js/publish/src/video/index.ts
- js/watch/src/video/source.ts
- rs/moq-lite/src/client.rs
- rs/moq-lite/src/lite/session.rs
- rs/moq-lite/src/lite/subscriber.rs
- rs/moq-lite/src/model/bandwidth.rs
- rs/moq-lite/src/server.rs
- rs/moq-lite/src/session.rs
✅ Files skipped from review due to trivial changes (2)
- CLAUDE.md
- js/watch/src/video/source.ts
🚧 Files skipped from review as they are similar to previous changes (4)
- js/publish/src/broadcast.ts
- js/publish/src/video/index.ts
- rs/moq-lite/src/client.rs
- rs/moq-lite/src/server.rs
```ts
const stream = await Stream.open(this.#quic);
await stream.writer.u53(StreamId.Probe);

for (;;) {
	const probe = await Probe.decodeMaybe(stream.reader, this.version);
	if (!probe) break;
	this.#recvBandwidth.set(probe.bitrate);
}
```
Treat an unexpected PROBE EOF as fatal.
decodeMaybe() turns a peer-closed PROBE stream into a clean return, so Connection.#run() keeps the session alive and recvBandwidth just freezes on the last sample. If this stream is supposed to live for the connection, throw on undefined here (or switch to the strict decode path) so ABR doesn't silently stop updating.
🐛 Proposed fix

```diff
 for (;;) {
 	const probe = await Probe.decodeMaybe(stream.reader, this.version);
-	if (!probe) break;
+	if (!probe) {
+		throw new Error("probe stream closed");
+	}
 	this.#recvBandwidth.set(probe.bitrate);
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
const stream = await Stream.open(this.#quic);
await stream.writer.u53(StreamId.Probe);

for (;;) {
	const probe = await Probe.decodeMaybe(stream.reader, this.version);
	if (!probe) {
		throw new Error("probe stream closed");
	}
	this.#recvBandwidth.set(probe.bitrate);
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@js/lite/src/lite/subscriber.ts` around lines 217 - 223, The loop treating an
undefined probe from Probe.decodeMaybe as a harmless EOF should instead treat it
as fatal so ABR updates don't silently stop; in the subscriber where you open
the probe stream (Stream.open and stream.writer.u53 with StreamId.Probe) replace
the permissive decodeMaybe usage with a strict decode (or explicitly throw when
Probe.decodeMaybe returns undefined) so that when
Probe.decodeMaybe(stream.reader, this.version) yields undefined you throw an
Error (or propagate) to terminate the session and avoid freezing
this.#recvBandwidth; ensure Connection.#run() will handle the thrown error as a
connection-level failure.
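One way to implement the strict path this comment suggests is a tiny wrapper that converts an unexpected EOF into a thrown error. `decodeStrict` here is a hypothetical helper, not part of the moq codebase:

```typescript
// Hypothetical helper: turn a decodeMaybe-style `undefined` (clean EOF)
// into a thrown error when the stream is expected to stay open.
function decodeStrict<T>(value: T | undefined, what: string): T {
	if (value === undefined) {
		throw new Error(`${what} stream closed unexpectedly`);
	}
	return value;
}
```

The probe loop could then wrap each read, e.g. `decodeStrict(await Probe.decodeMaybe(stream.reader, this.version), "probe")`, so a peer-closed stream surfaces as a connection-level failure instead of a silent break.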
```ts
// If no explicit maxBitrate, cap to the estimated send bandwidth (with 90% safety margin).
if (!user.maxBitrate) {
	const conn = effect.get(this.connection);
	const sendBw = conn?.sendBandwidth;
	if (sendBw) {
		const estimate = effect.get(sendBw);
		if (estimate != null) {
			// Reserve ~10% for audio and protocol overhead.
			const cap = Math.round(estimate * 0.9);
			bitrate = Math.min(bitrate, cap);
```
Keep the adaptive cap above the documented 100 kbps floor.
This path can drive bitrate below 100 kbps whenever the estimate is small, which breaks the PR's minimum-bitrate guarantee and can configure unusably low video rates.
💡 Proposed fix

```diff
+const SEND_BANDWIDTH_HEADROOM = 0.9;
+const MIN_ADAPTIVE_BITRATE = 100_000;
+
 // If no explicit maxBitrate, cap to the estimated send bandwidth (with 90% safety margin).
 if (!user.maxBitrate) {
 	const conn = effect.get(this.connection);
 	const sendBw = conn?.sendBandwidth;
 	if (sendBw) {
 		const estimate = effect.get(sendBw);
 		if (estimate != null) {
 			// Reserve ~10% for audio and protocol overhead.
-			const cap = Math.round(estimate * 0.9);
+			const cap = Math.max(
+				MIN_ADAPTIVE_BITRATE,
+				Math.round(estimate * SEND_BANDWIDTH_HEADROOM),
+			);
 			bitrate = Math.min(bitrate, cap);
 		}
 	}
 }
```

As per coding guidelines, "Avoid using magic numbers; use named constants instead."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@js/publish/src/video/encoder.ts` around lines 215 - 224, The adaptive bitrate
cap can drive bitrate below the 100 kbps floor; define a named minimum constant
(e.g., MIN_BITRATE_BPS = 100_000) and ensure the computed cap respects it before
applying to bitrate — replace the direct use of cap in the Math.min call with
Math.max(cap, MIN_BITRATE_BPS) (references: user.maxBitrate check, conn =
effect.get(this.connection), sendBw, estimate, cap, and bitrate in encoder.ts)
so the adaptive path never sets bitrate under the documented minimum.
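The floor-plus-headroom arithmetic in this fix is easy to get wrong if the min/max are nested in the opposite order. A small self-contained sketch (the constant names follow the proposed fix; `adaptiveBitrate` is an illustrative helper, not the encoder's actual code):

```typescript
// Cap the encoder bitrate to 90% of the estimated send bandwidth,
// but never drive it below the documented 100 kbps floor.
const SEND_BANDWIDTH_HEADROOM = 0.9;
const MIN_ADAPTIVE_BITRATE = 100_000;

function adaptiveBitrate(configured: number, estimate: number | undefined): number {
	if (estimate == null) return configured; // no estimate: leave the bitrate alone
	const cap = Math.max(
		MIN_ADAPTIVE_BITRATE,
		Math.round(estimate * SEND_BANDWIDTH_HEADROOM),
	);
	return Math.min(configured, cap);
}
```

With a 3 Mbps configured bitrate, a 1 Mbps estimate caps output at 900 kbps, while a 50 kbps estimate still yields the 100 kbps floor rather than an unusably low rate. Note the ordering: the floor is applied to the cap first, and only then is the cap intersected with the configured bitrate.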
```rust
async fn run_recv_bandwidth(self) -> Result<(), Error> {
    let Some(bandwidth) = &self.recv_bandwidth else {
        return Ok(());
    };

    bandwidth.used().await?;

    let mut stream = Stream::open(&self.session, self.version).await?;
    stream.writer.encode(&lite::ControlType::Probe).await?;

    loop {
        tokio::select! {
            biased;
            _ = bandwidth.closed() => {
                stream.writer.finish()?;
                return stream.writer.closed().await;
            }
            res = bandwidth.unused() => {
                res?;
                // No more consumers, close the probe stream.
                stream.writer.finish()?;
                return stream.writer.closed().await;
            }
            probe = stream.reader.decode::<lite::Probe>() => {
                let probe = probe?;
                bandwidth.set(Some(probe.bitrate))?;
            }
        }
    }
}
```
Re-open the PROBE stream after unused() instead of ending the task.
When bandwidth.unused() resolves, this function returns Ok(()). In Subscriber::run(), that makes the select arm disappear permanently, so a later consumer can never restart recv-bandwidth probing.
Suggested shape

```rust
loop {
    bandwidth.used().await?;

    let mut stream = Stream::open(&self.session, self.version).await?;
    stream.writer.encode(&lite::ControlType::Probe).await?;

    loop {
        tokio::select! {
            biased;
            _ = bandwidth.closed() => {
                stream.writer.finish()?;
                return stream.writer.closed().await;
            }
            res = bandwidth.unused() => {
                res?;
                stream.writer.finish()?;
                stream.writer.closed().await?;
                break;
            }
            probe = stream.reader.decode::<lite::Probe>() => {
                let probe = probe?;
                bandwidth.set(Some(probe.bitrate))?;
            }
        }
    }
}
```
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rs/moq-lite/src/lite/subscriber.rs` around lines 135 - 163, The
run_recv_bandwidth task currently returns after bandwidth.unused(), preventing
future consumers from restarting probing; change run_recv_bandwidth to loop so
it re-enters probing when a new consumer appears: wrap the Stream::open/Probe
send and the tokio::select! in an inner loop, call bandwidth.used().await? at
the start of each outer iteration, and on bandwidth.unused() call
stream.writer.finish() and await stream.writer.closed()? then break the inner
loop (not return) so the outer loop continues and waits for
bandwidth.used().await? again; keep the existing closed() branch returning as
before. Use the existing symbols: run_recv_bandwidth, bandwidth.unused(),
bandwidth.used(), Stream::open, stream.writer.finish(), stream.writer.closed(),
and stream.reader.decode to locate and modify the code.
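The outer/inner loop the reviewer sketches is essentially a consumer-count gate: probe only while consumers exist, tear down when the last one leaves, and wait to restart. A self-contained TypeScript analogue of that gating pattern (the `Gate` class and `runProbeLoop` are illustrative stand-ins for `BandwidthProducer`'s `used()`/`unused()` notifications and the probe task, not the project's API):

```typescript
// Gate is a hypothetical stand-in for BandwidthProducer's consumer count:
// used() resolves once at least one consumer exists, unused() once none do.
class Gate {
	#count = 0;
	#waiters: (() => void)[] = [];

	acquire(): void {
		this.#count += 1;
		this.#notify();
	}

	release(): void {
		this.#count -= 1;
		this.#notify();
	}

	used(): Promise<void> {
		return this.#until(() => this.#count > 0);
	}

	unused(): Promise<void> {
		return this.#until(() => this.#count === 0);
	}

	#notify(): void {
		const waiters = this.#waiters;
		this.#waiters = [];
		for (const wake of waiters) wake();
	}

	async #until(ready: () => boolean): Promise<void> {
		while (!ready()) {
			await new Promise<void>((resolve) => this.#waiters.push(resolve));
		}
	}
}

// Open a (mock) probe stream whenever consumers appear, close it when the
// last one leaves, and loop so probing can restart for later consumers.
async function runProbeLoop(gate: Gate, log: string[], closed: Promise<void>): Promise<void> {
	let done = false;
	void closed.then(() => {
		done = true;
	});
	for (;;) {
		await Promise.race([gate.used(), closed]);
		if (done) return;
		log.push("open"); // stand-in for Stream.open + PROBE header
		await gate.unused();
		log.push("close"); // stand-in for finishing the probe stream
	}
}
```

The fix the review asks for is the `for (;;)` wrapper: after `unused()` closes the stream, control returns to the top and waits for `used()` again instead of ending the task.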
…timation-api-heK3r
- Transport, Decode, Encode, BoundsExceeded now carry inner errors via #[from]
- Replace from_code() with Remote(u32) for wire-received reset codes
- Session::closed() captures the actual transport error message
- Make coding module public for downstream error type access
- PROBE stream failures no longer kill the session
- Client closes unsupported PROBE streams gracefully instead of aborting

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rs/moq-lite/src/ietf/session.rs (1)
98-105: ⚠️ Potential issue | 🟡 Minor

Treat peer close codes as terminal here too.

`from_transport()` now surfaces peer application closes as `Error::Remote(code)`, so those fall through to the generic warning branch and trigger a second `session.close(...)`. Matching `Remote(_)` here keeps remote shutdowns on the normal termination path.

Suggested tweak

```diff
-            Err(Error::Transport(_)) => {
+            Err(Error::Transport(_)) | Err(Error::Remote(_)) => {
```

🤖 Prompt for AI Agents
- Err(Error::Transport(_)) => { + Err(Error::Transport(_)) | Err(Error::Remote(_)) => {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/ietf/session.rs` around lines 98 - 105, The match over res in from_transport() is not handling peer application closes (Err(Error::Remote(_))) and they fall through to the generic warning branch causing a second session.close; add a dedicated arm for Err(Error::Remote(_)) alongside Err(Error::Transport(_)) that logs normal termination (e.g., tracing::info!("session terminated")) and calls session.close(1, "") or the same termination path used for Transport, ensuring Remote is treated as terminal and avoiding the generic Err(err) branch.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rs/moq-lite/src/error.rs`:
- Around line 4-12: The Error enum currently derives Clone but its
Decode(#[from] coding::DecodeError) variant contains a non-cloneable inner
(coding::DecodeError includes FromUtf8Error), causing the derive to fail; either
remove Clone from the derive on the Error enum or make the inner cloneable by
wrapping the non-cloneable parts (e.g., change Decode(#[from]
coding::DecodeError) to hold a cloneable newtype or Arc-wrapped error inside
coding::DecodeError), so update the #[derive(...)] on Error to drop Clone or
modify coding::DecodeError / the Decode variant to use a cloneable wrapper (Arc
or custom newtype) to allow deriving Clone on Error.
In `@rs/moq-lite/src/lite/publisher.rs`:
- Line 67: Add an Error::is_cancel() helper (returns true for Error::Cancel or
Error::Remote(0)) and replace the three match branches in lite/publisher.rs that
currently use Error::Cancel | Error::Transport(_) so they detect remote peer
cancels via err.is_cancel() instead of only local Cancel (thus treating
Remote(0) as graceful); remove hard-coded 0 at call sites and use the helper to
keep intent clear.
---
Outside diff comments:
In `@rs/moq-lite/src/ietf/session.rs`:
- Around line 98-105: The match over res in from_transport() is not handling
peer application closes (Err(Error::Remote(_))) and they fall through to the
generic warning branch causing a second session.close; add a dedicated arm for
Err(Error::Remote(_)) alongside Err(Error::Transport(_)) that logs normal
termination (e.g., tracing::info!("session terminated")) and calls
session.close(1, "") or the same termination path used for Transport, ensuring
Remote is treated as terminal and avoiding the generic Err(err) branch.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ea93996f-5c93-465b-88d9-e5efbad981f2
📒 Files selected for processing (15)
- js/lite/src/lite/publisher.ts
- js/publish/src/video/encoder.ts
- js/watch/src/video/source.ts
- rs/moq-ffi/src/session.rs
- rs/moq-lite/src/coding/reader.rs
- rs/moq-lite/src/error.rs
- rs/moq-lite/src/ietf/adapter.rs
- rs/moq-lite/src/ietf/session.rs
- rs/moq-lite/src/lib.rs
- rs/moq-lite/src/lite/publisher.rs
- rs/moq-lite/src/lite/session.rs
- rs/moq-lite/src/lite/subscriber.rs
- rs/moq-lite/src/model/track.rs
- rs/moq-lite/src/session.rs
- rs/moq-mux/src/ordered/consumer.rs
🚧 Files skipped from review as they are similar to previous changes (4)
- js/watch/src/video/source.ts
- js/publish/src/video/encoder.ts
- rs/moq-lite/src/lite/session.rs
- rs/moq-lite/src/session.rs
```diff
 #[derive(thiserror::Error, Debug, Clone)]
 #[non_exhaustive]
 pub enum Error {
-    #[error("transport error")]
-    Transport,
+    #[error("transport: {0}")]
+    Transport(String),

-    #[error("decode error")]
-    Decode,
+    #[error(transparent)]
+    Decode(#[from] coding::DecodeError),
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
set -euo pipefail
fd -i 'decode.rs' rs --exec sed -n '1,160p'
fd -i 'encode.rs' rs --exec sed -n '1,160p'
rg -n 'FromUtf8Error|derive\(.*Clone|impl Clone for (DecodeError|EncodeError|BoundsExceeded)' rs
```

Repository: moq-dev/moq

Length of output: 21654
Remove Clone from Error or wrap non-cloneable errors.
DecodeError derives Clone but contains InvalidString(#[from] FromUtf8Error), where std::string::FromUtf8Error does not implement Clone. This makes DecodeError itself not cloneable, which causes Error::derive(Clone) to fail compilation since Error wraps DecodeError.
Either remove Clone from the Error derive macro, or wrap the non-cloneable error types in a cloneable newtype (e.g., Arc<FromUtf8Error>).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rs/moq-lite/src/error.rs` around lines 4 - 12, The Error enum currently
derives Clone but its Decode(#[from] coding::DecodeError) variant contains a
non-cloneable inner (coding::DecodeError includes FromUtf8Error), causing the
derive to fail; either remove Clone from the derive on the Error enum or make
the inner cloneable by wrapping the non-cloneable parts (e.g., change
Decode(#[from] coding::DecodeError) to hold a cloneable newtype or Arc-wrapped
error inside coding::DecodeError), so update the #[derive(...)] on Error to drop
Clone or modify coding::DecodeError / the Decode variant to use a cloneable
wrapper (Arc or custom newtype) to allow deriving Clone on Error.
```diff
 if let Err(err) = Self::run_probe(&session, &mut stream, version).await {
     match &err {
-        Error::Cancel | Error::Transport => {
+        Error::Cancel | Error::Transport(_) => {
```
Remote CANCEL no longer hits the graceful path.
Error::Cancel is now only produced locally; a peer reset with code 0 comes through as Error::Remote(0). These branches will now warn on normal remote cancels unless they recognize that case too.
Suggested tweak

```diff
-                Error::Cancel | Error::Transport(_) => {
+                Error::Cancel | Error::Remote(0) | Error::Transport(_) => {
```

Apply the same adjustment to all three branches above. A small `Error::is_cancel()` helper would also avoid hard-coding 0 at each call site.
Also applies to: 132-132, 231-231
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rs/moq-lite/src/lite/publisher.rs` at line 67, Add an Error::is_cancel()
helper (returns true for Error::Cancel or Error::Remote(0)) and replace the
three match branches in lite/publisher.rs that currently use Error::Cancel |
Error::Transport(_) so they detect remote peer cancels via err.is_cancel()
instead of only local Cancel (thus treating Remote(0) as graceful); remove
hard-coded 0 at call sites and use the helper to keep intent clear.
…timation-api-heK3r
- Escape `[Buf]`, `[Bytes]`, `[Message]` doc links in writer.rs that were not in scope, causing rustdoc failures
- Bump moq-lite from 0.15.8 to 0.15.9
- Merge with latest main

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
This PR adds bandwidth estimation capabilities to both the Rust and JavaScript implementations of MoQ Lite, enabling adaptive bitrate control based on network conditions. The implementation includes send bandwidth estimation from the QUIC congestion controller and receive bandwidth estimation via a new PROBE mechanism.
Key Changes
Rust Implementation
- New `bandwidth` module (`rs/moq-lite/src/model/bandwidth.rs`): Introduces `BandwidthProducer` and `BandwidthConsumer` for managing bandwidth estimates with async change notifications
- `rs/moq-lite/src/lite/session.rs`: Added `run_send_bandwidth()` that polls the QUIC session's congestion controller at 100ms intervals, only when consumers are active
- `rs/moq-lite/src/lite/subscriber.rs`: Implemented `run_recv_bandwidth()` and `run_probe_stream()` to open PROBE bidi streams and receive bandwidth estimates from publishers (Lite03+ only)
- `rs/moq-lite/src/session.rs`: Exposed `send_bandwidth()` and `recv_bandwidth()` methods on the public `Session` type
- Updated the `start()` functions

JavaScript Implementation
- New `Bandwidth` class (`js/lite/src/bandwidth.ts`): Reactive bandwidth estimate with synchronous `get()`, async `changed()`, and signal-based observation
- `js/lite/src/lite/connection.ts`: Added `#runSendBandwidth()` that polls WebTransport stats at 100ms intervals
- `js/lite/src/lite/subscriber.ts`: Implemented `#runProbe()` to open PROBE streams and receive bandwidth estimates
- Exposed `sendBandwidth` and `recvBandwidth` properties on the `Connection` class

Adaptive Bitrate Control
- `js/publish/src/video/encoder.ts`: When no explicit `maxBitrate` is configured, caps encoder bitrate to 90% of estimated send bandwidth (minimum 100kbps)
- `js/watch/src/video/source.ts`: When no explicit bitrate target is set, uses 80% of estimated receive bandwidth to select appropriate quality
- `js/publish/src/broadcast.ts`: Tracks connection's send bandwidth and propagates to video encoders

Implementation Details
- Pre-Lite03 sessions return `None` for receive bandwidth

https://claude.ai/code/session_01C68pEkoUhQqtQALYxRajhx
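The watch-side selection described above (choose a rendition using 80% of the estimated receive bandwidth) can be sketched as follows. The `Rendition` shape, headroom constant, and `selectRendition` helper are illustrative assumptions, not the actual `@moq/watch` API:

```typescript
// Hypothetical ABR selection: pick the highest-bitrate rendition that fits
// within 80% of the estimated receive bandwidth, falling back to the lowest.
const RECV_BANDWIDTH_HEADROOM = 0.8;

interface Rendition {
	name: string;
	bitrate: number; // bits per second
}

function selectRendition(renditions: Rendition[], recvEstimate: number | undefined): Rendition {
	const sorted = [...renditions].sort((a, b) => a.bitrate - b.bitrate);
	if (recvEstimate == null) return sorted[0]; // no estimate yet: be conservative
	const budget = recvEstimate * RECV_BANDWIDTH_HEADROOM;
	let best = sorted[0];
	for (const rendition of sorted) {
		if (rendition.bitrate <= budget) best = rendition;
	}
	return best;
}
```

For example, with a ladder of 700 kbps / 2.5 Mbps / 4.5 Mbps and a 4 Mbps estimate, the 20% headroom leaves a 3.2 Mbps budget, so the 2.5 Mbps rendition is chosen.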