@@ -82,6 +82,42 @@ impl State {
8282 }
8383 }
8484
85+ /// Scan groups at or after `index` in arrival order, looking for the first with sequence
86+ /// `>= next_sequence` that has a fully-buffered next frame. Returns the frame plus the
87+ /// winning slot's absolute index and sequence so the consumer can advance past it.
88+ fn poll_read_frame (
89+ & self ,
90+ index : usize ,
91+ next_sequence : u64 ,
92+ waiter : & conducer:: Waiter ,
93+ ) -> Poll < Result < Option < ( bytes:: Bytes , usize , u64 ) > > > {
94+ let start = index. saturating_sub ( self . offset ) ;
95+ for ( i, slot) in self . groups . iter ( ) . enumerate ( ) . skip ( start) {
96+ let Some ( ( group, _) ) = slot else { continue } ;
97+ if group. info . sequence < next_sequence {
98+ continue ;
99+ }
100+
101+ let mut consumer = group. consume ( ) ;
102+ match consumer. poll_read_frame ( waiter) {
103+ Poll :: Ready ( Ok ( Some ( frame) ) ) => {
104+ return Poll :: Ready ( Ok ( Some ( ( frame, self . offset + i, group. info . sequence ) ) ) ) ;
105+ }
106+ Poll :: Ready ( Ok ( None ) ) => continue ,
107+ Poll :: Ready ( Err ( e) ) => return Poll :: Ready ( Err ( e) ) ,
108+ Poll :: Pending => continue ,
109+ }
110+ }
111+
112+ if self . final_sequence . is_some ( ) {
113+ Poll :: Ready ( Ok ( None ) )
114+ } else if let Some ( err) = & self . abort {
115+ Poll :: Ready ( Err ( err. clone ( ) ) )
116+ } else {
117+ Poll :: Pending
118+ }
119+ }
120+
85121 fn poll_get_group ( & self , sequence : u64 ) -> Poll < Result < Option < GroupConsumer > > > {
86122 // Search for the group with the matching sequence, skipping tombstones.
87123 for ( group, _) in self . groups . iter ( ) . flatten ( ) {
@@ -489,6 +525,28 @@ impl TrackConsumer {
489525 conducer:: wait ( |waiter| self . poll_next_group_ordered ( waiter) ) . await
490526 }
491527
528+ /// A helper that calls [`Self::poll_next_group_ordered`] and returns its first frame,
529+ /// skipping the rest of the group. Intended for single-frame groups (see
530+ /// [`TrackProducer::write_frame`]).
531+ pub fn poll_read_frame ( & mut self , waiter : & conducer:: Waiter ) -> Poll < Result < Option < bytes:: Bytes > > > {
532+ let Some ( ( frame, found_index, sequence) ) = ready ! ( self . poll( waiter, |state| {
533+ state. poll_read_frame( self . index, self . next_sequence, waiter)
534+ } ) ?) else {
535+ return Poll :: Ready ( Ok ( None ) ) ;
536+ } ;
537+
538+ self . index = found_index + 1 ;
539+ self . next_sequence = sequence. saturating_add ( 1 ) ;
540+ Poll :: Ready ( Ok ( Some ( frame) ) )
541+ }
542+
543+ /// Read a single full frame from the next group in sequence order.
544+ ///
545+ /// See [`Self::poll_read_frame`] for semantics.
546+ pub async fn read_frame ( & mut self ) -> Result < Option < bytes:: Bytes > > {
547+ conducer:: wait ( |waiter| self . poll_read_frame ( waiter) ) . await
548+ }
549+
492550 /// Poll for the group with the given sequence, without blocking.
493551 pub fn poll_get_group ( & self , waiter : & conducer:: Waiter , sequence : u64 ) -> Poll < Result < Option < GroupConsumer > > > {
494552 self . poll ( waiter, |state| state. poll_get_group ( sequence) )
@@ -938,6 +996,109 @@ mod test {
938996 assert_eq ! ( consumer. assert_group( ) . info. sequence, 3 ) ;
939997 }
940998
999+ #[ tokio:: test]
1000+ async fn read_frame_returns_single_frame_per_group ( ) {
1001+ let mut producer = Track :: new ( "test" ) . produce ( ) ;
1002+ let mut consumer = producer. consume ( ) ;
1003+
1004+ producer. write_frame ( b"hello" . as_slice ( ) ) . unwrap ( ) ;
1005+ producer. write_frame ( b"world" . as_slice ( ) ) . unwrap ( ) ;
1006+
1007+ let frame = consumer
1008+ . read_frame ( )
1009+ . now_or_never ( )
1010+ . expect ( "should not block" )
1011+ . expect ( "would have errored" )
1012+ . expect ( "track should not be closed" ) ;
1013+ assert_eq ! ( & frame[ ..] , b"hello" ) ;
1014+
1015+ let frame = consumer
1016+ . read_frame ( )
1017+ . now_or_never ( )
1018+ . expect ( "should not block" )
1019+ . expect ( "would have errored" )
1020+ . expect ( "track should not be closed" ) ;
1021+ assert_eq ! ( & frame[ ..] , b"world" ) ;
1022+ }
1023+
1024+ #[ tokio:: test]
1025+ async fn read_frame_skips_stalled_group_for_newer_ready_frame ( ) {
1026+ let mut producer = Track :: new ( "test" ) . produce ( ) ;
1027+ let mut consumer = producer. consume ( ) ;
1028+
1029+ // Seq 3: group open, no frame yet (stalled).
1030+ let _stalled = producer. create_group ( Group { sequence : 3 } ) . unwrap ( ) ;
1031+ // Seq 5: fully-written group with a frame.
1032+ let mut g5 = producer. create_group ( Group { sequence : 5 } ) . unwrap ( ) ;
1033+ g5. write_frame ( bytes:: Bytes :: from_static ( b"later" ) ) . unwrap ( ) ;
1034+ g5. finish ( ) . unwrap ( ) ;
1035+
1036+ // read_frame should not block on the stalled seq 3 — it returns seq 5's frame.
1037+ let frame = consumer
1038+ . read_frame ( )
1039+ . now_or_never ( )
1040+ . expect ( "should not block on stalled earlier group" )
1041+ . expect ( "would have errored" )
1042+ . expect ( "track should not be closed" ) ;
1043+ assert_eq ! ( & frame[ ..] , b"later" ) ;
1044+ }
1045+
1046+ #[ tokio:: test]
1047+ async fn read_frame_discards_rest_of_multi_frame_group ( ) {
1048+ let mut producer = Track :: new ( "test" ) . produce ( ) ;
1049+ let mut consumer = producer. consume ( ) ;
1050+
1051+ // Group 0 has two frames; only the first is returned.
1052+ let mut g0 = producer. create_group ( Group { sequence : 0 } ) . unwrap ( ) ;
1053+ g0. write_frame ( bytes:: Bytes :: from_static ( b"one" ) ) . unwrap ( ) ;
1054+ g0. write_frame ( bytes:: Bytes :: from_static ( b"two" ) ) . unwrap ( ) ;
1055+ g0. finish ( ) . unwrap ( ) ;
1056+
1057+ // Group 1 is a normal single-frame group.
1058+ producer. write_frame ( b"next" . as_slice ( ) ) . unwrap ( ) ;
1059+
1060+ let frame = consumer
1061+ . read_frame ( )
1062+ . now_or_never ( )
1063+ . expect ( "should not block" )
1064+ . expect ( "would have errored" )
1065+ . expect ( "track should not be closed" ) ;
1066+ assert_eq ! ( & frame[ ..] , b"one" ) ;
1067+
1068+ // The second frame of group 0 is discarded; the next read jumps to group 1.
1069+ let frame = consumer
1070+ . read_frame ( )
1071+ . now_or_never ( )
1072+ . expect ( "should not block" )
1073+ . expect ( "would have errored" )
1074+ . expect ( "track should not be closed" ) ;
1075+ assert_eq ! ( & frame[ ..] , b"next" ) ;
1076+ }
1077+
1078+ #[ tokio:: test]
1079+ async fn read_frame_returns_none_when_finished ( ) {
1080+ let mut producer = Track :: new ( "test" ) . produce ( ) ;
1081+ let mut consumer = producer. consume ( ) ;
1082+
1083+ producer. write_frame ( b"only" . as_slice ( ) ) . unwrap ( ) ;
1084+ producer. finish ( ) . unwrap ( ) ;
1085+
1086+ let frame = consumer
1087+ . read_frame ( )
1088+ . now_or_never ( )
1089+ . expect ( "should not block" )
1090+ . expect ( "would have errored" )
1091+ . expect ( "track should not be closed" ) ;
1092+ assert_eq ! ( & frame[ ..] , b"only" ) ;
1093+
1094+ let done = consumer
1095+ . read_frame ( )
1096+ . now_or_never ( )
1097+ . expect ( "should not block" )
1098+ . expect ( "would have errored" ) ;
1099+ assert ! ( done. is_none( ) ) ;
1100+ }
1101+
9411102 #[ tokio:: test]
9421103 async fn get_group_finishes_without_waiting_for_gaps ( ) {
9431104 let mut producer = Track :: new ( "test" ) . produce ( ) ;
0 commit comments