From caf6941d8c0bca0837c5fa262a786e8d0baee548 Mon Sep 17 00:00:00 2001 From: Ned Anderson Date: Tue, 28 Apr 2026 12:09:45 -0400 Subject: [PATCH 1/8] perf: allow multiple DATA frames per write --- src/codec/framed_write.rs | 288 ++++++++++++++++++++++---------- src/codec/mod.rs | 12 +- src/proto/streams/prioritize.rs | 116 +++++-------- src/proto/streams/stream.rs | 15 +- 4 files changed, 265 insertions(+), 166 deletions(-) diff --git a/src/codec/framed_write.rs b/src/codec/framed_write.rs index 17e557623..6a7c128aa 100644 --- a/src/codec/framed_write.rs +++ b/src/codec/framed_write.rs @@ -4,12 +4,13 @@ use crate::frame::{self, Frame, FrameSize}; use crate::hpack; use bytes::{Buf, BufMut, BytesMut}; +use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tokio_util::io::poll_write_buf; -use std::io::{self, Cursor}; +use std::io::{self, Cursor, IoSlice}; +use std::ops::ControlFlow; // A macro to get around a method needing to borrow &mut self macro_rules! limited_write_buf { @@ -38,11 +39,11 @@ struct Encoder { /// TODO: Should this be a ring buffer? buf: Cursor, - /// Next frame to encode - next: Option>, + /// Vector of buffer data and data frames to send next + next_vec: VecDeque>, - /// Last data frame - last_data_frame: Option>, + /// Next continuation frame to encode + next_continuation: Option, /// Max frame size, this is specified by the peer max_frame_size: FrameSize, @@ -55,9 +56,11 @@ struct Encoder { } #[derive(Debug)] -enum Next { - Data(frame::Data), - Continuation(frame::Continuation), +struct BufElement { + /// Number of bytes in the buffer that should be written before the next data frame payload. + buf_len: usize, + /// Data frame of the payload that should be written as part of this buffer element. + data_frame: frame::Data, } /// Initialize the connection with this amount of write buffer. @@ -76,6 +79,8 @@ const CHAIN_THRESHOLD: usize = 256; /// fragmented data being sent, and hereby improve the throughput. const CHAIN_THRESHOLD_WITHOUT_VECTORED_IO: usize = 1024; +const MAX_VECTORED_IO_COUNT: usize = 1024; + // TODO: Make generic impl FramedWrite where @@ -83,10 +88,10 @@ where B: Buf, { pub fn new(inner: T) -> FramedWrite { - let chain_threshold = if inner.is_write_vectored() { - CHAIN_THRESHOLD + let (chain_threshold, next_vec_capacity) = if inner.is_write_vectored() { + (CHAIN_THRESHOLD, MAX_VECTORED_IO_COUNT / 2) } else { - CHAIN_THRESHOLD_WITHOUT_VECTORED_IO + (CHAIN_THRESHOLD_WITHOUT_VECTORED_IO, 1) }; FramedWrite { inner, @@ -94,8 +99,8 @@ where encoder: Encoder { hpack: hpack::Encoder::default(), buf: Cursor::new(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY)), - next: None, - last_data_frame: None, + next_vec: VecDeque::with_capacity(next_vec_capacity), + next_continuation: None, max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE, chain_threshold, min_buffer_capacity: chain_threshold + frame::HEADER_LEN, @@ -134,27 +139,16 @@ where let _e = span.enter(); loop { - while !self.encoder.is_empty() { - match self.encoder.next { - Some(Next::Data(ref mut frame)) => { - tracing::trace!(queued_data_frame = true); - let mut buf = (&mut self.encoder.buf).chain(frame.payload_mut()); - ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut buf))? - } - _ => { - tracing::trace!(queued_data_frame = false); - ready!(poll_write_buf( - Pin::new(&mut self.inner), - cx, - &mut self.encoder.buf - ))? - } - }; + while ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut self.encoder))?.is_continue() { } - match self.encoder.unset_frame() { - ControlFlow::Continue => (), - ControlFlow::Break => break, + self.encoder.reclaim_empty_buffer(); + + if let Some(frame) = self.encoder.next_continuation.take() { + let mut buf = limited_write_buf!(self.encoder); + self.encoder.next_continuation = frame.encode(&mut buf); + } else { + break; } } @@ -175,38 +169,15 @@ where } } -#[must_use] -enum ControlFlow { - Continue, - Break, -} - impl Encoder where B: Buf, { - fn unset_frame(&mut self) -> ControlFlow { - // Clear internal buffer - self.buf.set_position(0); - self.buf.get_mut().clear(); - - // The data frame has been written, so unset it - match self.next.take() { - Some(Next::Data(frame)) => { - self.last_data_frame = Some(frame); - debug_assert!(self.is_empty()); - ControlFlow::Break - } - Some(Next::Continuation(frame)) => { - // Buffer the continuation frame, then try to write again - let mut buf = limited_write_buf!(self); - if let Some(continuation) = frame.encode(&mut buf) { - self.next = Some(Next::Continuation(continuation)); - } - ControlFlow::Continue - } - None => ControlFlow::Break, - } + fn reclaim_empty_buffer(&mut self) { + assert_eq!(self.buf.position(), 0); + assert_eq!(self.buf.remaining(), 0); + let buf = self.buf.get_mut(); + let _ = buf.try_reclaim(buf.capacity() + 1); } fn buffer(&mut self, item: Frame) -> Result<(), UserError> { @@ -225,7 +196,7 @@ where if len > self.max_frame_size() { return Err(PayloadTooBig); } - + let mut buf_len_to_push = 0; if len >= self.chain_threshold { let head = v.head(); @@ -237,30 +208,26 @@ where self.buf.get_mut().put(v.payload_mut().take(extra_bytes)); } - // Save the data frame - self.next = Some(Next::Data(v)); + buf_len_to_push = self.buf.remaining(); + self.buf.advance(buf_len_to_push); } else { v.encode_chunk(self.buf.get_mut()); // The chunk has been fully encoded, so there is no need to // keep it around assert_eq!(v.payload().remaining(), 0, "chunk not fully encoded"); - - // Save off the last frame... - self.last_data_frame = Some(v); } + + // Push the most recent data frame... + self.next_vec.push_back(BufElement { buf_len: buf_len_to_push, data_frame: v }); } Frame::Headers(v) => { let mut buf = limited_write_buf!(self); - if let Some(continuation) = v.encode(&mut self.hpack, &mut buf) { - self.next = Some(Next::Continuation(continuation)); - } + self.next_continuation = v.encode(&mut self.hpack, &mut buf); } Frame::PushPromise(v) => { let mut buf = limited_write_buf!(self); - if let Some(continuation) = v.encode(&mut self.hpack, &mut buf) { - self.next = Some(Next::Continuation(continuation)); - } + self.next_continuation = v.encode(&mut self.hpack, &mut buf); } Frame::Settings(v) => { v.encode(self.buf.get_mut()); @@ -296,17 +263,11 @@ where } fn has_capacity(&self) -> bool { - self.next.is_none() + self.next_continuation.is_none() + && self.next_vec.len() < self.next_vec.capacity() && (self.buf.get_ref().capacity() - self.buf.get_ref().len() >= self.min_buffer_capacity) } - - fn is_empty(&self) -> bool { - match self.next { - Some(Next::Data(ref frame)) => !frame.payload().has_remaining(), - _ => !self.buf.has_remaining(), - } - } } impl Encoder { @@ -315,6 +276,109 @@ impl Encoder { } } +impl Buf for Encoder { + fn remaining(&self) -> usize { + let mut n = self.buf.get_ref().len(); + for next in self.next_vec.iter() { + n = n.saturating_add(next.data_frame.payload().remaining()); + } + n + } + + fn chunk(&self) -> &[u8] { + for next in self.next_vec.iter() { + if next.buf_len > 0 { + return &self.buf.get_ref()[..next.buf_len]; + } + let slice = next.data_frame.payload().chunk(); + if slice.len() > 0 { + return slice; + } + } + return &*self.buf.get_ref(); + } + + fn advance(&mut self, mut n: usize) { + for next in self.next_vec.iter_mut() { + if next.buf_len > 0 { + let i = n.min(next.buf_len); + self.buf.get_mut().advance(i); + self.buf.set_position(self.buf.position().checked_sub(i as u64).unwrap()); + n -= i; + next.buf_len -= i; + if next.buf_len > 0 { + return; + } + } + let rem = next.data_frame.payload().remaining(); + if rem > 0 { + let i = n.min(rem); + n -= i; + next.data_frame.payload_mut().advance(i); + if i < rem { + return; + } + } + } + if n > 0 { + self.buf.get_mut().advance(n); + } + } + + fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize { + let mut n = 0; + let mut buf_index = 0; + let mut vec_iter = self.next_vec.iter(); + while n < dst.len() { + if let Some(next) = vec_iter.next() { + if next.buf_len > 0 { + let buf_end = buf_index + next.buf_len; + dst[n] = IoSlice::new(&self.buf.get_ref()[buf_index..buf_end]); + buf_index = buf_end; + n += 1; + if n == dst.len() { + break; + } + } + let mut rem = next.data_frame.payload().remaining(); + if rem > 0 { + let n0 = n; + n = n.wrapping_add(next.data_frame.payload().chunks_vectored(&mut dst[n..])); + assert!(n0 <= n && n <= dst.len()); + if rem < usize::MAX { + for s in &dst[n0..n] { + rem = rem.saturating_sub(s.len()); + } + } + if rem > 0 { + break; + } + } + assert!(n <= dst.len()); + } else { + if self.buf.get_ref().len() > buf_index { + dst[n] = IoSlice::new(&self.buf.get_ref()[buf_index..]); + n += 1; + } + break; + } + } + n + } + + fn has_remaining(&self) -> bool { + if self.buf.get_ref().len() > 0 { + return true; + } + for next in self.next_vec.iter() { + if next.data_frame.payload().has_remaining() { + return true; + } + } + false + } +} + impl FramedWrite { /// Returns the max frame size that can be sent pub fn max_frame_size(&self) -> usize { @@ -332,16 +396,18 @@ impl FramedWrite { self.encoder.hpack.update_max_size(val); } - /// Retrieve the last data frame that has been sent - pub fn take_last_data_frame(&mut self) -> Option> { - self.encoder.last_data_frame.take() - } - pub fn get_mut(&mut self) -> &mut T { &mut self.inner } } +impl FramedWrite { + /// Take back data frames that have been buffered and/or fully written. + pub fn take_used_data_frames(&mut self) -> impl Iterator> + use<'_, T, B> { + UsedDataFrameTaker { vec: &mut self.encoder.next_vec, index: 0 } + } +} + impl AsyncRead for FramedWrite { fn poll_read( mut self: Pin<&mut Self>, @@ -365,3 +431,53 @@ mod unstable { } } } + +fn poll_write_buf( + io: Pin<&mut T>, + cx: &mut Context<'_>, + buf: &mut B, +) -> Poll>> { + let n = if io.is_write_vectored() { + let mut slices = [IoSlice::new(&[]); MAX_VECTORED_IO_COUNT]; + let cnt = buf.chunks_vectored(&mut slices); + if cnt == 0 { + return Poll::Ready(Ok(ControlFlow::Break(0))); + } + ready!(io.poll_write_vectored(cx, &slices[..cnt]))? + } else { + let slice = buf.chunk(); + if slice.len() == 0 { + return Poll::Ready(Ok(ControlFlow::Break(0))); + } + ready!(io.poll_write(cx, slice))? + }; + + buf.advance(n); + + Poll::Ready(Ok(ControlFlow::Continue(n))) +} + +struct UsedDataFrameTaker<'a, B> { + vec: &'a mut VecDeque>, + index: usize, +} + +impl<'a, B: Buf> Iterator for UsedDataFrameTaker<'a, B> { + type Item = frame::Data; + + #[inline] + fn next(&mut self) -> Option { + while let Some(item) = self.vec.get(self.index) { + if item.buf_len == 0 && !item.data_frame.payload().has_remaining() { + return self.vec.remove(self.index).map(|x| x.data_frame); + } + self.index += 1; + } + None + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + (0, Some(self.vec.len())) + } +} diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 6cbdc1e18..01836b75d 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -116,16 +116,18 @@ impl Codec { self.inner.get_mut().get_mut() } - /// Takes the data payload value that was fully written to the socket - pub(crate) fn take_last_data_frame(&mut self) -> Option> { - self.framed_write().take_last_data_frame() - } - fn framed_write(&mut self) -> &mut FramedWrite { self.inner.get_mut() } } +impl Codec { + /// Take back data frames that have been buffered and/or fully written. + pub(crate) fn take_used_data_frames(&mut self) -> impl Iterator> + use<'_, T, B> { + self.framed_write().take_used_data_frames() + } +} + impl Codec where T: AsyncWrite + Unpin, diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 75358d473..e2c3681fb 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -9,7 +9,8 @@ use crate::codec::UserError::*; use bytes::buf::Take; use std::{ cmp::{self, Ordering}, - fmt, io, mem, + fmt, io, + ops::ControlFlow, task::{Context, Poll, Waker}, }; @@ -51,31 +52,15 @@ pub(super) struct Prioritize { /// Stream ID of the last stream opened. last_opened_id: StreamId, - /// What `DATA` frame is currently being sent in the codec. - in_flight_data_frame: InFlightData, - /// The maximum amount of bytes a stream should buffer. max_buffer_size: usize, } -#[derive(Debug, Eq, PartialEq)] -enum InFlightData { - /// There is no `DATA` frame in flight. - Nothing, - /// There is a `DATA` frame in flight belonging to the given stream. - DataFrame(store::Key), - /// There was a `DATA` frame, but the stream's queue was since cleared. - Drop, -} - pub(crate) struct Prioritized { // The buffer inner: Take, end_of_stream: bool, - - // The stream that this is associated with - stream: store::Key, } // ===== impl Prioritize ===== @@ -99,7 +84,6 @@ impl Prioritize { pending_open: store::Queue::new(), flow, last_opened_id: StreamId::ZERO, - in_flight_data_frame: InFlightData::Nothing, max_buffer_size: config.local_max_buffer_size, } } @@ -493,18 +477,7 @@ impl Prioritize { // If data is buffered and the stream is send ready, then // schedule the stream for execution if stream.buffered_send_data > 0 && stream.is_send_ready() { - // TODO: This assertion isn't *exactly* correct. There can still be - // buffered send data while the stream's pending send queue is - // empty. This can happen when a large data frame is in the process - // of being **partially** sent. Once the window has been sent, the - // data frame will be returned to the prioritization layer to be - // re-scheduled. - // - // That said, it would be nice to figure out how to make this - // assertion correctly. - // - // debug_assert!(!stream.pending_send.is_empty()); - + debug_assert!(!stream.pending_send.is_empty()); self.pending_send.push(stream); } } @@ -524,8 +497,8 @@ impl Prioritize { // Ensure codec is ready ready!(dst.poll_ready(cx))?; - // Reclaim any frame that has previously been written - self.reclaim_frame(buffer, store, dst); + // Reclaim any frames that have previously been written + self.reclaim_frames(buffer, store, dst); // The max frame length let max_frame_len = dst.max_send_frame_size(); @@ -542,24 +515,20 @@ impl Prioritize { Some(frame) => { tracing::trace!(?frame, "writing"); - debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing); - if let Frame::Data(ref frame) = frame { - self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream); - } dst.buffer(frame).expect("invalid frame"); // Ensure the codec is ready to try the loop again. ready!(dst.poll_ready(cx))?; // Because, always try to reclaim... - self.reclaim_frame(buffer, store, dst); + self.reclaim_frames(buffer, store, dst); } None => { // Try to flush the codec. ready!(dst.flush(cx))?; - // This might release a data frame... - if !self.reclaim_frame(buffer, store, dst) { + // This might release data frames... + if !self.reclaim_frames(buffer, store, dst) { return Poll::Ready(Ok(())); } @@ -577,7 +546,7 @@ impl Prioritize { /// When a data frame is written to the codec, it may not be written in its /// entirety (large chunks are split up into potentially many data frames). /// In this case, the stream needs to be reprioritized. - fn reclaim_frame( + fn reclaim_frames( &mut self, buffer: &mut Buffer>, store: &mut Store, @@ -589,12 +558,14 @@ impl Prioritize { let span = tracing::trace_span!("try_reclaim_frame"); let _e = span.enter(); + let mut ret = false; + // First check if there are any data chunks to take back - if let Some(frame) = dst.take_last_data_frame() { - self.reclaim_frame_inner(buffer, store, frame) - } else { - false + for frame in dst.take_used_data_frames() { + ret |= self.reclaim_frame_inner(buffer, store, frame); } + + ret } fn reclaim_frame_inner( @@ -612,36 +583,29 @@ impl Prioritize { "reclaimed" ); - let mut eos = false; - let key = frame.payload().stream; - - match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) { - InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"), - InFlightData::Drop => { - tracing::trace!("not reclaiming frame for cancelled stream"); - return false; - } - InFlightData::DataFrame(k) => { - debug_assert_eq!(k, key); - } - } - + let eos = frame.payload().end_of_stream; let mut frame = frame.map(|prioritized| { // TODO: Ensure fully written - eos = prioritized.end_of_stream; prioritized.inner.into_inner() }); if frame.payload().has_remaining() { - let mut stream = store.resolve(key); - - if eos { - frame.set_end_stream(true); + if let Some(mut stream) = store.find_mut(&frame.stream_id()) { + match stream.in_flight_partial_send { + Some(ControlFlow::Continue(())) => { + stream.in_flight_partial_send = None; + if eos { + frame.set_end_stream(true); + } + self.push_back_frame(frame.into(), buffer, &mut stream); + return true; + } + Some(ControlFlow::Break(())) => tracing::trace!("not reclaiming frame for cancelled stream"), + None => panic!("wasn't expecting a frame to reclaim"), + } + } else { + tracing::debug!(?frame, "not reclaiming frame for stream not found"); } - - self.push_back_frame(frame.into(), buffer, &mut stream); - - return true; } false @@ -676,11 +640,8 @@ impl Prioritize { stream.buffered_send_data = 0; stream.requested_send_capacity = 0; - if let InFlightData::DataFrame(key) = self.in_flight_data_frame { - if stream.key() == key { - // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed. - self.in_flight_data_frame = InFlightData::Drop; - } + if stream.in_flight_partial_send == Some(ControlFlow::Continue(())) { + stream.in_flight_partial_send = Some(ControlFlow::Break(())); } } @@ -720,6 +681,14 @@ impl Prioritize { let span = tracing::trace_span!("popped", ?stream.id, ?stream.state); let _e = span.enter(); + if stream.in_flight_partial_send == Some(ControlFlow::Continue(())) { + tracing::trace!( + "stream has an in-flight partial send data frame \ + that needs to be reclaimed before proceeding" + ); + continue; + } + // It's possible that this stream, besides having data to send, // is also queued to send a reset, and thus is already in the queue // to wait for "some time" after a reset. @@ -813,6 +782,7 @@ impl Prioritize { if frame.payload().remaining() > len { frame.set_end_stream(false); + stream.in_flight_partial_send = Some(ControlFlow::Continue(())); } (eos, len) }); @@ -820,7 +790,6 @@ impl Prioritize { Frame::Data(frame.map(|buf| Prioritized { inner: buf.take(len), end_of_stream: eos, - stream: stream.key(), })) } Some(Frame::PushPromise(pp)) => { @@ -939,7 +908,6 @@ impl fmt::Debug for Prioritized { fmt.debug_struct("Prioritized") .field("remaining", &self.inner.get_ref().remaining()) .field("end_of_stream", &self.end_of_stream) - .field("stream", &self.stream) .finish() } } diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index f522f5d5d..25ae5dc77 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -5,6 +5,7 @@ use super::*; use std::fmt; use std::task::{Context, Waker}; use std::time::Instant; +use std::ops::ControlFlow; /// Tracks Stream related state /// @@ -73,6 +74,11 @@ pub(super) struct Stream { /// Set to true when a push is pending for this stream pub is_pending_push: bool, + /// Set to Some(_) when a data frame is in the process of being partially sent. + /// Some(ControlFlow::Continue) means that the rest of the data frame should still be sent. + /// Some(ControlFlow::Break) means that the rest of the data frame should NOT be sent. + pub in_flight_partial_send: Option>, + // ===== Fields related to receiving ===== /// Next node in the accept linked list pub next_pending_accept: Option, @@ -178,6 +184,7 @@ impl Stream { is_pending_open: false, next_open: None, is_pending_push: false, + in_flight_partial_send: None, // ===== Fields related to receiving ===== next_pending_accept: None, @@ -232,7 +239,12 @@ impl Stream { // This is different from the "open" check because reserved streams don't count // toward the concurrency limit. // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2 - !self.is_pending_open && !self.is_pending_push + // + // With in_flight_partial_send, we track whether a data frame is in the process of being partially sent. + // If so, we should wait until the last part of that data frame is encoded before sending any other frames for this stream. + !self.is_pending_open + && !self.is_pending_push + && self.in_flight_partial_send != Some(ControlFlow::Continue(())) } /// Returns true if the stream is closed @@ -418,6 +430,7 @@ impl fmt::Debug for Stream { .h2_field_some("next_open", &self.next_open) .h2_field_if("is_pending_open", &self.is_pending_open) .h2_field_if("is_pending_push", &self.is_pending_push) + .h2_field_some("in_flight_partial_send", &self.in_flight_partial_send) .h2_field_some("next_pending_accept", &self.next_pending_accept) .h2_field_if("is_pending_accept", &self.is_pending_accept) .field("recv_flow", &self.recv_flow) From f56ee72439926a95f32a85ec21be997d336f359a Mon Sep 17 00:00:00 2001 From: Ned Anderson Date: Wed, 6 May 2026 21:33:48 -0400 Subject: [PATCH 2/8] fix formatting --- src/codec/framed_write.rs | 26 ++++++++++++++++++++------ src/codec/mod.rs | 4 +++- src/proto/streams/prioritize.rs | 7 +++++-- src/proto/streams/stream.rs | 2 +- 4 files changed, 29 insertions(+), 10 deletions(-) diff --git a/src/codec/framed_write.rs b/src/codec/framed_write.rs index 6a7c128aa..75322de8a 100644 --- a/src/codec/framed_write.rs +++ b/src/codec/framed_write.rs @@ -139,8 +139,13 @@ where let _e = span.enter(); loop { - while ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut self.encoder))?.is_continue() { - } + while ready!(poll_write_buf( + Pin::new(&mut self.inner), + cx, + &mut self.encoder + ))? + .is_continue() + {} self.encoder.reclaim_empty_buffer(); @@ -219,7 +224,10 @@ where } // Push the most recent data frame... - self.next_vec.push_back(BufElement { buf_len: buf_len_to_push, data_frame: v }); + self.next_vec.push_back(BufElement { + buf_len: buf_len_to_push, + data_frame: v, + }); } Frame::Headers(v) => { let mut buf = limited_write_buf!(self); @@ -303,7 +311,8 @@ impl Buf for Encoder { if next.buf_len > 0 { let i = n.min(next.buf_len); self.buf.get_mut().advance(i); - self.buf.set_position(self.buf.position().checked_sub(i as u64).unwrap()); + self.buf + .set_position(self.buf.position().checked_sub(i as u64).unwrap()); n -= i; next.buf_len -= i; if next.buf_len > 0 { @@ -403,8 +412,13 @@ impl FramedWrite { impl FramedWrite { /// Take back data frames that have been buffered and/or fully written. - pub fn take_used_data_frames(&mut self) -> impl Iterator> + use<'_, T, B> { - UsedDataFrameTaker { vec: &mut self.encoder.next_vec, index: 0 } + pub fn take_used_data_frames( + &mut self, + ) -> impl Iterator> + use<'_, T, B> { + UsedDataFrameTaker { + vec: &mut self.encoder.next_vec, + index: 0, + } } } diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 01836b75d..e6589e886 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -123,7 +123,9 @@ impl Codec { impl Codec { /// Take back data frames that have been buffered and/or fully written. - pub(crate) fn take_used_data_frames(&mut self) -> impl Iterator> + use<'_, T, B> { + pub(crate) fn take_used_data_frames( + &mut self, + ) -> impl Iterator> + use<'_, T, B> { self.framed_write().take_used_data_frames() } } diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index e2c3681fb..36ecbffe0 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -600,7 +600,9 @@ impl Prioritize { self.push_back_frame(frame.into(), buffer, &mut stream); return true; } - Some(ControlFlow::Break(())) => tracing::trace!("not reclaiming frame for cancelled stream"), + Some(ControlFlow::Break(())) => { + tracing::trace!("not reclaiming frame for cancelled stream"); + } None => panic!("wasn't expecting a frame to reclaim"), } } else { @@ -782,7 +784,8 @@ impl Prioritize { if frame.payload().remaining() > len { frame.set_end_stream(false); - stream.in_flight_partial_send = Some(ControlFlow::Continue(())); + stream.in_flight_partial_send = + Some(ControlFlow::Continue(())); } (eos, len) }); diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 25ae5dc77..0f7811139 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -3,9 +3,9 @@ use crate::Reason; use super::*; use std::fmt; +use std::ops::ControlFlow; use std::task::{Context, Waker}; use std::time::Instant; -use std::ops::ControlFlow; /// Tracks Stream related state /// From 8c006353f9078f4a6af7a3a253d7b90e373682ea Mon Sep 17 00:00:00 2001 From: Ned Anderson Date: Wed, 13 May 2026 15:50:42 -0400 Subject: [PATCH 3/8] remove previously re-introduced debug assertion --- src/proto/streams/prioritize.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 36ecbffe0..4536cd84c 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -477,7 +477,15 @@ impl Prioritize { // If data is buffered and the stream is send ready, then // schedule the stream for execution if stream.buffered_send_data > 0 && stream.is_send_ready() { - debug_assert!(!stream.pending_send.is_empty()); + // TODO: This assertion isn't *exactly* correct. There can still be + // buffered send data while the stream's pending send queue is + // empty and the stream is send ready. This can happen when + // try_assign_capacity is called from send_data. + // + // That said, it would be nice to figure out how to make this + // assertion correctly. + // + // debug_assert!(!stream.pending_send.is_empty()); self.pending_send.push(stream); } } From 6d39ee7cae331a12635765774321966a0ca5637c Mon Sep 17 00:00:00 2001 From: Ned Anderson Date: Wed, 13 May 2026 16:00:48 -0400 Subject: [PATCH 4/8] return use of stream key --- src/proto/streams/prioritize.rs | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 4536cd84c..a8f489adf 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -61,6 +61,9 @@ pub(crate) struct Prioritized { inner: Take, end_of_stream: bool, + + // The stream that this is associated with + stream: store::Key, } // ===== impl Prioritize ===== @@ -591,6 +594,7 @@ impl Prioritize { "reclaimed" ); + let key = frame.payload().stream; let eos = frame.payload().end_of_stream; let mut frame = frame.map(|prioritized| { // TODO: Ensure fully written @@ -598,23 +602,19 @@ impl Prioritize { }); if frame.payload().has_remaining() { - if let Some(mut stream) = store.find_mut(&frame.stream_id()) { - match stream.in_flight_partial_send { - Some(ControlFlow::Continue(())) => { - stream.in_flight_partial_send = None; - if eos { - frame.set_end_stream(true); - } - self.push_back_frame(frame.into(), buffer, &mut stream); - return true; - } - Some(ControlFlow::Break(())) => { - tracing::trace!("not reclaiming frame for cancelled stream"); + let mut stream = store.resolve(key); + match stream.in_flight_partial_send.take() { + Some(ControlFlow::Continue(())) => { + if eos { + frame.set_end_stream(true); } - None => panic!("wasn't expecting a frame to reclaim"), + self.push_back_frame(frame.into(), buffer, &mut stream); + return true; } - } else { - tracing::debug!(?frame, "not reclaiming frame for stream not found"); + Some(ControlFlow::Break(())) => { + tracing::trace!("not reclaiming frame for cancelled stream"); + } + None => panic!("wasn't expecting a frame to reclaim"), } } @@ -801,6 +801,7 @@ impl Prioritize { Frame::Data(frame.map(|buf| Prioritized { inner: buf.take(len), end_of_stream: eos, + stream: stream.key(), })) } Some(Frame::PushPromise(pp)) => { @@ -919,6 +920,7 @@ impl fmt::Debug for Prioritized { fmt.debug_struct("Prioritized") .field("remaining", &self.inner.get_ref().remaining()) .field("end_of_stream", &self.end_of_stream) + .field("stream", &self.stream) .finish() } } From ecab674f6452aebb613d3e00926ac29b5d05acc0 Mon Sep 17 00:00:00 2001 From: Ned Anderson Date: Wed, 13 May 2026 17:02:46 -0400 Subject: [PATCH 5/8] fix issue with old rust version --- src/codec/framed_write.rs | 4 +--- src/codec/mod.rs | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/src/codec/framed_write.rs b/src/codec/framed_write.rs index 75322de8a..5b7415ccb 100644 --- a/src/codec/framed_write.rs +++ b/src/codec/framed_write.rs @@ -412,9 +412,7 @@ impl FramedWrite { impl FramedWrite { /// Take back data frames that have been buffered and/or fully written. - pub fn take_used_data_frames( - &mut self, - ) -> impl Iterator> + use<'_, T, B> { + pub fn take_used_data_frames(&mut self) -> impl Iterator> + '_ { UsedDataFrameTaker { vec: &mut self.encoder.next_vec, index: 0, diff --git a/src/codec/mod.rs b/src/codec/mod.rs index e6589e886..6c42fbc37 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -123,9 +123,7 @@ impl Codec { impl Codec { /// Take back data frames that have been buffered and/or fully written. - pub(crate) fn take_used_data_frames( - &mut self, - ) -> impl Iterator> + use<'_, T, B> { + pub(crate) fn take_used_data_frames(&mut self) -> impl Iterator> + '_ { self.framed_write().take_used_data_frames() } } From 6da4f72e60fe7729c072a216a294686ebe050252 Mon Sep 17 00:00:00 2001 From: Ned Anderson Date: Wed, 13 May 2026 17:07:26 -0400 Subject: [PATCH 6/8] fix hang due to inappropriate pending return in poll_ready --- src/codec/framed_write.rs | 56 +++++++++++++++++++++++++++------------ 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/src/codec/framed_write.rs b/src/codec/framed_write.rs index 5b7415ccb..7c8b1cebb 100644 --- a/src/codec/framed_write.rs +++ b/src/codec/framed_write.rs @@ -113,16 +113,23 @@ where /// Calling this function may result in the current contents of the buffer /// to be flushed to `T`. pub fn poll_ready(&mut self, cx: &mut Context) -> Poll> { - if !self.encoder.has_capacity() { + if !self.encoder.has_vec_capacity() { + self.poll_ready_inner(cx) + } else { + Poll::Ready(Ok(())) + } + } + + #[cold] + fn poll_ready_inner(&mut self, cx: &mut Context) -> Poll> { + loop { // Try flushing - ready!(self.flush(cx))?; + ready!(self.flush_inner(cx, /* flush_all: */ false))?; - if !self.encoder.has_capacity() { - return Poll::Pending; + if self.encoder.has_capacity() { + return Poll::Ready(Ok(())); } } - - Poll::Ready(Ok(())) } /// Buffer a frame. @@ -135,7 +142,12 @@ where /// Flush buffered data to the wire pub fn flush(&mut self, cx: &mut Context) -> Poll> { - let span = tracing::trace_span!("FramedWrite::flush"); + self.flush_inner(cx, /* flush_all: */ true) + } + + #[inline] + fn flush_inner(&mut self, cx: &mut Context, flush_all: bool) -> Poll> { + let span = tracing::trace_span!("FramedWrite::flush", %flush_all); let _e = span.enter(); loop { @@ -145,9 +157,14 @@ where &mut self.encoder ))? .is_continue() + && (flush_all || !self.encoder.has_capacity()) {} - self.encoder.reclaim_empty_buffer(); + if flush_all { + assert_eq!(self.encoder.buf.position(), 0); + assert_eq!(self.encoder.buf.remaining(), 0); + } + self.encoder.reclaim_buffer(); if let Some(frame) = self.encoder.next_continuation.take() { let mut buf = limited_write_buf!(self.encoder); @@ -178,16 +195,15 @@ impl Encoder where B: Buf, { - fn reclaim_empty_buffer(&mut self) { - assert_eq!(self.buf.position(), 0); - assert_eq!(self.buf.remaining(), 0); + #[inline] + fn reclaim_buffer(&mut self) { let buf = self.buf.get_mut(); - let _ = buf.try_reclaim(buf.capacity() + 1); + let _ = buf.try_reclaim(buf.capacity() + buf.len() + 1); } fn buffer(&mut self, item: Frame) -> Result<(), UserError> { // Ensure that we have enough capacity to accept the write. - assert!(self.has_capacity()); + assert!(self.has_vec_capacity()); let span = tracing::trace_span!("FramedWrite::buffer", frame = ?item); let _e = span.enter(); @@ -271,6 +287,12 @@ where } fn has_capacity(&self) -> bool { + self.next_continuation.is_none() + && (self.buf.get_ref().capacity() - self.buf.get_ref().len() + >= self.min_buffer_capacity) + } + + fn has_vec_capacity(&self) -> bool { self.next_continuation.is_none() && self.next_vec.len() < self.next_vec.capacity() && (self.buf.get_ref().capacity() - self.buf.get_ref().len() @@ -299,11 +321,11 @@ impl Buf for Encoder { return &self.buf.get_ref()[..next.buf_len]; } let slice = next.data_frame.payload().chunk(); - if slice.len() > 0 { + if !slice.is_empty() { return slice; } } - return &*self.buf.get_ref(); + self.buf.get_ref() } fn advance(&mut self, mut n: usize) { @@ -376,7 +398,7 @@ impl Buf for Encoder { } fn has_remaining(&self) -> bool { - if self.buf.get_ref().len() > 0 { + if !self.buf.get_ref().is_empty() { return true; } for next in self.next_vec.iter() { @@ -458,7 +480,7 @@ fn poll_write_buf( ready!(io.poll_write_vectored(cx, &slices[..cnt]))? } else { let slice = buf.chunk(); - if slice.len() == 0 { + if slice.is_empty() { return Poll::Ready(Ok(ControlFlow::Break(0))); } ready!(io.poll_write(cx, slice))? From 4fde2a169212938fb7728746dc4c718f6ef4f719 Mon Sep 17 00:00:00 2001 From: Ned Anderson Date: Wed, 20 May 2026 04:07:22 -0400 Subject: [PATCH 7/8] fix buffer behavior & mock send --- src/codec/framed_write.rs | 6 +++++- src/codec/mod.rs | 2 +- tests/h2-support/src/mock.rs | 4 ++++ 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/codec/framed_write.rs b/src/codec/framed_write.rs index 7c8b1cebb..bddd10ba2 100644 --- a/src/codec/framed_write.rs +++ b/src/codec/framed_write.rs @@ -203,7 +203,7 @@ where fn buffer(&mut self, item: Frame) -> Result<(), UserError> { // Ensure that we have enough capacity to accept the write. - assert!(self.has_vec_capacity()); + assert!(self.has_capacity()); let span = tracing::trace_span!("FramedWrite::buffer", frame = ?item); let _e = span.enter(); @@ -211,13 +211,17 @@ where match item { Frame::Data(mut v) => { + assert!(self.has_vec_capacity()); + // Ensure that the payload is not greater than the max frame. let len = v.payload().remaining(); if len > self.max_frame_size() { return Err(PayloadTooBig); } + let mut buf_len_to_push = 0; + if len >= self.chain_threshold { let head = v.head(); diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 6c42fbc37..04ec0be69 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -123,7 +123,7 @@ impl Codec { impl Codec { /// Take back data frames that have been buffered and/or fully written. - pub(crate) fn take_used_data_frames(&mut self) -> impl Iterator> + '_ { + pub fn take_used_data_frames(&mut self) -> impl Iterator> + '_ { self.framed_write().take_used_data_frames() } } diff --git a/tests/h2-support/src/mock.rs b/tests/h2-support/src/mock.rs index 9ec5ba379..810e1c2c9 100644 --- a/tests/h2-support/src/mock.rs +++ b/tests/h2-support/src/mock.rs @@ -117,6 +117,10 @@ impl Handle { p }) .await?; + + // Take the frame back from the codec + self.codec.take_used_data_frames().next(); + Ok(()) } From a1f880bc6b11d71c2880240117eb569db7ee3d0e Mon Sep 17 00:00:00 2001 From: Ned Anderson Date: Fri, 22 May 2026 16:35:32 -0400 Subject: [PATCH 8/8] improve send buffer reclamation --- src/codec/framed_write.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/codec/framed_write.rs b/src/codec/framed_write.rs index bddd10ba2..997769f43 100644 --- a/src/codec/framed_write.rs +++ b/src/codec/framed_write.rs @@ -147,7 +147,7 @@ where #[inline] fn flush_inner(&mut self, cx: &mut Context, flush_all: bool) -> Poll> { - let span = tracing::trace_span!("FramedWrite::flush", %flush_all); + let span = tracing::trace_span!("FramedWrite::flush", all = %flush_all); let _e = span.enter(); loop { @@ -164,7 +164,9 @@ where assert_eq!(self.encoder.buf.position(), 0); assert_eq!(self.encoder.buf.remaining(), 0); } - self.encoder.reclaim_buffer(); + let capacity_before = self.encoder.buf_capacity(); + let _ = self.encoder.try_reclaim_buf(); + tracing::trace!(%capacity_before, capacity_after = %self.encoder.buf_capacity(), "try reclaim buffer"); if let Some(frame) = self.encoder.next_continuation.take() { let mut buf = limited_write_buf!(self.encoder); @@ -196,9 +198,15 @@ where B: Buf, { #[inline] - fn reclaim_buffer(&mut self) { - let buf = self.buf.get_mut(); - let _ = buf.try_reclaim(buf.capacity() + buf.len() + 1); + fn buf_capacity(&self) -> usize { + self.buf.get_ref().capacity() - self.buf.get_ref().len() + } + + #[inline] + fn try_reclaim_buf(&mut self) -> bool { + // Use the minimum value that can reclaim all of the buffer's original capacity. + let rem = self.buf_capacity(); + self.buf.get_mut().try_reclaim(rem + 1) } fn buffer(&mut self, item: Frame) -> Result<(), UserError> { @@ -291,16 +299,13 @@ where } fn has_capacity(&self) -> bool { - self.next_continuation.is_none() - && (self.buf.get_ref().capacity() - self.buf.get_ref().len() - >= self.min_buffer_capacity) + self.next_continuation.is_none() && self.buf_capacity() >= self.min_buffer_capacity } fn has_vec_capacity(&self) -> bool { self.next_continuation.is_none() && self.next_vec.len() < self.next_vec.capacity() - && (self.buf.get_ref().capacity() - self.buf.get_ref().len() - >= self.min_buffer_capacity) + && self.buf_capacity() >= self.min_buffer_capacity } }