Skip to content

Commit 27235d9

Browse files
kixelatedclaude
andauthored
Fix TrackConsumer::read_frame: respect start_at, avoid premature EOF (#1327)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent a20f85c commit 27235d9

1 file changed

Lines changed: 67 additions & 5 deletions

File tree

rs/moq-lite/src/model/track.rs

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ impl State {
9292
waiter: &conducer::Waiter,
9393
) -> Poll<Result<Option<(bytes::Bytes, usize, u64)>>> {
9494
let start = index.saturating_sub(self.offset);
95+
let mut pending_seen = false;
9596
for (i, slot) in self.groups.iter().enumerate().skip(start) {
9697
let Some((group, _)) = slot else { continue };
9798
if group.info.sequence < next_sequence {
@@ -105,11 +106,18 @@ impl State {
105106
}
106107
Poll::Ready(Ok(None)) => continue,
107108
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
108-
Poll::Pending => continue,
109+
Poll::Pending => {
110+
pending_seen = true;
111+
continue;
112+
}
109113
}
110114
}
111115

112-
if self.final_sequence.is_some() {
116+
// A pending group can still produce a frame even after finish() — finish only
117+
// blocks new groups at/above final_sequence, not frames on existing groups.
118+
if pending_seen {
119+
Poll::Pending
120+
} else if self.final_sequence.is_some() {
113121
Poll::Ready(Ok(None))
114122
} else if let Some(err) = &self.abort {
115123
Poll::Ready(Err(err.clone()))
@@ -529,9 +537,10 @@ impl TrackConsumer {
529537
/// skipping the rest of the group. Intended for single-frame groups (see
530538
/// [`TrackProducer::write_frame`]).
531539
pub fn poll_read_frame(&mut self, waiter: &conducer::Waiter) -> Poll<Result<Option<bytes::Bytes>>> {
532-
let Some((frame, found_index, sequence)) = ready!(self.poll(waiter, |state| {
533-
state.poll_read_frame(self.index, self.next_sequence, waiter)
534-
})?) else {
540+
let lower = self.min_sequence.max(self.next_sequence);
541+
let Some((frame, found_index, sequence)) =
542+
ready!(self.poll(waiter, |state| { state.poll_read_frame(self.index, lower, waiter) })?)
543+
else {
535544
return Poll::Ready(Ok(None));
536545
};
537546

@@ -1075,6 +1084,59 @@ mod test {
10751084
assert_eq!(&frame[..], b"next");
10761085
}
10771086

1087+
#[tokio::test]
1088+
async fn read_frame_waits_for_pending_group_after_finish() {
1089+
// finish() sets final_sequence, but groups already created with lower sequences
1090+
// can still produce frames. read_frame must not return None prematurely.
1091+
let mut producer = Track::new("test").produce();
1092+
let mut consumer = producer.consume();
1093+
1094+
let mut g0 = producer.create_group(Group { sequence: 0 }).unwrap();
1095+
producer.finish().unwrap();
1096+
1097+
// Track is finished but group 0 has no frame yet — must block, not return None.
1098+
assert!(
1099+
consumer.read_frame().now_or_never().is_none(),
1100+
"read_frame must block on a pending group even after finish()"
1101+
);
1102+
1103+
// A late frame on the pending group is still delivered.
1104+
g0.write_frame(bytes::Bytes::from_static(b"late")).unwrap();
1105+
let frame = consumer
1106+
.read_frame()
1107+
.now_or_never()
1108+
.expect("should not block once a frame is written")
1109+
.expect("would have errored")
1110+
.expect("track should not be closed");
1111+
assert_eq!(&frame[..], b"late");
1112+
}
1113+
1114+
#[tokio::test]
1115+
async fn read_frame_respects_start_at() {
1116+
// start_at sets min_sequence; read_frame must skip groups below it even though
1117+
// next_sequence is still 0.
1118+
let mut producer = Track::new("test").produce();
1119+
let mut consumer = producer.consume();
1120+
consumer.start_at(5);
1121+
1122+
// Seq 3 has a frame but is below min_sequence — must be skipped.
1123+
let mut g3 = producer.create_group(Group { sequence: 3 }).unwrap();
1124+
g3.write_frame(bytes::Bytes::from_static(b"skip-me")).unwrap();
1125+
g3.finish().unwrap();
1126+
1127+
let mut g5 = producer.create_group(Group { sequence: 5 }).unwrap();
1128+
g5.write_frame(bytes::Bytes::from_static(b"keep")).unwrap();
1129+
g5.finish().unwrap();
1130+
1131+
let frame = consumer
1132+
.read_frame()
1133+
.now_or_never()
1134+
.expect("should not block")
1135+
.expect("would have errored")
1136+
.expect("track should not be closed");
1137+
assert_eq!(&frame[..], b"keep");
1138+
}
1139+
10781140
#[tokio::test]
10791141
async fn read_frame_returns_none_when_finished() {
10801142
let mut producer = Track::new("test").produce();

0 commit comments

Comments
 (0)