Replace monolithic catalog with generic section-based registry#1149

Open
kixelated wants to merge 6 commits into dev from generic-catalog

Conversation

@kixelated
Collaborator

Summary

  • Replaces the hardcoded Catalog/Root type with a generic Section<T> registry where sections are identified by name + schema pairs
  • Adds CatalogWriter for producing catalogs and CatalogReader with per-section change notifications (conducer in Rust, signals in JS)
  • Removes application-specific sections (chat, user, preview, location, capabilities) from @moq/hang — these now belong in the application layer
  • Predefined VIDEO and AUDIO sections are available but not auto-registered
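To make the registry idea concrete, here is a minimal self-contained sketch of a section identified by a name + schema pair and looked up generically from a catalog. All names (`Section`, `Catalog`, `parse_video`, `VIDEO`) are illustrative; the real implementation pairs the name with a serde (Rust) or zod (JS) schema rather than a plain parse function.

```rust
use std::collections::HashMap;

// A section pairs a name with a way to decode its payload.
// (Sketch only: the real Section<T> uses serde/zod schemas.)
struct Section<T> {
    name: &'static str,
    parse: fn(&str) -> Option<T>,
}

#[derive(Debug, PartialEq)]
struct Video {
    track: String,
}

fn parse_video(raw: &str) -> Option<Video> {
    Some(Video { track: raw.trim().to_string() })
}

// Predefined section, available but not auto-registered.
const VIDEO: Section<Video> = Section { name: "video", parse: parse_video };

// A catalog is just named raw payloads; each section decodes its own entry.
struct Catalog {
    sections: HashMap<String, String>,
}

impl Catalog {
    fn get<T>(&self, section: &Section<T>) -> Option<T> {
        self.sections
            .get(section.name)
            .and_then(|raw| (section.parse)(raw))
    }
}

fn main() {
    let mut sections = HashMap::new();
    sections.insert("video".to_string(), "camera.h264".to_string());
    let catalog = Catalog { sections };

    let video = catalog.get(&VIDEO);
    assert_eq!(video, Some(Video { track: "camera.h264".to_string() }));
}
```

Because `Section<T>` carries its own decoder, applications can define custom sections the same way the predefined VIDEO/AUDIO ones are defined, without touching the catalog type itself.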

Breaking changes

  • hang::Catalog struct removed — use CatalogWriter / CatalogReader with Section<T>
  • Catalog.Root, Catalog.encode(), Catalog.decode(), Catalog.fetch() removed from JS
  • CatalogConsumer::next() replaced with reader() + run() pattern
  • moq_mux::CatalogProducer now uses set()/flush() instead of lock() guard pattern
  • App-specific schemas (Chat, User, Preview, Location, Capabilities) moved to publish/watch packages
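The move from a lock guard to set()/flush() can be sketched as follows; this is a hedged illustration of the batching semantics, not the actual moq_mux API (the struct and field names here are invented).

```rust
use std::collections::HashMap;

// Sketch of set()+flush(): set() stages a section value locally,
// flush() publishes all staged sections as one catalog update.
struct CatalogProducer {
    pending: HashMap<&'static str, String>,
    published: Vec<HashMap<&'static str, String>>,
}

impl CatalogProducer {
    fn new() -> Self {
        Self {
            pending: HashMap::new(),
            published: Vec::new(),
        }
    }

    // Stage a section update without publishing it yet.
    fn set(&mut self, name: &'static str, value: &str) {
        self.pending.insert(name, value.to_string());
    }

    // Publish every staged section in a single update.
    fn flush(&mut self) {
        self.published.push(self.pending.clone());
    }
}

fn main() {
    let mut producer = CatalogProducer::new();
    producer.set("video", r#"{"track":"camera"}"#);
    producer.set("audio", r#"{"track":"mic"}"#);
    producer.flush(); // both sections land in one catalog update

    assert_eq!(producer.published.len(), 1);
    assert_eq!(producer.published[0].len(), 2);
}
```

The upside over a lock() guard is that multiple section writes are explicitly batched, so consumers never observe a half-updated catalog between the two set() calls.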

Test plan

  • just fix — no formatting issues
  • just check — all linting and type checks pass
  • cargo test — all 523 Rust tests pass, 0 failures
  • MSF catalog conversion tests pass with new (Option<&Video>, Option<&Audio>) API
  • moq-ffi catalog_update_on_new_track test passes

🤖 Generated with Claude Code

The hang catalog previously hardcoded application-specific sections
(chat, user, preview, location, capabilities). This replaces the
monolithic Catalog/Root type with a generic Section<T> registry where
sections are identified by name + schema pairs, enabling custom
applications to define their own sections.

Key changes:
- Add Section<T> type pairing a name with a typed schema (Rust: serde, JS: zod)
- Add CatalogWriter for producing catalogs with typed sections
- Add CatalogReader with per-section change notifications (conducer/signals)
- Predefined VIDEO/AUDIO sections (not auto-registered)
- Remove app-specific sections from @moq/hang (chat, user, preview, etc.)
- Migrate moq-mux, moq-cli, libmoq, moq-ffi to section-based API
- Migrate js/publish and js/watch to use CatalogWriter/CatalogReader

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai
Contributor

coderabbitai bot commented Mar 22, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f81b57fe-f0d6-4cb4-a9e3-fe89576de9c0

📥 Commits

Reviewing files that changed from the base of the PR and between 9b237f5 and 88ee499.

📒 Files selected for processing (2)
  • rs/moq-mux/Cargo.toml
  • rs/moq-mux/src/hang/convert.rs
✅ Files skipped from review due to trivial changes (1)
  • rs/moq-mux/Cargo.toml
🚧 Files skipped from review as they are similar to previous changes (1)
  • rs/moq-mux/src/hang/convert.rs

Walkthrough

The PR replaces a monolithic catalog model with independently registerable named sections. It adds a generic Section type and implements CatalogReader and CatalogWriter for per-section validation, diffing, subscriptions, and publishing. Monolithic Root/Chat/Preview/User/Location/Capabilities schemas and types were removed; JS watch introduces application-specific section schemas. Broadcast, publish, watch, and consumer/producer code in both JS and Rust were updated to read/write individual Video/Audio (and other) sections, move to set+flush semantics, and route section changes via the new reader/writer APIs. An Error::Closed variant was added.
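The per-section diffing and subscription behavior described above can be sketched in a few lines. This is an assumption-laden toy model (the names `CatalogReader`, `SectionState`, and the notification counter are all hypothetical): on each catalog update, the reader compares every registered section's raw value against the last one it saw and only wakes subscribers whose section actually changed.

```rust
use std::collections::HashMap;

struct SectionState {
    last: Option<String>,
    notifications: u32, // stands in for conducer/signal wakeups
}

struct CatalogReader {
    sections: HashMap<&'static str, SectionState>,
}

impl CatalogReader {
    // Diff the new catalog against remembered state, section by section.
    fn update(&mut self, catalog: &HashMap<&'static str, String>) {
        for (name, state) in self.sections.iter_mut() {
            let current = catalog.get(name).cloned();
            if current != state.last {
                state.last = current;
                state.notifications += 1;
            }
        }
    }
}

fn main() {
    let mut reader = CatalogReader {
        sections: HashMap::from([
            ("video", SectionState { last: None, notifications: 0 }),
            ("audio", SectionState { last: None, notifications: 0 }),
        ]),
    };

    let mut catalog = HashMap::from([("video", "v1".to_string())]);
    reader.update(&catalog); // video appears: one notification
    reader.update(&catalog); // unchanged: no new notifications
    catalog.insert("audio", "a1".to_string());
    reader.update(&catalog); // only audio changed

    assert_eq!(reader.sections["video"].notifications, 1);
    assert_eq!(reader.sections["audio"].notifications, 1);
}
```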

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly describes the main architectural change: replacing a monolithic catalog with a generic section-based registry.
Description check ✅ Passed The description comprehensively explains the refactoring, breaking changes, and test validation, all directly related to the changeset.
Docstring Coverage ✅ Passed Docstring coverage is 85.53% which is sufficient. The required threshold is 80.00%.


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
rs/moq-mux/src/producer/hev1.rs (1)

349-357: ⚠️ Potential issue | 🟡 Minor

Use take() instead of borrowing in Drop to properly clean up the track.

The current code borrows the track with &self.track instead of taking ownership. This differs from av01.rs which correctly uses self.track.take(). As a result, the Option still holds Some after the block, and the underlying track may not be properly dropped.

🐛 Proposed fix
 impl Drop for Hev1 {
     fn drop(&mut self) {
-        if let Some(track) = &self.track {
+        if let Some(track) = self.track.take() {
             tracing::debug!(name = ?track.info.name, "ending track");
             self.video.remove_track(&track.info);
             let _ = self.catalog.set(&hang::catalog::VIDEO, &self.video);
             self.catalog.flush();
         }
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/hev1.rs` around lines 349 - 357, In Hev1's Drop
implementation, replace borrowing the Option with taking ownership so the inner
Track is actually removed and dropped: use self.track.take() to extract the
track (instead of matching on &self.track), then call tracing::debug! with the
extracted track.info.name, call self.video.remove_track(&track.info), update the
catalog with self.catalog.set(&hang::catalog::VIDEO, &self.video) and flush;
this ensures the Option is set to None and the underlying track can be dropped
(look for the impl Drop for Hev1, the track field, and the calls to
remove_track, catalog.set, and catalog.flush).
js/watch/src/preview.ts (1)

38-49: ⚠️ Potential issue | 🟠 Major

Keep reading preview updates from the track.

This task does a single Zod.read() and then exits. The publisher can emit multiple preview.json frames over the lifetime of the track, so later changes—and end-of-track cleanup—are never observed here.

♻️ Suggested fix
 			effect.spawn(async () => {
 				try {
-					const info = await Zod.read(track, PreviewSchema);
-					if (!info) return;
-
-					this.preview.set(info);
+					for (;;) {
+						const info = await Zod.read(track, PreviewSchema);
+						if (!info) break;
+						this.preview.set(info);
+					}
 				} catch (error) {
 					console.warn("Failed to parse preview JSON:", error);
+				} finally {
+					this.preview.set(undefined);
 				}
 			});
-
-			effect.cleanup(() => this.preview.set(undefined));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/preview.ts` around lines 38 - 49, The current effect.spawn
performs a single Zod.read(track, PreviewSchema) and exits, so subsequent
preview.json frames and end-of-track are missed; change the task launched in
effect.spawn (the block containing Zod.read, PreviewSchema and this.preview.set)
to continuously read frames from the track (e.g., loop or async iterator) and
call this.preview.set(info) for every successful parse, and ensure
effect.cleanup stops that loop and resets this.preview to undefined (use an
AbortController or a cancellation flag inside the spawned task) so later updates
and end-of-track cleanup are observed.
🧹 Nitpick comments (13)
js/publish/src/location/types.ts (1)

1-8: Clarify the s field intent in this public type.

Position.s is ambiguous for API consumers. If the short key is required for payload compatibility, add a doc comment explaining what it represents (for example, scale).

Suggested clarification
+/** Position in 3D space with optional scalar metadata. */
 export type Position = {
 	x?: number;
 	y?: number;
 	z?: number;
+	/** Uniform scale factor. */
 	s?: number;
 };

As per coding guidelines, "Use clear and descriptive variable names that convey intent" and "Document public APIs with clear docstrings or comments".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/publish/src/location/types.ts` around lines 1 - 8, The public type
Position exposes an ambiguous short field s; update the Position type
declaration to document what s stands for (e.g., scale, speed, semantic state)
with a concise doc comment above the Position type and above the s property, and
if payload compatibility requires the short key keep the name but add the
descriptive comment; reference the Position type and its s property (and Peers
which uses Position) so consumers understand the field's intent.
rs/moq-mux/src/producer/test/mod.rs (1)

22-31: Differentiate missing sections from invalid section payloads.

Right now malformed section JSON is silently treated as default, which can hide root-cause regressions in test failures. Keep defaulting for absent sections, but fail when a present section cannot be decoded.

Suggested adjustment
-	let video: Video = state
-		.sections
-		.get("video")
-		.and_then(|v| serde_json::from_value(v.clone()).ok())
-		.unwrap_or_default();
-	let audio: Audio = state
-		.sections
-		.get("audio")
-		.and_then(|v| serde_json::from_value(v.clone()).ok())
-		.unwrap_or_default();
+	let video: Video = state
+		.sections
+		.get("video")
+		.map(|value| serde_json::from_value(value.clone()).expect("invalid video section"))
+		.unwrap_or_default();
+	let audio: Audio = state
+		.sections
+		.get("audio")
+		.map(|value| serde_json::from_value(value.clone()).expect("invalid audio section"))
+		.unwrap_or_default();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/test/mod.rs` around lines 22 - 31, Currently the test
silently defaults when decoding of present sections fails; change the logic so
that when state.sections.get("video") or get("audio") returns None you keep the
default Video/Audio, but when a Value is present you attempt
serde_json::from_value and propagate a decode error (panic or unwrap/expect)
instead of turning it into a default; update the two bindings (Video and Audio)
that use state.sections.get(...).and_then(|v|
serde_json::from_value(v.clone()).ok()).unwrap_or_default() to check presence
first (e.g., match on get("video")/get("audio")) and call
serde_json::from_value(...).expect("failed to decode video section") so
malformed payloads fail the test while absent sections still default.
rs/libmoq/src/consume.rs (1)

78-101: Manual catalog parsing replaces legacy CatalogConsumer.

The refactored run_catalog manually reads frames from the track and parses JSON sections. The use of unwrap_or_default() for missing sections is appropriate since video/audio sections are optional.

Consider using section name constants instead of the magic strings "video" and "audio" for consistency with the section definitions.


♻️ Optional: Use section name constants

If hang::catalog::VIDEO and hang::catalog::AUDIO expose their names, consider:

-				let video: hang::catalog::Video = json
-					.get("video")
+				let video: hang::catalog::Video = json
+					.get(hang::catalog::VIDEO.name)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/libmoq/src/consume.rs` around lines 78 - 101, In run_catalog replace the
magic string lookups for the JSON sections with the catalog section name
constants instead of "video" and "audio": locate the serde_json::Map accessors
where you call .get("video") and .get("audio") and use the appropriate constants
exported by hang::catalog (e.g., hang::catalog::VIDEO / hang::catalog::AUDIO or
their NAME variants) so the code reads the same but relies on the canonical
section identifiers used elsewhere (keep the same serde_json::from_value,
transpose, map_err and unwrap_or_default logic and types hang::catalog::Video
and hang::catalog::Audio).
rs/moq-mux/src/producer/avc3.rs (1)

39-47: Use the VIDEO constant's name field instead of the hardcoded string.

The hardcoded "video" string on line 44 should reference hang::catalog::VIDEO.name to ensure consistency and prevent drift if the section name changes.

♻️ Suggested improvement
 		// Read the current video section from the catalog, if any
 		let video: hang::catalog::Video = {
 			let state = catalog.writer().read();
 			state
 				.sections
-				.get("video")
+				.get(hang::catalog::VIDEO.name)
 				.and_then(|v| serde_json::from_value(v.clone()).ok())
 				.unwrap_or_default()
 		};

Note: The same pattern is used in hev1.rs and av01.rs and should be updated consistently.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/avc3.rs` around lines 39 - 47, Replace the hardcoded
"video" key when reading from catalog.sections with the VIDEO constant's name
field: use hang::catalog::VIDEO.name() or hang::catalog::VIDEO.name (match the
constant's API) in the call inside the block that constructs `video` (the code
using `catalog.writer().read()` and `state.sections.get(...)`), so the lookup
uses the canonical section name; apply the same change to the analogous lookups
in `hev1.rs` and `av01.rs`.
rs/moq-mux/src/producer/av01.rs (2)

39-47: Consider using the VIDEO constant for consistency.

The read path uses the string literal "video" while the write path uses &hang::catalog::VIDEO. For consistency and to avoid potential typos, consider using the constant for both:

 let video: hang::catalog::Video = {
     let state = catalog.writer().read();
     state
         .sections
-        .get("video")
+        .get(hang::catalog::VIDEO.name)
         .and_then(|v| serde_json::from_value(v.clone()).ok())
         .unwrap_or_default()
 };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/av01.rs` around lines 39 - 47, Replace the string
literal "video" in the read path with the catalog constant to match the write
path: when building the local video value in the block that calls
catalog.writer().read(), use hang::catalog::VIDEO (or &hang::catalog::VIDEO as
appropriate) instead of "video" so the get lookup is consistent with the write
side and avoids typos; update the use in the closure that does
.sections.get(...) accordingly.

116-117: Discarding set() result silently.

The let _ = self.catalog.set(...) pattern discards any error from set(). If set() can fail in a meaningful way, consider logging or propagating the error. If it's intentionally infallible in this context, the current pattern is acceptable.

Also applies to: 160-161, 249-250, 439-440

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/av01.rs` around lines 116 - 117, The calls that
currently use "let _ = self.catalog.set(&hang::catalog::VIDEO, &self.video);"
(and the similar occurrences) silently discard the Result; update these to
handle failures by either propagating the error (returning the Result with ?
from the surrounding function) or logging the error via the project's logger
(e.g., processLogger or crate logger) with context including
hang::catalog::VIDEO and self.video; specifically replace the discard at the
sites calling self.catalog.set(...) with either "self.catalog.set(...)?", or a
match/if let Err(e) => log::error!(... , e) so errors are not lost. Ensure the
chosen approach is consistent across the other occurrences referenced by the
reviewer.
rs/moq-mux/src/producer/hev1.rs (1)

41-49: Same consistency suggestion as av01.rs: use VIDEO.name constant.

For consistency with the write path that uses &hang::catalog::VIDEO, consider using the constant's name field for the read path as well.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/hev1.rs` around lines 41 - 49, Replace the hard-coded
"video" key with the catalog constant to match the write path: change
sections.get("video") to sections.get(hang::catalog::VIDEO.name) in the block
that reads the current video section (the code using catalog.writer().read(),
.sections, and serde_json::from_value). Ensure the lookup uses the VIDEO.name
constant so the read path and write path use the same identifier.
js/watch/src/broadcast.ts (1)

112-123: Status remains "live" even if all sections become undefined.

Once any section has a value, status is set to "live". If all sections subsequently become undefined (e.g., empty catalog update), the status won't revert to "loading". If the intent is "once live, stay live until cleanup," this is fine. Otherwise, consider adding an else branch:

 if (video || audio || chat || user || preview || location) {
     this.status.set("live");
+} else {
+    this.status.set("loading");
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/broadcast.ts` around lines 112 - 123, The effect.run callback
currently sets this.status.set("live") whenever any of the tracked signals
(this.video, this.audio, this.chat, this.user, this.preview, this.location) are
truthy, but never resets the status when all become undefined; update the
effect.run in broadcast.ts to add an else branch that sets this.status back to
"loading" (or another appropriate non-live state) when none of the signals are
present, so after checking const video = inner.get(this.video); const audio =
inner.get(this.audio); const chat = inner.get(this.chat); const user =
inner.get(this.user); const preview = inner.get(this.preview); const location =
inner.get(this.location); you explicitly call this.status.set("loading") when
video||audio||chat||user||preview||location is falsey.
rs/moq-ffi/src/consumer.rs (1)

44-45: Clarify the delta support limitation.

The comment "We don't support deltas yet" explains why the group is taken after reading the first frame. However, this means subsequent frames in the same group are silently ignored. Consider adding a debug log when a group with multiple frames is encountered, to aid future debugging when delta support is added.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-ffi/src/consumer.rs` around lines 44 - 45, When handling a frame from
a group in consumer.rs (the match arm that calls
self.group.as_mut()?.read_frame().await.transpose() and then self.group.take()),
add a debug log before taking the group to record that a group with multiple
frames was encountered and that subsequent frames will be dropped; use the
crate's logging facility (e.g., tracing::debug! or log::debug!) and include
identifying details from the frame (Debug-format the frame or include a frame
id/metadata if available) so future delta support debugging can locate which
group/frame was ignored.
js/watch/src/sections.ts (1)

28-43: Clarify the distinction between PreviewTrackSchema and PreviewSchema.

The PREVIEW section uses PreviewTrackSchema (just a track name), while PreviewSchema defines a richer structure with name, avatar, audio, video, etc.

Is PreviewSchema intended for the content on the preview track rather than the catalog section itself? If so, a brief comment would help clarify this distinction.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/sections.ts` around lines 28 - 43, Add a short clarifying
comment above PreviewTrackSchema/PreviewTrack type and PREVIEW explaining that
this schema represents the preview "track" identifier used by the Section (a
minimal schema containing only the track name), and add a separate comment above
PreviewSchema/Preview type explaining that this is the richer preview content
shape (name, avatar, audio/video/typing/chat/screen flags) used for preview
track payloads or catalog content; reference PreviewTrackSchema, PREVIEW, and
PreviewSchema in the comments so future readers understand the distinction and
intended usage.
rs/moq-mux/src/convert/fmp4.rs (1)

61-71: Consider extracting shared catalog reading logic.

The run_with_catalog pattern and the catalog reading in run() are nearly identical between fmp4.rs and hang.rs. While not blocking, extracting the common catalog frame reading and JSON parsing logic into a shared helper could reduce duplication.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/convert/fmp4.rs` around lines 61 - 71, Both run_with_catalog
and run in fmp4.rs duplicate the catalog frame reading and JSON parsing logic
found also in hang.rs; extract that shared behavior into a single helper (e.g.,
read_and_parse_catalog_frames or parse_catalog_from_consumer) that accepts a
&moq_lite::BroadcastConsumer (and any needed mutable CatalogProducer references)
and returns the parsed catalog structure (or Result). Replace the duplicated
blocks in run_with_catalog and run with calls to this helper to centralize
buffering, frame extraction, and serde_json parsing; ensure the helper exposes
any errors upward and preserves the same returned types so callers
(run_with_catalog, run) only perform protocol-specific handling after receiving
the parsed catalog.
rs/moq-mux/src/catalog.rs (1)

49-75: Silent deserialization failures may hide issues.

Lines 64-71 use .and_then(|v| serde_json::from_value(v.clone()).ok()) which silently ignores deserialization errors for video/audio sections. If the JSON structure doesn't match Video/Audio, the MSF catalog will be published without those sections, with no warning.

Consider logging a warning when deserialization fails:

🔧 Suggested improvement
 let video: Option<Video> = state
     .sections
     .get("video")
-    .and_then(|v| serde_json::from_value(v.clone()).ok());
+    .and_then(|v| {
+        serde_json::from_value(v.clone())
+            .map_err(|e| tracing::warn!("Failed to deserialize video section for MSF: {}", e))
+            .ok()
+    });
 let audio: Option<Audio> = state
     .sections
     .get("audio")
-    .and_then(|v| serde_json::from_value(v.clone()).ok());
+    .and_then(|v| {
+        serde_json::from_value(v.clone())
+            .map_err(|e| tracing::warn!("Failed to deserialize audio section for MSF: {}", e))
+            .ok()
+    });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/catalog.rs` around lines 49 - 75, The flush method currently
swallows serde_json deserialization errors for the video/audio sections (using
.and_then(... .ok())), so update the deserialization in flush to explicitly
match the Result from serde_json::from_value for both Video and Audio: if Ok(v)
set video (or audio), if Err(e) log a warning that includes the section name
("video" or "audio"), the serde error e, and the offending JSON value (or a
short preview), then continue; keep dropping state and calling
crate::msf::publish(video.as_ref(), audio.as_ref(), &mut self.msf_track) so
publish behavior is unchanged but failures are visible.
rs/hang/src/catalog/reader.rs (1)

66-81: Consider potential lock contention during update.

The update method holds the mutex lock while iterating through all sections and potentially writing to each producer. For catalogs with many sections, this could block other operations like section() registration.

This is likely acceptable for typical use cases where catalogs have a small number of sections, but worth noting if the section count grows significantly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/hang/src/catalog/reader.rs` around lines 66 - 81, The update method holds
the mutex while iterating and calling producer.write(), which can cause
contention; change update to briefly lock self.inner to clone the necessary
state (clone or collect inner.sections entries as (name.clone(),
producer.clone()) and clone inner.last), then drop the lock and perform
comparisons and producer.write() outside the mutex, and finally re-lock once to
set inner.last = json; update references: update, inner, sections, inner.last,
producer.write(), and ensure producer clones are used so writes happen without
holding the mutex.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@js/hang/src/catalog/reader.ts`:
- Around line 45-51: When schema.parse(raw) throws in the reader for a catalog
section (inside the try/catch that currently calls schema.parse and signal.set),
clear the section's signal in the catch path so stale data is not retained;
i.e., in the catch that currently logs the invalid payload for name, call the
appropriate method to unset the signal (e.g., set it to undefined or reset the
signal) instead of leaving the previous value. Ensure you still log the error as
before and reference the same symbols (raw, schema.parse, signal.set) when
making the change.

In `@rs/hang/src/catalog/consumer.rs`:
- Around line 49-69: The run() loop currently only calls self.reader.close() on
clean EOF; change it so CatalogReader::close() is always invoked on every exit
path (errors from self.track.recv_group(), self.group.as_mut()?.read_frame(),
and serde_json::from_slice()). Implement a "finally"-style cleanup: wrap the
loop scope or the async select in a block and ensure self.reader.close() is
called after the loop (or install a guard that calls close in its Drop), so
regardless of early returns or ? errors the reader is closed; reference
functions/values: run(), self.track.recv_group(),
self.group.as_mut()?.read_frame(), serde_json::from_slice(), and
self.reader.close().

In `@rs/moq-cli/src/subscribe.rs`:
- Around line 71-79: The select! currently returns after the first section
change, which can drop the other section before exporter.init() / muxer_tracks
see it; change the logic to keep the catalog_consumer task alive until both
sections have produced their initial values. Replace the single select! with a
loop that checks video_section.get() and audio_section.get() (e.g., while
video_section.get()?.is_none() || audio_section.get()?.is_none()) and inside the
loop use tokio::select! to await catalog_consumer.run(),
video_section.changed(), and audio_section.changed(); on catalog_consumer.run()
error bail as before, otherwise loop until both sections are present, then read
video_section.get() and audio_section.get() and call exporter.init()/build
muxer_tracks.
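Stripped of the async machinery, the shape of the fix above is "keep draining catalog updates until both sections have an initial value." A synchronous sketch under that assumption (the function name and iterator-of-updates model are hypothetical; the real code loops with tokio::select! over catalog_consumer.run() and the sections' changed() futures):

```rust
use std::collections::HashMap;

// Drain (section name, value) updates until both "video" and "audio"
// have appeared, then return their initial values together.
fn wait_for_sections(
    updates: impl Iterator<Item = (&'static str, String)>,
) -> Option<(String, String)> {
    let mut seen: HashMap<&'static str, String> = HashMap::new();
    for (name, value) in updates {
        seen.insert(name, value);
        if let (Some(v), Some(a)) = (seen.get("video"), seen.get("audio")) {
            return Some((v.clone(), a.clone()));
        }
    }
    None // catalog ended before both sections appeared
}

fn main() {
    let updates = vec![
        ("video", "camera.h264".to_string()),
        ("audio", "mic.opus".to_string()),
    ];
    let result = wait_for_sections(updates.into_iter());
    assert_eq!(
        result,
        Some(("camera.h264".to_string(), "mic.opus".to_string()))
    );
}
```

Returning after the first update (the bug being flagged) would correspond to breaking out of the loop as soon as `seen` is non-empty, which drops whichever section arrives second.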

---

Outside diff comments:
In `@js/watch/src/preview.ts`:
- Around line 38-49: The current effect.spawn performs a single Zod.read(track,
PreviewSchema) and exits, so subsequent preview.json frames and end-of-track are
missed; change the task launched in effect.spawn (the block containing Zod.read,
PreviewSchema and this.preview.set) to continuously read frames from the track
(e.g., loop or async iterator) and call this.preview.set(info) for every
successful parse, and ensure effect.cleanup stops that loop and resets
this.preview to undefined (use an AbortController or a cancellation flag inside
the spawned task) so later updates and end-of-track cleanup are observed.

In `@rs/moq-mux/src/producer/hev1.rs`:
- Around line 349-357: In Hev1's Drop implementation, replace borrowing the
Option with taking ownership so the inner Track is actually removed and dropped:
use self.track.take() to extract the track (instead of matching on &self.track),
then call tracing::debug! with the extracted track.info.name, call
self.video.remove_track(&track.info), update the catalog with
self.catalog.set(&hang::catalog::VIDEO, &self.video) and flush; this ensures the
Option is set to None and the underlying track can be dropped (look for the impl
Drop for Hev1, the track field, and the calls to remove_track, catalog.set, and
catalog.flush).

---

Nitpick comments:
In `@js/publish/src/location/types.ts`:
- Around line 1-8: The public type Position exposes an ambiguous short field s;
update the Position type declaration to document what s stands for (e.g., scale,
speed, semantic state) with a concise doc comment above the Position type and
above the s property, and if payload compatibility requires the short key keep
the name but add the descriptive comment; reference the Position type and its s
property (and Peers which uses Position) so consumers understand the field's
intent.

In `@js/watch/src/broadcast.ts`:
- Around line 112-123: The effect.run callback currently sets
this.status.set("live") whenever any of the tracked signals (this.video,
this.audio, this.chat, this.user, this.preview, this.location) are truthy, but
never resets the status when all become undefined; update the effect.run in
broadcast.ts to add an else branch that sets this.status back to "loading" (or
another appropriate non-live state) when none of the signals are present, so
after checking const video = inner.get(this.video); const audio =
inner.get(this.audio); const chat = inner.get(this.chat); const user =
inner.get(this.user); const preview = inner.get(this.preview); const location =
inner.get(this.location); you explicitly call this.status.set("loading") when
video||audio||chat||user||preview||location is falsey.

In `@js/watch/src/sections.ts`:
- Around line 28-43: Add a short clarifying comment above
PreviewTrackSchema/PreviewTrack type and PREVIEW explaining that this schema
represents the preview "track" identifier used by the Section (a minimal schema
containing only the track name), and add a separate comment above
PreviewSchema/Preview type explaining that this is the richer preview content
shape (name, avatar, audio/video/typing/chat/screen flags) used for preview
track payloads or catalog content; reference PreviewTrackSchema, PREVIEW, and
PreviewSchema in the comments so future readers understand the distinction and
intended usage.

In `@rs/hang/src/catalog/reader.rs`:
- Around line 66-81: The update method holds the mutex while iterating and
calling producer.write(), which can cause contention; change update to briefly
lock self.inner to clone the necessary state (clone or collect inner.sections
entries as (name.clone(), producer.clone()) and clone inner.last), then drop the
lock and perform comparisons and producer.write() outside the mutex, and finally
re-lock once to set inner.last = json; update references: update, inner,
sections, inner.last, producer.write(), and ensure producer clones are used so
writes happen without holding the mutex.

In `@rs/libmoq/src/consume.rs`:
- Around line 78-101: In run_catalog replace the magic string lookups for the
JSON sections with the catalog section name constants instead of "video" and
"audio": locate the serde_json::Map accessors where you call .get("video") and
.get("audio") and use the appropriate constants exported by hang::catalog (e.g.,
hang::catalog::VIDEO / hang::catalog::AUDIO or their NAME variants) so the code
reads the same but relies on the canonical section identifiers used elsewhere
(keep the same serde_json::from_value, transpose, map_err and unwrap_or_default
logic and types hang::catalog::Video and hang::catalog::Audio).

In `@rs/moq-ffi/src/consumer.rs`:
- Around line 44-45: When handling a frame from a group in consumer.rs (the
match arm that calls self.group.as_mut()?.read_frame().await.transpose() and
then self.group.take()), add a debug log before taking the group to record that
a group with multiple frames was encountered and that subsequent frames will be
dropped; use the crate's logging facility (e.g., tracing::debug! or log::debug!)
and include identifying details from the frame (Debug-format the frame or
include a frame id/metadata if available) so future delta support debugging can
locate which group/frame was ignored.

In `@rs/moq-mux/src/catalog.rs`:
- Around line 49-75: The flush method currently swallows serde_json
deserialization errors for the video/audio sections (using .and_then(...
.ok())), so update the deserialization in flush to explicitly match the Result
from serde_json::from_value for both Video and Audio: if Ok(v) set video (or
audio), if Err(e) log a warning that includes the section name ("video" or
"audio"), the serde error e, and the offending JSON value (or a short preview),
then continue; keep dropping state and calling
crate::msf::publish(video.as_ref(), audio.as_ref(), &mut self.msf_track) so
publish behavior is unchanged but failures are visible.
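The shape of the fix — match the decode `Result` explicitly instead of collapsing it with `.ok()` — in a minimal stdlib sketch, where integer parsing stands in for `serde_json::from_value` and the function name is made up:

```rust
// Decode a section value, logging (not swallowing) failures.
fn decode_section(name: &str, raw: &str) -> Option<i64> {
    match raw.parse::<i64>() {
        Ok(v) => Some(v),
        Err(e) => {
            // A bare .ok() would hide this entirely; warn and continue instead.
            eprintln!("warning: failed to decode {name} section: {e} (value: {raw:?})");
            None
        }
    }
}
```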

In `@rs/moq-mux/src/convert/fmp4.rs`:
- Around line 61-71: Both run_with_catalog and run in fmp4.rs duplicate the
catalog frame reading and JSON parsing logic found also in hang.rs; extract that
shared behavior into a single helper (e.g., read_and_parse_catalog_frames or
parse_catalog_from_consumer) that accepts a &moq_lite::BroadcastConsumer (and
any needed mutable CatalogProducer references) and returns the parsed catalog
structure (or Result). Replace the duplicated blocks in run_with_catalog and run
with calls to this helper to centralize buffering, frame extraction, and
serde_json parsing; ensure the helper exposes any errors upward and preserves
the same returned types so callers (run_with_catalog, run) only perform
protocol-specific handling after receiving the parsed catalog.

In `@rs/moq-mux/src/producer/av01.rs`:
- Around line 39-47: Replace the string literal "video" in the read path with
the catalog constant to match the write path: when building the local video
value in the block that calls catalog.writer().read(), use hang::catalog::VIDEO
(or &hang::catalog::VIDEO as appropriate) instead of "video" so the get lookup
is consistent with the write side and avoids typos; update the use in the
closure that does .sections.get(...) accordingly.
- Around line 116-117: The calls that currently use "let _ =
self.catalog.set(&hang::catalog::VIDEO, &self.video);" (and the similar
occurrences) silently discard the Result; update these to handle failures by
either propagating the error (returning the Result with ? from the surrounding
function) or logging the error via the project's logger (e.g., processLogger or
crate logger) with context including hang::catalog::VIDEO and self.video;
specifically replace the discard at the sites calling self.catalog.set(...) with
either "self.catalog.set(...)?", or a match/if let Err(e) => log::error!(... ,
e) so errors are not lost. Ensure the chosen approach is consistent across the
other occurrences referenced by the reviewer.

In `@rs/moq-mux/src/producer/avc3.rs`:
- Around line 39-47: Replace the hardcoded "video" key when reading from
catalog.sections with the VIDEO constant's name field: use
hang::catalog::VIDEO.name() or hang::catalog::VIDEO.name (match the constant's
API) in the call inside the block that constructs `video` (the code using
`catalog.writer().read()` and `state.sections.get(...)`), so the lookup uses the
canonical section name; apply the same change to the analogous lookups in
`hev1.rs` and `av01.rs`.

In `@rs/moq-mux/src/producer/hev1.rs`:
- Around line 41-49: Replace the hard-coded "video" key with the catalog
constant to match the write path: change sections.get("video") to
sections.get(hang::catalog::VIDEO.name) in the block that reads the current
video section (the code using catalog.writer().read(), .sections, and
serde_json::from_value). Ensure the lookup uses the VIDEO.name constant so the
read path and write path use the same identifier.

In `@rs/moq-mux/src/producer/test/mod.rs`:
- Around line 22-31: Currently the test silently defaults when decoding of
present sections fails; change the logic so that when
state.sections.get("video") or get("audio") returns None you keep the default
Video/Audio, but when a Value is present you attempt serde_json::from_value and
propagate a decode error (panic or unwrap/expect) instead of turning it into a
default; update the two bindings (Video and Audio) that use
state.sections.get(...).and_then(|v|
serde_json::from_value(v.clone()).ok()).unwrap_or_default() to check presence
first (e.g., match on get("video")/get("audio")) and call
serde_json::from_value(...).expect("failed to decode video section") so
malformed payloads fail the test while absent sections still default.
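The distinction the reviewer asks for — absent sections default, present-but-malformed sections fail the test — is easy to see in a tiny sketch (integer parsing again stands in for `serde_json::from_value`):

```rust
// Absent section => default; present but malformed => panic, failing the test.
fn decode_or_default(section: Option<&str>) -> i64 {
    match section {
        None => i64::default(),
        Some(raw) => raw.parse().expect("failed to decode section"),
    }
}
```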

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: dead851f-266c-4cad-89c6-b3d6e9490edc

📥 Commits

Reviewing files that changed from the base of the PR and between 64cbba8 and 997e161.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • bun.lock is excluded by !**/*.lock
📒 Files selected for processing (74)
  • js/hang/src/catalog/audio.ts
  • js/hang/src/catalog/capabilities.ts
  • js/hang/src/catalog/chat.ts
  • js/hang/src/catalog/index.ts
  • js/hang/src/catalog/location.ts
  • js/hang/src/catalog/preview.ts
  • js/hang/src/catalog/reader.ts
  • js/hang/src/catalog/root.ts
  • js/hang/src/catalog/section.ts
  • js/hang/src/catalog/user.ts
  • js/hang/src/catalog/video.ts
  • js/hang/src/catalog/writer.ts
  • js/publish/src/audio/encoder.ts
  • js/publish/src/broadcast.ts
  • js/publish/src/chat/index.ts
  • js/publish/src/chat/message.ts
  • js/publish/src/chat/typing.ts
  • js/publish/src/location/index.ts
  • js/publish/src/location/peers.ts
  • js/publish/src/location/types.ts
  • js/publish/src/location/window.ts
  • js/publish/src/preview.ts
  • js/publish/src/user.ts
  • js/publish/src/video/encoder.ts
  • js/publish/src/video/index.ts
  • js/watch/package.json
  • js/watch/src/audio/source.ts
  • js/watch/src/broadcast.ts
  • js/watch/src/chat/index.ts
  • js/watch/src/chat/message.ts
  • js/watch/src/chat/typing.ts
  • js/watch/src/location/index.ts
  • js/watch/src/location/peers.ts
  • js/watch/src/location/window.ts
  • js/watch/src/preview.ts
  • js/watch/src/sections.ts
  • js/watch/src/user.ts
  • js/watch/src/video/source.ts
  • rs/hang/Cargo.toml
  • rs/hang/examples/subscribe.rs
  • rs/hang/examples/video.rs
  • rs/hang/src/catalog/audio/mod.rs
  • rs/hang/src/catalog/chat.rs
  • rs/hang/src/catalog/consumer.rs
  • rs/hang/src/catalog/mod.rs
  • rs/hang/src/catalog/preview.rs
  • rs/hang/src/catalog/reader.rs
  • rs/hang/src/catalog/root.rs
  • rs/hang/src/catalog/section.rs
  • rs/hang/src/catalog/user.rs
  • rs/hang/src/catalog/video/mod.rs
  • rs/hang/src/catalog/writer.rs
  • rs/hang/src/error.rs
  • rs/hang/src/lib.rs
  • rs/libmoq/Cargo.toml
  • rs/libmoq/src/consume.rs
  • rs/moq-cli/src/subscribe.rs
  • rs/moq-ffi/Cargo.toml
  • rs/moq-ffi/src/consumer.rs
  • rs/moq-ffi/src/media.rs
  • rs/moq-mux/Cargo.toml
  • rs/moq-mux/src/catalog.rs
  • rs/moq-mux/src/consumer/fmp4.rs
  • rs/moq-mux/src/convert/fmp4.rs
  • rs/moq-mux/src/convert/hang.rs
  • rs/moq-mux/src/convert/test.rs
  • rs/moq-mux/src/msf.rs
  • rs/moq-mux/src/producer/aac.rs
  • rs/moq-mux/src/producer/av01.rs
  • rs/moq-mux/src/producer/avc3.rs
  • rs/moq-mux/src/producer/fmp4.rs
  • rs/moq-mux/src/producer/hev1.rs
  • rs/moq-mux/src/producer/opus.rs
  • rs/moq-mux/src/producer/test/mod.rs
💤 Files with no reviewable changes (10)
  • js/hang/src/catalog/chat.ts
  • js/hang/src/catalog/user.ts
  • rs/hang/src/catalog/user.rs
  • js/hang/src/catalog/preview.ts
  • rs/hang/src/catalog/root.rs
  • rs/hang/src/catalog/chat.rs
  • rs/hang/src/catalog/preview.rs
  • js/hang/src/catalog/location.ts
  • js/hang/src/catalog/capabilities.ts
  • js/hang/src/catalog/root.ts

Comment on lines +19 to +25
section<T>(def: Section<T>): Getter<T | undefined> {
	const existing = this.#sections.get(def.name);
	if (existing) return existing.signal as Getter<T | undefined>;

	const signal = new Signal<T | undefined>(undefined);
	this.#sections.set(def.name, { schema: def.schema, signal });
	return signal;
⚠️ Potential issue | 🟠 Major

Make section() honor the current snapshot and full section identity.

Registrations are reused by def.name alone and new signals always start at undefined. That means a late subscriber misses the current catalog until another frame arrives, and a same-name section with a different schema aliases the first registration via an unchecked cast. Cache the last parsed object and reject schema mismatches before reusing an existing entry.

Comment thread js/hang/src/catalog/reader.ts
Comment thread rs/hang/src/catalog/consumer.rs
Comment thread rs/moq-cli/src/subscribe.rs
kixelated and others added 2 commits April 2, 2026 11:39
# Conflicts:
#	rs/moq-mux/src/convert/test.rs
#	rs/moq-mux/src/hang/convert.rs
- Resolve merge conflicts with dev branch (hang/convert.rs restructuring, orphan convert/test.rs)
- Update avc1.rs from old catalog lock() API to new section-based set()/flush() API
- Update cmaf/convert.rs test helper to use new catalog API
- Clear signal on schema validation failure in JS catalog reader (CodeRabbit)
- Close CatalogReader on all exit paths in Rust consumer (CodeRabbit)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai coderabbitai bot left a comment
Actionable comments posted: 6

♻️ Duplicate comments (1)
rs/moq-cli/src/subscribe.rs (1)

68-76: ⚠️ Potential issue | 🟠 Major

Keep consuming the catalog past the first section change.

This select! stops the catalog reader as soon as either video_section or audio_section changes. With per-section flushes, that first snapshot can be partial, so later sections never reach exporter.init() or muxer_tracks and the muxed output silently drops that media type.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-cli/src/subscribe.rs` around lines 68 - 76, The select currently
awaits catalog_consumer.run() directly and returns when either
video_section.changed() or audio_section.changed() fires, which stops the
catalog reader too early; instead spawn catalog_consumer.run() as a background
task (tokio::spawn) so it keeps consuming, and replace the one-shot select with
logic that waits until both video_section and audio_section have their initial
snapshots (e.g., loop awaiting video_section.changed() and
audio_section.changed() or check video_section.get()/audio_section.get() in a
small await-loop) before reading video_section.get() and audio_section.get() and
proceeding to exporter.init() / muxer_tracks; keep references to
catalog_consumer.run(), video_section.changed(), audio_section.changed(),
exporter.init(), and muxer_tracks to locate the changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rs/moq-ffi/src/consumer.rs`:
- Around line 32-45: The select currently polls self.track.recv_group() even
when self.group is Some, allowing a closed track to win and drop the buffered
frame; change the loop to drain the existing group before awaiting a new one:
inside the loop, if self.group.is_some() then only await the frame read (use
self.group.as_mut()?.read_frame().await.transpose() and handle
Some(frame)/None), otherwise await self.track.recv_group() to set self.group; in
other words, gate polling of self.track.recv_group() on self.group.is_none() so
recv_group() is never polled while there are unread frames.

In `@rs/moq-mux/src/cmaf/convert.rs`:
- Around line 31-57: The code currently reads only the first catalog frame and
defaults missing sections, which drops sections published in later frames;
instead, loop on catalog_track.recv_group().await? and for each group's
read_frame().await? parse the JSON map and merge/update the Video and Audio
values when the "video" or "audio" keys are present (deserializing into
Video/Audio and replacing/merging existing structs), continuing until the
catalog startup/flush is complete (i.e., no more immediate groups or until the
catalog stream is closed), then call run_with_catalog(&input, &mut broadcast,
&mut catalog_producer, &video, &audio). Ensure you use the existing symbols
catalog_track, recv_group, read_frame, Video, Audio, and run_with_catalog.

In `@rs/moq-mux/src/hang/convert.rs`:
- Around line 35-61: The converter currently reads only the first catalog frame
and calls unwrap_or_default on video/audio, which causes it to miss tracks that
appear later; change the logic to preserve absence as Option (i.e., let video:
Option<Video> = ... and let audio: Option<Audio> = ... instead of
unwrap_or_default) and update run_with_catalog to accept and react to
Option<Video>/Option<Audio> (or keep listening on catalog_track and apply
subsequent groups) so later catalog updates can set the missing sections; update
the call to run_with_catalog(&input, &mut broadcast, &mut catalog_producer,
&video, &audio).await accordingly and ensure any code that assumed defaulted
values now handles Option and subscribes to further catalog_track.recv_group()
events to populate tracks when they appear.

In `@rs/moq-mux/src/producer/avc1.rs`:
- Around line 100-108: The catalog entry is being flushed (self.catalog.set +
self.catalog.flush) before the actual MoQ track is created, which can advertise
a rendition that fails to subscribe or leave a stale entry if create_track
errors; reorder so you first create the backing track via
self.broadcast.create_track(track) and only after that succeeds call
self.catalog.set(&hang::catalog::VIDEO, &video) and self.catalog.flush(),
propagating any create_track errors without writing the catalog entry.
- Around line 93-105: The code is doing a read-modify-write on the shared VIDEO
snapshot via current_video() then catalog.set(&VIDEO, &video), which can
overwrite concurrent updates; instead perform an atomic/closure-based update
that mutates only the track entry. Replace the pattern around current_video(),
video.remove_track(...), video.create_track(...), and catalog.set(...) with a
single catalog-side update/transaction call (e.g., catalog.update_video(|video|
{ if let Some(track)=video.take_track(...) { video.remove_track(&track.info); }
let track = video.create_track("avc1", config.clone());
tracing::debug!(name=?track.name, ?config, "starting avc1 track"); Ok(()) })) so
the mutation runs inside the catalog update closure (or use the catalog's
compare-and-swap API) to avoid lost updates; reference current_video(),
video.remove_track, video.create_track, and catalog.set in your change.

---

Duplicate comments:
In `@rs/moq-cli/src/subscribe.rs`:
- Around line 68-76: The select currently awaits catalog_consumer.run() directly
and returns when either video_section.changed() or audio_section.changed()
fires, which stops the catalog reader too early; instead spawn
catalog_consumer.run() as a background task (tokio::spawn) so it keeps
consuming, and replace the one-shot select with logic that waits until both
video_section and audio_section have their initial snapshots (e.g., loop
awaiting video_section.changed() and audio_section.changed() or check
video_section.get()/audio_section.get() in a small await-loop) before reading
video_section.get() and audio_section.get() and proceeding to exporter.init() /
muxer_tracks; keep references to catalog_consumer.run(),
video_section.changed(), audio_section.changed(), exporter.init(), and
muxer_tracks to locate the changes.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9201e619-25cf-438d-b9b4-0503d6fd0c02

📥 Commits

Reviewing files that changed from the base of the PR and between 997e161 and 2586e37.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • bun.lock is excluded by !**/*.lock
📒 Files selected for processing (14)
  • js/hang/src/catalog/reader.ts
  • js/watch/package.json
  • rs/hang/Cargo.toml
  • rs/hang/examples/subscribe.rs
  • rs/hang/src/catalog/consumer.rs
  • rs/libmoq/src/consume.rs
  • rs/moq-cli/src/subscribe.rs
  • rs/moq-ffi/Cargo.toml
  • rs/moq-ffi/src/consumer.rs
  • rs/moq-ffi/src/media.rs
  • rs/moq-mux/Cargo.toml
  • rs/moq-mux/src/cmaf/convert.rs
  • rs/moq-mux/src/hang/convert.rs
  • rs/moq-mux/src/producer/avc1.rs
✅ Files skipped from review due to trivial changes (5)
  • rs/hang/Cargo.toml
  • rs/moq-ffi/Cargo.toml
  • rs/moq-mux/Cargo.toml
  • js/watch/package.json
  • js/hang/src/catalog/reader.ts
🚧 Files skipped from review as they are similar to previous changes (2)
  • rs/hang/examples/subscribe.rs
  • rs/moq-ffi/src/media.rs

Comment on lines +50 to +66
let result = async {
    loop {
        tokio::select! {
            res = self.track.recv_group() => {
                match res? {
                    Some(group) => {
                        self.group = Some(group);
                    }
                    None => return Ok(()),
                }
            },
            Some(frame) = async { self.group.as_mut()?.read_frame().await.transpose() } => {
                self.group.take(); // We don't support deltas yet

                let json: serde_json::Map<String, serde_json::Value> =
                    serde_json::from_slice(&frame?)?;
                self.reader.update(json);
⚠️ Potential issue | 🟠 Major


Fix the EOF race with buffered catalog groups.

When recv_group() returns the last group, the next loop iteration can have both read_frame() and recv_group() -> None ready. If the EOF arm wins, run() closes the reader without publishing that final snapshot to section subscribers. A single-frame catalog test that closes immediately after writing would expose this race.

Comment on lines +32 to +45
loop {
    tokio::select! {
        res = self.track.recv_group() => {
            match res? {
                Some(group) => {
                    self.group = Some(group);
                }
                None => return Ok(None),
            }
        },
        Some(frame) = async { self.group.as_mut()?.read_frame().await.transpose() } => {
            self.group.take(); // We don't support deltas yet

            let frame_data = frame?;
⚠️ Potential issue | 🟠 Major


Don't poll recv_group() while the current group has unread frames.

Once self.group is Some(_), the recv_group() arm can still win when the track closes and return Ok(None) before the buffered frame is parsed, dropping the final catalog snapshot nondeterministically. Drain the current group before allowing the select to poll for the next one. A one-group regression test where the producer closes immediately after writing the frame should expose this.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-ffi/src/consumer.rs` around lines 32 - 45, The select currently polls
self.track.recv_group() even when self.group is Some, allowing a closed track to
win and drop the buffered frame; change the loop to drain the existing group
before awaiting a new one: inside the loop, if self.group.is_some() then only
await the frame read (use self.group.as_mut()?.read_frame().await.transpose()
and handle Some(frame)/None), otherwise await self.track.recv_group() to set
self.group; in other words, gate polling of self.track.recv_group() on
self.group.is_none() so recv_group() is never polled while there are unread
frames.
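A synchronous sketch of the gating the prompt describes — never poll for the next group while a frame is still buffered, so a track close can't outrace the drain. The closure stands in for `recv_group()`, and one buffered frame stands in for the group:

```rust
// Return the next frame, draining any buffered group before polling for a new one.
fn next_frame(
    buffered: &mut Option<Vec<u8>>,                  // stand-in for self.group
    mut recv_group: impl FnMut() -> Option<Vec<u8>>, // stand-in for track.recv_group()
) -> Option<Vec<u8>> {
    loop {
        // Drain first: even if the track already closed, this frame must be delivered.
        if let Some(frame) = buffered.take() {
            return Some(frame);
        }
        match recv_group() {
            Some(group) => *buffered = Some(group), // one frame per group (no deltas yet)
            None => return None,                    // track closed, nothing buffered
        }
    }
}
```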

Comment thread rs/moq-mux/src/cmaf/convert.rs Outdated
Comment on lines +31 to +57
let mut catalog_track = input.subscribe_track(&hang::catalog::default_track())?;

// Read the first catalog group/frame directly
let mut group = catalog_track
    .recv_group()
    .await?
    .context("catalog track closed before first group")?;
let frame = group
    .read_frame()
    .await?
    .context("catalog group closed before first frame")?;

let json: serde_json::Map<String, serde_json::Value> = serde_json::from_slice(&frame)?;

let video: Video = json
    .get("video")
    .map(|v| serde_json::from_value(v.clone()))
    .transpose()?
    .unwrap_or_default();

let audio: Audio = json
    .get("audio")
    .map(|v| serde_json::from_value(v.clone()))
    .transpose()?
    .unwrap_or_default();

run_with_catalog(&input, &mut broadcast, &mut catalog_producer, &video, &audio).await
⚠️ Potential issue | 🟠 Major

Don't treat the first catalog frame as the full track layout.

With section-based set()+flush(), startup can legitimately publish video and audio in separate catalog frames. This path defaults any missing section from the first frame and never reads later updates, so tracks that appear in a subsequent flush are silently dropped from conversion/output.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/cmaf/convert.rs` around lines 31 - 57, The code currently
reads only the first catalog frame and defaults missing sections, which drops
sections published in later frames; instead, loop on
catalog_track.recv_group().await? and for each group's read_frame().await? parse
the JSON map and merge/update the Video and Audio values when the "video" or
"audio" keys are present (deserializing into Video/Audio and replacing/merging
existing structs), continuing until the catalog startup/flush is complete (i.e.,
no more immediate groups or until the catalog stream is closed), then call
run_with_catalog(&input, &mut broadcast, &mut catalog_producer, &video, &audio).
Ensure you use the existing symbols catalog_track, recv_group, read_frame,
Video, Audio, and run_with_catalog.
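The merging loop the prompt suggests boils down to: each frame overwrites only the sections it actually carries. A stdlib sketch, with string payloads standing in for the `Video`/`Audio` JSON values:

```rust
// Fold successive catalog frames into the latest known sections; a frame that
// carries only "video" leaves a previously seen "audio" intact.
fn merge_frames(frames: &[Vec<(&str, &str)>]) -> (Option<String>, Option<String>) {
    let (mut video, mut audio) = (None, None);
    for frame in frames {
        for (key, value) in frame {
            match *key {
                "video" => video = Some(value.to_string()),
                "audio" => audio = Some(value.to_string()),
                _ => {} // unknown sections are ignored here
            }
        }
    }
    (video, audio)
}
```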

Comment thread rs/moq-mux/src/hang/convert.rs Outdated
Comment on lines +35 to +61
let mut catalog_track = input.subscribe_track(&hang::catalog::default_track())?;

// Read the first catalog group/frame directly
let mut group = catalog_track
    .recv_group()
    .await?
    .context("catalog track closed before first group")?;
let frame = group
    .read_frame()
    .await?
    .context("catalog group closed before first frame")?;

let json: serde_json::Map<String, serde_json::Value> = serde_json::from_slice(&frame)?;

let video: Video = json
    .get("video")
    .map(|v| serde_json::from_value(v.clone()))
    .transpose()?
    .unwrap_or_default();

let audio: Audio = json
    .get("audio")
    .map(|v| serde_json::from_value(v.clone()))
    .transpose()?
    .unwrap_or_default();

run_with_catalog(&input, &mut broadcast, &mut catalog_producer, &video, &audio).await
⚠️ Potential issue | 🟠 Major

Don't freeze the Legacy converter on the first catalog snapshot.

The new section-based writer can flush video and audio independently during startup. By defaulting whichever section is absent in the first frame and never consuming later catalog updates, this converter can permanently miss tracks that appear a moment later.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/hang/convert.rs` around lines 35 - 61, The converter currently
reads only the first catalog frame and calls unwrap_or_default on video/audio,
which causes it to miss tracks that appear later; change the logic to preserve
absence as Option (i.e., let video: Option<Video> = ... and let audio:
Option<Audio> = ... instead of unwrap_or_default) and update run_with_catalog to
accept and react to Option<Video>/Option<Audio> (or keep listening on
catalog_track and apply subsequent groups) so later catalog updates can set the
missing sections; update the call to run_with_catalog(&input, &mut broadcast,
&mut catalog_producer, &video, &audio).await accordingly and ensure any code
that assumed defaulted values now handles Option and subscribes to further
catalog_track.recv_group() events to populate tracks when they appear.

Comment on lines +93 to +105
 	let mut video = self.current_video();
 
 	if let Some(track) = &self.track.take() {
 		tracing::debug!(name = ?track.info.name, "reinitializing avc1 track");
-		catalog.video.remove_track(&track.info);
+		video.remove_track(&track.info);
 	}
 
-	let track = catalog.video.create_track("avc1", config.clone());
+	let track = video.create_track("avc1", config.clone());
 	tracing::debug!(name = ?track.name, ?config, "starting avc1 track");
 
 	self.catalog
 		.set(&hang::catalog::VIDEO, &video)
 		.map_err(|e| anyhow::anyhow!(e))?;

⚠️ Potential issue | 🟠 Major

Avoid read-modify-write on the shared VIDEO section.

current_video() snapshots the entire section, and both initialize() and drop() write that whole section back after mutating a single track. If another producer updates VIDEO in between, its rendition gets overwritten because the writer only synchronizes each individual set(), not the full read/modify/write cycle.

Also applies to: 215-217

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/avc1.rs` around lines 93 - 105, The code is doing a
read-modify-write on the shared VIDEO snapshot via current_video() then
catalog.set(&VIDEO, &video), which can overwrite concurrent updates; instead
perform an atomic/closure-based update that mutates only the track entry.
Replace the pattern around current_video(), video.remove_track(...),
video.create_track(...), and catalog.set(...) with a single catalog-side
update/transaction call (e.g., catalog.update_video(|video| { if let
Some(track)=video.take_track(...) { video.remove_track(&track.info); } let track
= video.create_track("avc1", config.clone()); tracing::debug!(name=?track.name,
?config, "starting avc1 track"); Ok(()) })) so the mutation runs inside the
catalog update closure (or use the catalog's compare-and-swap API) to avoid lost
updates; reference current_video(), video.remove_track, video.create_track, and
catalog.set in your change.
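The closure-based update the prompt describes can be sketched with plain std types (the `Catalog`/`update_video` names and the `Vec<String>` section stand-in are hypothetical, not the crate's API):

```rust
use std::sync::Mutex;

// Hypothetical closure-based section update: the whole read/modify/write
// cycle runs under one lock, so a concurrent producer's rendition can't be
// lost between the snapshot and the write-back.
struct Catalog {
    video: Mutex<Vec<String>>, // stand-in for the shared VIDEO section
}

impl Catalog {
    fn update_video<F: FnOnce(&mut Vec<String>)>(&self, f: F) {
        let mut video = self.video.lock().unwrap();
        f(&mut video); // mutation happens while the lock is held
    }
}
```

With this shape, `initialize()` and `drop()` would remove and re-create only their own track inside the closure, rather than writing back a whole stale snapshot via `set()`.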

Comment on lines +100 to 108
	let track = video.create_track("avc1", config.clone());
	tracing::debug!(name = ?track.name, ?config, "starting avc1 track");

	self.catalog
		.set(&hang::catalog::VIDEO, &video)
		.map_err(|e| anyhow::anyhow!(e))?;
	self.catalog.flush();

	let track = self.broadcast.create_track(track)?;

⚠️ Potential issue | 🟠 Major

Create the backing track before flushing its catalog entry.

Line 106 publishes the new "avc1" rendition before Line 108 registers the actual MoQ track. Consumers acting on the catalog state can observe the rendition and fail to subscribe to a track that does not exist yet, and if create_track() returns an error, the stale entry remains advertised.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/avc1.rs` around lines 100 - 108, The catalog entry is
being flushed (self.catalog.set + self.catalog.flush) before the actual MoQ
track is created, which can advertise a rendition that fails to subscribe or
leave a stale entry if create_track errors; reorder so you first create the
backing track via self.broadcast.create_track(track) and only after that
succeeds call self.catalog.set(&hang::catalog::VIDEO, &video) and
self.catalog.flush(), propagating any create_track errors without writing the
catalog entry.
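The suggested ordering can be sketched with simple stand-ins (the `Vec<String>` broadcast/catalog and the failure condition are hypothetical, only the ordering matters):

```rust
// Sketch of the suggested ordering: register the backing track first, and
// only advertise it in the catalog once creation has succeeded, so a
// create_track() failure never leaves a stale catalog entry behind.
fn start_track(
    broadcast: &mut Vec<String>, // stand-in for the MoQ broadcast
    catalog: &mut Vec<String>,   // stand-in for the published catalog
    name: &str,
) -> Result<(), String> {
    if name.is_empty() {
        // Creation failed: nothing was advertised, so nothing to roll back.
        return Err("create_track failed".to_string());
    }
    broadcast.push(name.to_string()); // 1. create the backing track
    catalog.push(name.to_string());   // 2. then set()/flush() the entry
    Ok(())
}
```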

kixelated and others added 3 commits April 3, 2026 11:28
# Conflicts:
#	js/hang/src/catalog/capabilities.ts
#	js/hang/src/catalog/chat.ts
#	js/hang/src/catalog/location.ts
#	js/hang/src/catalog/preview.ts
#	js/hang/src/catalog/root.ts
#	js/hang/src/catalog/user.ts
#	rs/hang/examples/subscribe.rs
#	rs/hang/src/catalog/consumer.rs
#	rs/hang/src/catalog/root.rs
#	rs/libmoq/src/consume.rs
#	rs/moq-cli/src/subscribe.rs
#	rs/moq-ffi/src/consumer.rs
#	rs/moq-mux/Cargo.toml
#	rs/moq-mux/src/catalog.rs
#	rs/moq-mux/src/cmaf/convert.rs
#	rs/moq-mux/src/hang/convert.rs
#	rs/moq-mux/src/producer/test/mod.rs
- Remove Track.priority field (removed on dev)
- Add Subscription::default() to subscribe_track calls
- Use zod/mini ZodMiniType instead of zod ZodType in Section/Reader
- Add tokio dependency to hang crate (needed for CatalogConsumer::run)
- Restore moq-mux [features] section lost during merge
- Update moq-mux optional deps for feature-gated codec support

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

♻️ Duplicate comments (4)
rs/hang/src/catalog/consumer.rs (1)

49-66: ⚠️ Potential issue | 🟠 Major

EOF race with buffered catalog frame persists.

When recv_group() returns the final group, the next loop iteration races between recv_group() → None (EOF) and read_frame(). If EOF wins the tokio::select!, the buffered frame is discarded without being processed.

Restructure to drain pending frames before accepting EOF:

🐛 Proposed fix to prioritize frame draining
 	pub async fn run(&mut self) -> Result<()> {
 		let result = async {
 			loop {
-				tokio::select! {
-					res = self.track.recv_group() => {
-						match res? {
-							Some(group) => {
-								self.group = Some(group);
-							}
-							None => return Ok(()),
-						}
-					},
-					Some(frame) = async { self.group.as_mut()?.read_frame().await.transpose() } => {
-						self.group.take(); // We don't support deltas yet
-
-						let json: serde_json::Map<String, serde_json::Value> =
-							serde_json::from_slice(&frame?)?;
-						self.reader.update(json);
-					}
+				// Prioritize draining any buffered frame before checking for new groups
+				if let Some(group) = self.group.as_mut() {
+					if let Some(frame_result) = group.read_frame().await.transpose() {
+						self.group.take(); // We don't support deltas yet
+						let json: serde_json::Map<String, serde_json::Value> =
+							serde_json::from_slice(&frame_result?)?;
+						self.reader.update(json);
+						continue;
+					}
+				}
+
+				// No pending frame; wait for the next group or EOF
+				match self.track.recv_group().await? {
+					Some(group) => self.group = Some(group),
+					None => return Ok(()),
 				}
 			}
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/hang/src/catalog/consumer.rs` around lines 49 - 66, The loop races EOF
against buffered frames because you await recv_group() in tokio::select! and may
lose a pending frame; fix by draining any available frames from the newly
received group before allowing recv_group() to be selected again: when res =
self.track.recv_group() yields Some(group), assign self.group = Some(group) and
then immediately loop on self.group.as_mut().and_then(|g|
g.read_frame().await.transpose()) (or equivalent) to consume all ready frames
and call self.reader.update(...) for each frame; only return to the outer select
after the group’s pending frames are exhausted so EOF (recv_group() -> None)
cannot drop buffered frames.
rs/moq-ffi/src/consumer.rs (1)

32-43: ⚠️ Potential issue | 🟠 Major

Drain the active group before polling recv_group() again.

With self.group set, this select! still lets recv_group() win. A close or a newer group can arrive before the buffered frame is read, so next() can drop the pending catalog update or overwrite the unread group. Only poll recv_group() when self.group.is_none(). A one-group test that writes a frame and immediately closes should catch it.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-ffi/src/consumer.rs` around lines 32 - 43, The select! must not poll
self.track.recv_group() while there's an active group; add a guard so the
recv_group branch only runs when self.group.is_none() (e.g. use the select! `if
self.group.is_none()` guard on the `res = self.track.recv_group()` arm) and keep
the existing frame-reading arm to fully drain the current group before accepting
a new one from self.track; reference the recv_group() call, the read_frame()
call on self.group.as_mut(), and the place where self.group is taken.
rs/moq-mux/src/cmaf/convert.rs (1)

31-48: ⚠️ Potential issue | 🟠 Major

Keep consuming catalog updates after startup.

This waits for the first VIDEO/AUDIO change, snapshots both sections once, and then stops driving catalog_consumer. CatalogProducer::flush() emits a new catalog group whenever tracks are added or removed, so later updates are ignored and the converter's track set is frozen at startup. A startup sequence that flushes video and audio separately still breaks here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/cmaf/convert.rs` around lines 31 - 48, The current code waits
for the first VIDEO/AUDIO change, takes a snapshot via
video_section.get()/audio_section.get(), and then stops driving catalog_consumer
so later CatalogProducer::flush() updates are ignored; fix by continuing to
drive catalog_consumer after the initial snapshot (e.g. spawn or concurrently
run catalog_consumer.run() so it keeps processing updates) while the converter
runs (ensure catalog_consumer.run() is polled in the background or selected
alongside run_with_catalog), referencing the existing catalog_consumer,
video_section, audio_section, and run_with_catalog symbols so later flushes
update the track set instead of freezing it at startup.
rs/moq-mux/src/hang/convert.rs (1)

34-52: ⚠️ Potential issue | 🟠 Major

Keep consuming catalog updates after startup.

This waits for the first VIDEO/AUDIO change, snapshots both sections once, and then stops driving catalog_consumer. CatalogProducer::flush() emits a new catalog group whenever tracks are added or removed, so later updates are ignored and the converter's track set is frozen at startup. A startup sequence that flushes video and audio separately still breaks here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/hang/convert.rs` around lines 34 - 52, The current startup
waits for the first VIDEO/AUDIO change but then stops driving catalog_consumer,
freezing the track set; instead ensure catalog_consumer.run() is driven for the
lifetime of the converter (so CatalogProducer::flush() updates are processed).
Fix by running catalog_consumer.run() concurrently (e.g., tokio::spawn or a
background task) before/while you wait for the initial
video_section.changed()/audio_section.changed(), keep the task alive and
propagate/report its error instead of awaiting it inline, and continue to use
video_section.get()/audio_section.get() and call run_with_catalog(&input,
&broadcast, &mut catalog_producer, &video, &audio).await with the consumer still
running so later catalog updates are not ignored.
🧹 Nitpick comments (1)
js/hang/src/catalog/video.ts (1)

108-109: Make the public comment explain why this constant exists.

The current comment is descriptive, but it doesn’t capture the intent for consumers (shared canonical key/interoperability).

♻️ Suggested wording update
-/// Predefined section for video catalog data.
+// Why: canonical video section key used across reader/writer boundaries for consistent interop.
 export const VIDEO = new Section("video", VideoSchema);

As per coding guidelines, "Write comments that explain the 'why', not just the 'what'".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/hang/src/catalog/video.ts` around lines 108 - 109, Update the public
comment for the exported constant VIDEO to explain why it exists (its intent),
not just what it is: state that VIDEO is the shared canonical section/key used
across services for video catalog interoperability and schema enforcement,
referencing that it instantiates Section with the VideoSchema to provide a
stable, reusable identifier for consumers; modify the comment above export const
VIDEO (and mention Section and VideoSchema) to capture that purpose.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rs/moq-mux/src/catalog.rs`:
- Around line 49-74: Change flush(&mut self) to return Result<(), E> (choose an
appropriate error type or create one) and stop silently discarding failures: if
writer.encode() fails, return the error; if hang_track.append_group() fails,
return the error; propagate errors from group.write_frame() and group.finish()
instead of ignoring their Results; and ensure crate::msf::publish is only called
if the hang-track publish succeeded (propagate or map any msf publish errors as
well). Update callers to propagate the new Result where used. Use the existing
function names (flush, writer.encode, hang_track.append_group,
group.write_frame, group.finish, crate::msf::publish) to locate and change the
code accordingly.

In `@rs/moq-mux/src/producer/aac.rs`:
- Around line 105-106: The current Aac implementation caches the entire
hang::catalog::Audio in Aac::new and then writes that stale snapshot back on
Drop, which can overwrite newer updates; fix by not persisting the full cached
section — either remove the audio field and only store the track identifier
(track.id) or, in the Drop impl, re-read the current audio section from the
catalog before mutating it; specifically, update the Drop for struct Aac to
fetch the latest hang::catalog::Audio (instead of using the stored audio),
remove only this instance's track from the tracks list (or call a catalog-level
remove_track_by_id helper if available), and write back just that modified
tracks array rather than overwriting the whole audio snapshot.
- Around line 138-144: The catalog.flush() call is currently publishing the
audio rendition before broadcast.create_track(track_info)? completes; move the
flush so it only runs after create_track succeeds: keep creating track_info via
audio.create_track(...) and calling catalog.set(&hang::catalog::AUDIO, &audio),
then call broadcast.create_track(track_info)? and only after that succeeds
invoke catalog.flush(); ensure you do not flush on error (i.e., run
catalog.flush() after the successful return of broadcast.create_track and before
returning from the function).

In `@rs/moq-mux/src/producer/opus.rs`:
- Around line 48-49: The code currently caches the entire hang::catalog::Audio
in Opus::new and then writes that stale AUDIO snapshot back in Drop, which can
overwrite newer updates; instead, modify the Drop/cleanup logic to re-read the
current AUDIO section from the catalog immediately before mutating it (or,
better, perform a targeted update that only removes this instance's track using
the OrderedProducer track id), so use the live catalog read and remove/update
the specific track entry rather than writing the previously-captured Audio
struct; update any other places that similarly store full audio snapshots (see
the Opus::new constructor and its Drop impl and any code paths around
hang::catalog::Audio and hang::container::OrderedProducer) to follow the same
pattern.
- Around line 79-85: The catalog is being flushed before verifying
broadcast.create_track(track_info) succeeds, causing clients to see a
non-existent track; move the catalog.flush() call so it runs only after
broadcast.create_track(track_info)? returns successfully (i.e., call
catalog.set(&hang::catalog::AUDIO, &audio); then call
broadcast.create_track(track_info) and only after that call catalog.flush()), or
alternatively add rollback logic to remove the previously set catalog entry if
broadcast.create_track fails (use the same catalog key hang::catalog::AUDIO and
the track_info/track creation result to identify and remove the entry).

---

Duplicate comments:
In `@rs/hang/src/catalog/consumer.rs`:
- Around line 49-66: The loop races EOF against buffered frames because you
await recv_group() in tokio::select! and may lose a pending frame; fix by
draining any available frames from the newly received group before allowing
recv_group() to be selected again: when res = self.track.recv_group() yields
Some(group), assign self.group = Some(group) and then immediately loop on
self.group.as_mut().and_then(|g| g.read_frame().await.transpose()) (or
equivalent) to consume all ready frames and call self.reader.update(...) for
each frame; only return to the outer select after the group’s pending frames are
exhausted so EOF (recv_group() -> None) cannot drop buffered frames.

In `@rs/moq-ffi/src/consumer.rs`:
- Around line 32-43: The select! must not poll self.track.recv_group() while
there's an active group; add a guard so the recv_group branch only runs when
self.group.is_none() (e.g. use the select! `if self.group.is_none()` guard on
the `res = self.track.recv_group()` arm) and keep the existing frame-reading arm
to fully drain the current group before accepting a new one from self.track;
reference the recv_group() call, the read_frame() call on self.group.as_mut(),
and the place where self.group is taken.

In `@rs/moq-mux/src/cmaf/convert.rs`:
- Around line 31-48: The current code waits for the first VIDEO/AUDIO change,
takes a snapshot via video_section.get()/audio_section.get(), and then stops
driving catalog_consumer so later CatalogProducer::flush() updates are ignored;
fix by continuing to drive catalog_consumer after the initial snapshot (e.g.
spawn or concurrently run catalog_consumer.run() so it keeps processing updates)
while the converter runs (ensure catalog_consumer.run() is polled in the
background or selected alongside run_with_catalog), referencing the existing
catalog_consumer, video_section, audio_section, and run_with_catalog symbols so
later flushes update the track set instead of freezing it at startup.

In `@rs/moq-mux/src/hang/convert.rs`:
- Around line 34-52: The current startup waits for the first VIDEO/AUDIO change
but then stops driving catalog_consumer, freezing the track set; instead ensure
catalog_consumer.run() is driven for the lifetime of the converter (so
CatalogProducer::flush() updates are processed). Fix by running
catalog_consumer.run() concurrently (e.g., tokio::spawn or a background task)
before/while you wait for the initial
video_section.changed()/audio_section.changed(), keep the task alive and
propagate/report its error instead of awaiting it inline, and continue to use
video_section.get()/audio_section.get() and call run_with_catalog(&input,
&broadcast, &mut catalog_producer, &video, &audio).await with the consumer still
running so later catalog updates are not ignored.

---

Nitpick comments:
In `@js/hang/src/catalog/video.ts`:
- Around line 108-109: Update the public comment for the exported constant VIDEO
to explain why it exists (its intent), not just what it is: state that VIDEO is
the shared canonical section/key used across services for video catalog
interoperability and schema enforcement, referencing that it instantiates
Section with the VideoSchema to provide a stable, reusable identifier for
consumers; modify the comment above export const VIDEO (and mention Section and
VideoSchema) to capture that purpose.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e49a89f4-b0dd-4aa8-a177-35d01679703f

📥 Commits

Reviewing files that changed from the base of the PR and between 2586e37 and 9b237f5.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • bun.lock is excluded by !**/*.lock
📒 Files selected for processing (23)
  • js/hang/src/catalog/audio.ts
  • js/hang/src/catalog/reader.ts
  • js/hang/src/catalog/section.ts
  • js/hang/src/catalog/video.ts
  • js/publish/src/video/index.ts
  • rs/hang/Cargo.toml
  • rs/hang/examples/subscribe.rs
  • rs/hang/examples/video.rs
  • rs/hang/src/catalog/audio/mod.rs
  • rs/hang/src/catalog/consumer.rs
  • rs/hang/src/catalog/video/mod.rs
  • rs/libmoq/src/consume.rs
  • rs/moq-cli/src/publish.rs
  • rs/moq-cli/src/subscribe.rs
  • rs/moq-ffi/Cargo.toml
  • rs/moq-ffi/src/consumer.rs
  • rs/moq-mux/Cargo.toml
  • rs/moq-mux/src/catalog.rs
  • rs/moq-mux/src/cmaf/convert.rs
  • rs/moq-mux/src/hang/convert.rs
  • rs/moq-mux/src/producer/aac.rs
  • rs/moq-mux/src/producer/opus.rs
  • rs/moq-mux/src/producer/test/mod.rs
✅ Files skipped from review due to trivial changes (5)
  • rs/moq-ffi/Cargo.toml
  • rs/hang/Cargo.toml
  • js/hang/src/catalog/section.ts
  • js/publish/src/video/index.ts
  • rs/hang/examples/subscribe.rs
🚧 Files skipped from review as they are similar to previous changes (8)
  • rs/hang/src/catalog/video/mod.rs
  • js/hang/src/catalog/audio.ts
  • rs/hang/src/catalog/audio/mod.rs
  • rs/moq-mux/Cargo.toml
  • rs/moq-mux/src/producer/test/mod.rs
  • js/hang/src/catalog/reader.ts
  • rs/moq-cli/src/subscribe.rs
  • rs/hang/examples/video.rs

Comment thread rs/moq-mux/src/catalog.rs
Comment on lines +49 to +74
pub fn flush(&mut self) {
	// Publish hang catalog
	let Ok(encoded) = self.writer.encode() else {
		return;
	};
	let Ok(mut group) = self.hang_track.append_group() else {
		return;
	};
	let _ = group.write_frame(encoded);
	let _ = group.finish();

	// Publish MSF catalog
	// Read video and audio sections from the writer state for MSF conversion.
	let state = self.writer.read();
	let video: Option<Video> = state
		.sections
		.get("video")
		.and_then(|v| serde_json::from_value(v.clone()).ok());
	let audio: Option<Audio> = state
		.sections
		.get("audio")
		.and_then(|v| serde_json::from_value(v.clone()).ok());
	drop(state);

	crate::msf::publish(video.as_ref(), audio.as_ref(), &mut self.msf_track);
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Determine crate type and structure
cd /tmp && find . -name "Cargo.toml" -path "*/moq-mux/*" | head -5

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Read the catalog.rs file to see the full context
cat -n rs/moq-mux/src/catalog.rs | head -100

Repository: moq-dev/moq

Length of output: 3693


🏁 Script executed:

# Search for all calls to flush() in the codebase
rg "\.flush\(\)" rs/moq-mux/ -n

Repository: moq-dev/moq

Length of output: 2453


🏁 Script executed:

# Check the Cargo.toml to understand if moq-mux is a library or binary
cat rs/moq-mux/Cargo.toml

Repository: moq-dev/moq

Length of output: 1839


🏁 Script executed:

# Look for error handling patterns in the codebase - check for thiserror/anyhow usage
rg "(thiserror|anyhow)" rs/moq-mux/src/ -l

Repository: moq-dev/moq

Length of output: 561


🏁 Script executed:

# Check the error handling patterns in the crate
cat -n rs/moq-mux/src/error.rs

Repository: moq-dev/moq

Length of output: 417


🏁 Script executed:

# Look at a few call sites to see their context and error handling
cat -n rs/moq-mux/src/producer/fmp4.rs | sed -n '185,195p'
cat -n rs/moq-mux/src/producer/fmp4.rs | sed -n '665,675p'

Repository: moq-dev/moq

Length of output: 632


🏁 Script executed:

# Check if any call sites are in Result-returning contexts
rg -B5 -A2 "self\.catalog\.flush\(\)" rs/moq-mux/src/producer/fmp4.rs | head -30

Repository: moq-dev/moq

Length of output: 595


🏁 Script executed:

# Look at the msf::publish function to see its error handling
rg -A10 "pub.*fn publish" rs/moq-mux/src/msf.rs

Repository: moq-dev/moq

Length of output: 377


🏁 Script executed:

# Verify that the call sites are indeed in Result-returning functions
rg -B10 "self\.catalog\.flush\(\)" rs/moq-mux/src/producer/opus.rs | head -25

Repository: moq-dev/moq

Length of output: 344


🏁 Script executed:

# Check one more call site context to confirm they're all in error-aware contexts
cat -n rs/moq-mux/src/producer/opus.rs | sed -n '75,90p'

Repository: moq-dev/moq

Length of output: 653


Make flush() return a Result to surface publication failures.

flush() is the only publish path for both catalogs, but all fallible operations are silently ignored: writer.encode(), hang_track.append_group(), group.write_frame(), and group.finish() can all fail without callers knowing. This allows callers to proceed as if the catalog update succeeded when it may have failed, risking divergence between the hang and MSF catalogs. Return a Result and propagate errors at call sites (which are already in Result-returning contexts).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/catalog.rs` around lines 49 - 74, Change flush(&mut self) to
return Result<(), E> (choose an appropriate error type or create one) and stop
silently discarding failures: if writer.encode() fails, return the error; if
hang_track.append_group() fails, return the error; propagate errors from
group.write_frame() and group.finish() instead of ignoring their Results; and
ensure crate::msf::publish is only called if the hang-track publish succeeded
(propagate or map any msf publish errors as well). Update callers to propagate
the new Result where used. Use the existing function names (flush,
writer.encode, hang_track.append_group, group.write_frame, group.finish,
crate::msf::publish) to locate and change the code accordingly.
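The signature change can be sketched with stand-in types (`FlushError` and the step methods are hypothetical, chosen only to show `?`-propagation replacing the silent `let Ok(..) else { return }` pattern):

```rust
// Sketch: every fallible step in flush() propagates with `?` instead of
// being discarded, so callers can no longer proceed as if the catalog
// update succeeded when it failed.
#[derive(Debug, PartialEq)]
struct FlushError(&'static str);

struct CatalogProducer {
    encode_fails: bool, // simulates writer.encode() failing
}

impl CatalogProducer {
    fn encode(&self) -> Result<Vec<u8>, FlushError> {
        if self.encode_fails {
            Err(FlushError("encode"))
        } else {
            Ok(vec![0x7b])
        }
    }

    fn flush(&mut self) -> Result<(), FlushError> {
        let _encoded = self.encode()?; // was: `let Ok(..) = .. else { return };`
        // append_group()?, write_frame()?, finish()? and the MSF publish
        // would follow the same pattern so the error reaches the caller.
        Ok(())
    }
}
```

Since the review confirmed all call sites already sit in Result-returning contexts, callers would just append `?` to `catalog.flush()`.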

Comment on lines +105 to 106
	audio: hang::catalog::Audio,
	track: hang::container::OrderedProducer,

⚠️ Potential issue | 🟠 Major

Don't write back a stale AUDIO snapshot.

This caches the entire audio section for the lifetime of the producer and then writes that cached copy back in Drop. If another audio producer updates AUDIO after Aac::new() runs, dropping this instance will overwrite those newer renditions while removing its own track. Re-read the current section before mutating it, or update the catalog at track granularity.

Also applies to: 116-124, 194-196

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/aac.rs` around lines 105 - 106, The current Aac
implementation caches the entire hang::catalog::Audio in Aac::new and then
writes that stale snapshot back on Drop, which can overwrite newer updates; fix
by not persisting the full cached section — either remove the audio field and
only store the track identifier (track.id) or, in the Drop impl, re-read the
current audio section from the catalog before mutating it; specifically, update
the Drop for struct Aac to fetch the latest hang::catalog::Audio (instead of
using the stored audio), remove only this instance's track from the tracks list
(or call a catalog-level remove_track_by_id helper if available), and write back
just that modified tracks array rather than overwriting the whole audio
snapshot.

Comment on lines +138 to +144
	let track_info = audio.create_track("aac", audio_config.clone());
	tracing::debug!(name = ?track_info.name, config = ?audio_config, "starting track");

	let _ = catalog.set(&hang::catalog::AUDIO, &audio);
	catalog.flush();

	let track = broadcast.create_track(track_info)?;

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's look at the file structure and locate the target file
git ls-files rs/moq-mux/src/producer/aac.rs

Repository: moq-dev/moq

Length of output: 85


🏁 Script executed:

# Read the file around lines 138-144 with broader context
wc -l rs/moq-mux/src/producer/aac.rs

Repository: moq-dev/moq

Length of output: 89


🏁 Script executed:

# Get the content around the specified lines with context
sed -n '120,160p' rs/moq-mux/src/producer/aac.rs

Repository: moq-dev/moq

Length of output: 1177


🏁 Script executed:

# Get broader context to see the full function definition
sed -n '100,165p' rs/moq-mux/src/producer/aac.rs

Repository: moq-dev/moq

Length of output: 1912


🏁 Script executed:

# Look for the function signature
sed -n '1,145p' rs/moq-mux/src/producer/aac.rs | grep -A 50 "fn " | head -60

Repository: moq-dev/moq

Length of output: 2363


🏁 Script executed:

# Check if there's any rollback mechanism or transaction support in catalog
rg "flush|rollback|transaction" rs/moq-mux/src/producer/aac.rs

Repository: moq-dev/moq

Length of output: 148


🏁 Script executed:

# Find CatalogProducer definition
rg "type CatalogProducer|struct CatalogProducer" rs/moq-mux/src/

Repository: moq-dev/moq

Length of output: 109


🏁 Script executed:

# Look for the main.rs or lib.rs to find CatalogProducer type alias
find rs/moq-mux/src -name "*.rs" -exec grep -l "CatalogProducer" {} \;

Repository: moq-dev/moq

Length of output: 467


🏁 Script executed:

# Check for rollback or revert methods in catalog-related code
rg "rollback|revert|reset" rs/moq-mux/src/

Repository: moq-dev/moq

Length of output: 153


🏁 Script executed:

# Examine the CatalogProducer struct definition and methods
sed -n '1,100p' rs/moq-mux/src/catalog.rs

Repository: moq-dev/moq

Length of output: 3028


🏁 Script executed:

# Look for all methods on CatalogProducer to see if rollback/revert exists
rg "impl CatalogProducer" -A 100 rs/moq-mux/src/catalog.rs | head -150

Repository: moq-dev/moq

Length of output: 2450


🏁 Script executed:

# Check the full catalog.rs file to understand flush() semantics
wc -l rs/moq-mux/src/catalog.rs

Repository: moq-dev/moq

Length of output: 83


Move catalog.flush() after broadcast.create_track() succeeds.

catalog.flush() publishes the rendition before broadcast.create_track(track_info)? completes. If track creation fails, the catalog advertises a track that was never created. Create the broadcast track first, then flush the catalog.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/aac.rs` around lines 138 - 144, The catalog.flush()
call is currently publishing the audio rendition before
broadcast.create_track(track_info)? completes; move the flush so it only runs
after create_track succeeds: keep creating track_info via
audio.create_track(...) and calling catalog.set(&hang::catalog::AUDIO, &audio),
then call broadcast.create_track(track_info)? and only after that succeeds
invoke catalog.flush(); ensure you do not flush on error (i.e., run
catalog.flush() after the successful return of broadcast.create_track and before
returning from the function).

Comment on lines +48 to 49
	audio: hang::catalog::Audio,
	track: hang::container::OrderedProducer,

⚠️ Potential issue | 🟠 Major

Don't write back a stale AUDIO snapshot.

This caches the entire audio section for the lifetime of the producer and then writes that cached copy back in Drop. If another audio producer updates AUDIO after Opus::new() runs, dropping this instance will overwrite those newer renditions while removing its own track. Re-read the current section before mutating it, or update the catalog at track granularity.

Also applies to: 59-67, 135-137

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/opus.rs` around lines 48 - 49, the code caches the
entire hang::catalog::Audio in Opus::new and writes that stale AUDIO snapshot
back in Drop, which can overwrite newer updates. Instead, re-read the current
AUDIO section from the catalog immediately before mutating it in Drop, or
better, perform a targeted update that removes only this instance's track
(identified by the OrderedProducer track id) rather than writing the
previously-captured Audio struct. Update any other places that store full audio
snapshots (the Opus::new constructor, its Drop impl, and the code paths around
hang::catalog::Audio and hang::container::OrderedProducer) to follow the same
pattern.
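The "re-read before mutating" pattern in Drop can be sketched as follows, with a shared `Mutex<Vec<String>>` standing in for the real AUDIO section and `OpusLike` as a hypothetical stand-in for the producer (none of these names are from the actual codebase):

```rust
use std::sync::{Arc, Mutex};

// Stand-in for the AUDIO section: just a list of track names.
type Audio = Vec<String>;

struct OpusLike {
    catalog: Arc<Mutex<Audio>>,
    track: String,
}

impl Drop for OpusLike {
    fn drop(&mut self) {
        // Re-read the *current* section instead of writing back a snapshot
        // captured at construction time, so concurrent updates survive.
        let mut audio = self.catalog.lock().unwrap();
        audio.retain(|t| t != &self.track);
    }
}

fn main() {
    let catalog = Arc::new(Mutex::new(vec!["opus-a".to_string()]));
    let producer = OpusLike {
        catalog: catalog.clone(),
        track: "opus-a".to_string(),
    };
    // Another producer adds a rendition after construction.
    catalog.lock().unwrap().push("opus-b".to_string());
    drop(producer);
    // Only this instance's track was removed; the newer one survives.
    assert_eq!(*catalog.lock().unwrap(), vec!["opus-b".to_string()]);
    println!("ok");
}
```

Had Drop written back the snapshot captured at construction (`vec!["opus-a"]` minus its own track), the concurrently added `"opus-b"` rendition would have been silently lost.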

Comment on lines +79 to +85
let track_info = audio.create_track("opus", audio_config.clone());
tracing::debug!(name = ?track_info.name, config = ?audio_config, "starting track");

let _ = catalog.set(&hang::catalog::AUDIO, &audio);
catalog.flush();

let track = broadcast.create_track(track_info)?;

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's read the file to see the full context around lines 79-85
cat -n rs/moq-mux/src/producer/opus.rs | sed -n '60,100p'

Repository: moq-dev/moq

Length of output: 1411


🏁 Script executed:

# Also check the entire function to understand the broader context
head -150 rs/moq-mux/src/producer/opus.rs | tail -100

Repository: moq-dev/moq

Length of output: 2647


🏁 Script executed:

# Search for the function definition to understand its full scope
rg -A 50 "fn.*opus" rs/moq-mux/src/producer/opus.rs | head -80

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Let's look at catalog operations and track creation to understand the semantics
rg "catalog\." rs/moq-mux/src/producer/opus.rs -B 2 -A 2

Repository: moq-dev/moq

Length of output: 641


🏁 Script executed:

# Check if there's any rollback or cleanup logic for catalog failures
rg -A 10 "broadcast.create_track" rs/moq-mux/src/producer/opus.rs

Repository: moq-dev/moq

Length of output: 315


Move catalog.flush() after broadcast.create_track(track_info)? succeeds.

If track creation fails, catalog.flush() has already advertised the track to clients before it actually exists. Reorder to flush the catalog only after the broadcast track is successfully created, or implement rollback logic to remove the track from the catalog on failure.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/opus.rs` around lines 79 - 85, the catalog is
flushed before verifying that broadcast.create_track(track_info) succeeds, so
clients can briefly see a non-existent track. Move the catalog.flush() call so
it runs only after broadcast.create_track(track_info)? returns successfully
(i.e., call catalog.set(&hang::catalog::AUDIO, &audio), then
broadcast.create_track(track_info), and only then catalog.flush()).
Alternatively, add rollback logic that removes the previously set catalog entry
when broadcast.create_track fails, using the same hang::catalog::AUDIO key and
the track creation result to identify the entry.
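The rollback alternative mentioned above can be sketched with stub types (the `Catalog` and `start_track` names below are illustrative stand-ins, not the real moq-mux API; the first sketch earlier in this review covers the reordering approach):

```rust
// Stand-in catalog where set() advertises immediately (i.e., the entry is
// already flushed to clients by the time create_track runs).
#[derive(Default)]
struct Catalog {
    published: Vec<String>,
}

impl Catalog {
    fn set(&mut self, name: &str) {
        self.published.push(name.to_string());
    }
    fn remove(&mut self, name: &str) {
        self.published.retain(|t| t != name);
    }
}

fn start_track(catalog: &mut Catalog, create_fails: bool, name: &str) -> Result<(), String> {
    // Entry is advertised eagerly...
    catalog.set(name);
    if create_fails {
        // ...so on failure we must roll back and un-advertise the track
        // that was never actually created.
        catalog.remove(name);
        return Err("create_track failed".to_string());
    }
    Ok(())
}

fn main() {
    let mut catalog = Catalog::default();
    assert!(start_track(&mut catalog, true, "opus").is_err());
    // Rollback left no dangling advertisement behind.
    assert!(catalog.published.is_empty());

    assert!(start_track(&mut catalog, false, "opus").is_ok());
    assert_eq!(catalog.published, vec!["opus".to_string()]);
    println!("ok");
}
```

Reordering is the simpler fix; rollback is only worth it if something between set() and track creation genuinely requires the catalog to be flushed first.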
