22
33from __future__ import annotations
44
5- from moq_ffi import Container , MoqBroadcastConsumer , MoqCatalogConsumer , MoqMediaConsumer
5+ from moq_ffi import (
6+ Container ,
7+ MoqBroadcastConsumer ,
8+ MoqCatalogConsumer ,
9+ MoqGroupConsumer ,
10+ MoqMediaConsumer ,
11+ MoqTrackConsumer ,
12+ )
613
714from .types import Catalog , Frame
815
@@ -26,6 +33,84 @@ def cancel(self) -> None:
2633 self ._inner .cancel ()
2734
2835
36+ class GroupConsumer :
37+ """Async iterator of byte payloads within a single group."""
38+
39+ def __init__ (self , inner : MoqGroupConsumer ) -> None :
40+ self ._inner = inner
41+
42+ @property
43+ def sequence (self ) -> int :
44+ """The sequence number of this group within the track."""
45+ return self ._inner .sequence ()
46+
47+ def __aiter__ (self ):
48+ return self
49+
50+ async def __anext__ (self ) -> bytes :
51+ frame = await self ._inner .read_frame ()
52+ if frame is None :
53+ raise StopAsyncIteration
54+ return frame
55+
56+ def cancel (self ) -> None :
57+ self ._inner .cancel ()
58+
59+
60+ class TrackConsumer :
61+ """Async iterator of groups from a track.
62+
63+ Each group is itself an async iterator of byte payloads. Same pattern as
64+ moq-boy's status/command tracks (one frame per group), but multi-frame
65+ groups are also supported.
66+ """
67+
68+ def __init__ (self , inner : MoqTrackConsumer ) -> None :
69+ self ._inner = inner
70+
71+ def __aiter__ (self ):
72+ return self
73+
74+ async def __anext__ (self ) -> GroupConsumer :
75+ group = await self .recv_group ()
76+ if group is None :
77+ raise StopAsyncIteration
78+ return group
79+
80+ async def recv_group (self ) -> GroupConsumer | None :
81+ """Return the next group in arrival order. Returns `None` when the track ends.
82+
83+ Groups are returned as they arrive on the wire, which may be out of sequence
84+ order. Use this for live consumption where latency matters more than order.
85+ """
86+ group = await self ._inner .recv_group ()
87+ if group is None :
88+ return None
89+ return GroupConsumer (group )
90+
91+ async def next_group (self ) -> GroupConsumer | None :
92+ """Return the next group in sequence order, skipping forward if behind.
93+
94+ Returns `None` when the track ends. Use this when order matters more than
95+ latency; `recv_group` is preferred for live consumption.
96+ """
97+ group = await self ._inner .next_group ()
98+ if group is None :
99+ return None
100+ return GroupConsumer (group )
101+
102+ async def read_frame (self ) -> bytes | None :
103+ """Read the first frame of the next group.
104+
105+ Convenience for tracks using one-frame-per-group (like moq-boy's
106+ status/command tracks). Returns `None` when the track ends.
107+ """
108+ return await self ._inner .read_frame ()
109+
110+ def cancel (self ) -> None :
111+ self ._inner .cancel ()
112+
113+
29114class CatalogConsumer :
30115 """Wraps MoqCatalogConsumer as an async iterator of Catalog."""
31116
@@ -54,6 +139,10 @@ def __init__(self, inner: MoqBroadcastConsumer) -> None:
54139 def subscribe_catalog (self ) -> CatalogConsumer :
55140 return CatalogConsumer (self ._inner .subscribe_catalog ())
56141
142+ def subscribe_track (self , name : str ) -> TrackConsumer :
143+ """Subscribe to a track — receive arbitrary byte payloads."""
144+ return TrackConsumer (self ._inner .subscribe_track (name ))
145+
57146 def subscribe_media (self , name : str , container : Container , max_latency_ms : int ) -> MediaConsumer :
58147 return MediaConsumer (self ._inner .subscribe_media (name , container , max_latency_ms ))
59148
0 commit comments