StreamingPipeline {
) -> Result<(), Report> {
use flate2::read::ZlibDecoder;
use flate2::write::ZlibEncoder;
- use flate2::Compression;
let decoder = ZlibDecoder::new(input);
- let encoder = ZlibEncoder::new(output, Compression::default());
-
- self.process_through_compression(decoder, encoder)
+ let mut encoder = ZlibEncoder::new(output, flate2::Compression::default());
+ self.process_through_compression(decoder, &mut encoder)?;
+ encoder.finish().change_context(TrustedServerError::Proxy {
+ message: "Failed to finalize deflate encoder".to_string(),
+ })?;
+ Ok(())
}
/// Process deflate compressed input to uncompressed output (decompression only)
@@ -315,9 +317,11 @@ impl StreamingPipeline {
lgwin: 22,
..Default::default()
};
- let encoder = CompressorWriter::with_params(output, 4096, &params);
-
- self.process_through_compression(decoder, encoder)
+ let mut encoder = CompressorWriter::with_params(output, 4096, &params);
+ self.process_through_compression(decoder, &mut encoder)?;
+ // CompressorWriter finalizes on flush (already called) and into_inner
+ encoder.into_inner();
+ Ok(())
}
/// Process brotli compressed input to uncompressed output (decompression only)
@@ -332,10 +336,14 @@ impl StreamingPipeline {
}
/// Generic processing through compression layers
+ ///
+ /// The caller retains ownership of `encoder` and must call its
+ /// type-specific finalization method (e.g., `finish()` or `into_inner()`)
+ /// after this function returns successfully.
fn process_through_compression<R: Read, W: Write>(
&mut self,
mut decoder: R,
- mut encoder: W,
+ encoder: &mut W,
) -> Result<(), Report> {
let mut buffer = vec![0u8; self.config.chunk_size];
@@ -380,15 +388,11 @@ impl StreamingPipeline {
}
}
- // Flush encoder (this also finishes compression)
encoder.flush().change_context(TrustedServerError::Proxy {
message: "Failed to flush encoder".to_string(),
})?;
- // For GzEncoder and similar, we need to finish() to properly close the stream
- // The flush above might not be enough
- drop(encoder);
-
+ // Caller owns encoder and must call finish() after this returns.
Ok(())
}
}
@@ -646,6 +650,58 @@ mod tests {
);
}
+ #[test]
+ fn test_deflate_round_trip_produces_valid_output() {
+ // Verify that deflate-to-deflate (which uses process_through_compression)
+ // produces valid output that decompresses correctly. This establishes the
+ // correctness contract before we change the finalization path.
+ use flate2::read::ZlibDecoder;
+ use flate2::write::ZlibEncoder;
+ use std::io::{Read as _, Write as _};
+
+ let input_data = b"hello world";
+
+ // Compress input
+ let mut compressed_input = Vec::new();
+ {
+ let mut enc =
+ ZlibEncoder::new(&mut compressed_input, flate2::Compression::default());
+ enc.write_all(input_data)
+ .expect("should compress test input");
+ enc.finish().expect("should finish compression");
+ }
+
+ let replacer = StreamingReplacer::new(vec![Replacement {
+ find: "hello".to_string(),
+ replace_with: "hi".to_string(),
+ }]);
+
+ let config = PipelineConfig {
+ input_compression: Compression::Deflate,
+ output_compression: Compression::Deflate,
+ chunk_size: 8192,
+ };
+
+ let mut pipeline = StreamingPipeline::new(config, replacer);
+ let mut output = Vec::new();
+
+ pipeline
+ .process(&compressed_input[..], &mut output)
+ .expect("should process deflate-to-deflate");
+
+ // Decompress output and verify correctness
+ let mut decompressed = Vec::new();
+ ZlibDecoder::new(&output[..])
+ .read_to_end(&mut decompressed)
+ .expect("should decompress output — implies encoder was finalized correctly");
+
+ assert_eq!(
+ String::from_utf8(decompressed).expect("should be valid UTF-8"),
+ "hi world",
+ "should have replaced content through deflate round-trip"
+ );
+ }
+
#[test]
fn test_streaming_pipeline_with_html_rewriter() {
use lol_html::{element, Settings};
From a4fd5c69568fd815286e7c3946efd97472b62424 Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Thu, 26 Mar 2026 09:13:06 -0700
Subject: [PATCH 08/29] Convert process_gzip_to_gzip to chunk-based processing
---
.../src/streaming_processor.rs | 85 ++++++++++++-------
1 file changed, 54 insertions(+), 31 deletions(-)
diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs
index 50c595d9..accf80e2 100644
--- a/crates/trusted-server-core/src/streaming_processor.rs
+++ b/crates/trusted-server-core/src/streaming_processor.rs
@@ -187,40 +187,13 @@ impl StreamingPipeline {
) -> Result<(), Report> {
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
- use flate2::Compression;
- // Decompress input
- let mut decoder = GzDecoder::new(input);
- let mut decompressed = Vec::new();
- decoder
- .read_to_end(&mut decompressed)
- .change_context(TrustedServerError::Proxy {
- message: "Failed to decompress gzip".to_string(),
- })?;
-
- log::info!("Decompressed size: {} bytes", decompressed.len());
-
- // Process the decompressed content
- let processed = self
- .processor
- .process_chunk(&decompressed, true)
- .change_context(TrustedServerError::Proxy {
- message: "Failed to process content".to_string(),
- })?;
-
- log::info!("Processed size: {} bytes", processed.len());
-
- // Recompress the output
- let mut encoder = GzEncoder::new(output, Compression::default());
- encoder
- .write_all(&processed)
- .change_context(TrustedServerError::Proxy {
- message: "Failed to write to gzip encoder".to_string(),
- })?;
+ let decoder = GzDecoder::new(input);
+ let mut encoder = GzEncoder::new(output, flate2::Compression::default());
+ self.process_through_compression(decoder, &mut encoder)?;
encoder.finish().change_context(TrustedServerError::Proxy {
- message: "Failed to finish gzip encoder".to_string(),
+ message: "Failed to finalize gzip encoder".to_string(),
})?;
-
Ok(())
}
@@ -702,6 +675,56 @@ mod tests {
);
}
+ #[test]
+ fn test_gzip_to_gzip_produces_correct_output() {
+ use flate2::read::GzDecoder;
+ use flate2::write::GzEncoder;
+ use std::io::{Read as _, Write as _};
+
+ // Arrange
+ let input_data = b"hello world";
+
+ let mut compressed_input = Vec::new();
+ {
+ let mut enc =
+ GzEncoder::new(&mut compressed_input, flate2::Compression::default());
+ enc.write_all(input_data)
+ .expect("should compress test input");
+ enc.finish().expect("should finish compression");
+ }
+
+ let replacer = StreamingReplacer::new(vec![Replacement {
+ find: "hello".to_string(),
+ replace_with: "hi".to_string(),
+ }]);
+
+ let config = PipelineConfig {
+ input_compression: Compression::Gzip,
+ output_compression: Compression::Gzip,
+ chunk_size: 8192,
+ };
+
+ let mut pipeline = StreamingPipeline::new(config, replacer);
+ let mut output = Vec::new();
+
+ // Act
+ pipeline
+ .process(&compressed_input[..], &mut output)
+ .expect("should process gzip-to-gzip");
+
+ // Assert
+ let mut decompressed = Vec::new();
+ GzDecoder::new(&output[..])
+ .read_to_end(&mut decompressed)
+ .expect("should decompress output — implies encoder was finalized correctly");
+
+ assert_eq!(
+ String::from_utf8(decompressed).expect("should be valid UTF-8"),
+ "hi world",
+ "should have replaced content through gzip round-trip"
+ );
+ }
+
#[test]
fn test_streaming_pipeline_with_html_rewriter() {
use lol_html::{element, Settings};
From a4f4a7c189eeeaa5a778eac958e4881c623aa8af Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Thu, 26 Mar 2026 09:19:29 -0700
Subject: [PATCH 09/29] Convert decompress_and_process to chunk-based
processing
---
.../src/streaming_processor.rs | 114 ++++++++++++++----
1 file changed, 89 insertions(+), 25 deletions(-)
diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs
index accf80e2..5ea7aa5b 100644
--- a/crates/trusted-server-core/src/streaming_processor.rs
+++ b/crates/trusted-server-core/src/streaming_processor.rs
@@ -197,39 +197,58 @@ impl StreamingPipeline {
Ok(())
}
- /// Decompress input, process content, and write uncompressed output.
+ /// Decompress input, process content in chunks, and write uncompressed output.
fn decompress_and_process<R: Read, W: Write>(
&mut self,
mut decoder: R,
mut output: W,
codec_name: &str,
) -> Result<(), Report> {
- let mut decompressed = Vec::new();
- decoder
- .read_to_end(&mut decompressed)
- .change_context(TrustedServerError::Proxy {
- message: format!("Failed to decompress {codec_name}"),
- })?;
-
- log::info!(
- "{codec_name} decompressed size: {} bytes",
- decompressed.len()
- );
-
- let processed = self
- .processor
- .process_chunk(&decompressed, true)
- .change_context(TrustedServerError::Proxy {
- message: "Failed to process content".to_string(),
- })?;
+ let mut buffer = vec![0u8; self.config.chunk_size];
- log::info!("{codec_name} processed size: {} bytes", processed.len());
+ loop {
+ match decoder.read(&mut buffer) {
+ Ok(0) => {
+ let final_chunk = self.processor.process_chunk(&[], true).change_context(
+ TrustedServerError::Proxy {
+ message: format!("Failed to process final {codec_name} chunk"),
+ },
+ )?;
+ if !final_chunk.is_empty() {
+ output.write_all(&final_chunk).change_context(
+ TrustedServerError::Proxy {
+ message: format!("Failed to write final {codec_name} chunk"),
+ },
+ )?;
+ }
+ break;
+ }
+ Ok(n) => {
+ let processed = self
+ .processor
+ .process_chunk(&buffer[..n], false)
+ .change_context(TrustedServerError::Proxy {
+ message: format!("Failed to process {codec_name} chunk"),
+ })?;
+ if !processed.is_empty() {
+ output.write_all(&processed).change_context(
+ TrustedServerError::Proxy {
+ message: format!("Failed to write {codec_name} chunk"),
+ },
+ )?;
+ }
+ }
+ Err(e) => {
+ return Err(Report::new(TrustedServerError::Proxy {
+ message: format!("Failed to read from {codec_name} decoder: {e}"),
+ }));
+ }
+ }
+ }
- output
- .write_all(&processed)
- .change_context(TrustedServerError::Proxy {
- message: "Failed to write output".to_string(),
- })?;
+ output.flush().change_context(TrustedServerError::Proxy {
+ message: format!("Failed to flush {codec_name} output"),
+ })?;
Ok(())
}
@@ -725,6 +744,51 @@ mod tests {
);
}
+ #[test]
+ fn test_gzip_to_none_produces_correct_output() {
+ use flate2::write::GzEncoder;
+ use std::io::Write as _;
+
+ // Arrange
+ let input_data = b"hello world";
+
+ let mut compressed_input = Vec::new();
+ {
+ let mut enc =
+ GzEncoder::new(&mut compressed_input, flate2::Compression::default());
+ enc.write_all(input_data)
+ .expect("should compress test input");
+ enc.finish().expect("should finish compression");
+ }
+
+ let replacer = StreamingReplacer::new(vec![Replacement {
+ find: "hello".to_string(),
+ replace_with: "hi".to_string(),
+ }]);
+
+ let config = PipelineConfig {
+ input_compression: Compression::Gzip,
+ output_compression: Compression::None,
+ chunk_size: 8192,
+ };
+
+ let mut pipeline = StreamingPipeline::new(config, replacer);
+ let mut output = Vec::new();
+
+ // Act
+ pipeline
+ .process(&compressed_input[..], &mut output)
+ .expect("should process gzip-to-none");
+
+ // Assert
+ let result =
+ String::from_utf8(output).expect("should be valid UTF-8 uncompressed output");
+ assert_eq!(
+ result, "hi world",
+ "should have replaced content after gzip decompression"
+ );
+ }
+
#[test]
fn test_streaming_pipeline_with_html_rewriter() {
use lol_html::{element, Settings};
From 105244c1dab0468c4155c220adf81da04b8c3264 Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Thu, 26 Mar 2026 09:23:49 -0700
Subject: [PATCH 10/29] Rewrite HtmlRewriterAdapter for incremental lol_html
streaming
---
.../src/streaming_processor.rs | 241 +++++++++++-------
1 file changed, 144 insertions(+), 97 deletions(-)
diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs
index 5ea7aa5b..20171b6a 100644
--- a/crates/trusted-server-core/src/streaming_processor.rs
+++ b/crates/trusted-server-core/src/streaming_processor.rs
@@ -6,6 +6,9 @@
//! - Memory-efficient streaming
//! - UTF-8 boundary handling
+use std::cell::RefCell;
+use std::rc::Rc;
+
use error_stack::{Report, ResultExt};
use std::io::{self, Read, Write};
@@ -389,81 +392,70 @@ impl StreamingPipeline {
}
}
-/// Adapter to use `lol_html` `HtmlRewriter` as a `StreamProcessor`
-/// Important: Due to `lol_html`'s ownership model, we must accumulate input
-/// and process it all at once when the stream ends. This is a limitation
-/// of the `lol_html` library's API design.
+/// Shared output buffer used as an [`lol_html::OutputSink`].
+///
+/// The `HtmlRewriter` invokes [`OutputSink::handle_chunk`] synchronously during
+/// each [`HtmlRewriter::write`] call, so the buffer is drained after every
+/// `process_chunk` invocation to emit output incrementally.
+struct RcVecSink(Rc<RefCell<Vec<u8>>>);
+
+impl lol_html::OutputSink for RcVecSink {
+ fn handle_chunk(&mut self, chunk: &[u8]) {
+ self.0.borrow_mut().extend_from_slice(chunk);
+ }
+}
+
+/// Adapter to use `lol_html` [`HtmlRewriter`](lol_html::HtmlRewriter) as a [`StreamProcessor`].
+///
+/// Output is emitted incrementally on every [`StreamProcessor::process_chunk`] call.
+/// The adapter is single-use: one adapter per request. Calling [`StreamProcessor::reset`]
+/// is a no-op because the rewriter consumes its settings on construction.
pub struct HtmlRewriterAdapter {
- settings: lol_html::Settings<'static, 'static>,
- accumulated_input: Vec<u8>,
+ rewriter: Option<lol_html::HtmlRewriter<'static, RcVecSink>>,
+ output: Rc<RefCell<Vec<u8>>>,
}
impl HtmlRewriterAdapter {
- /// Create a new HTML rewriter adapter
+ /// Create a new HTML rewriter adapter that streams output per chunk.
#[must_use]
pub fn new(settings: lol_html::Settings<'static, 'static>) -> Self {
+ let output = Rc::new(RefCell::new(Vec::new()));
+ let sink = RcVecSink(Rc::clone(&output));
+ let rewriter = lol_html::HtmlRewriter::new(settings, sink);
Self {
- settings,
- accumulated_input: Vec::new(),
+ rewriter: Some(rewriter),
+ output,
}
}
}
impl StreamProcessor for HtmlRewriterAdapter {
fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result<Vec<u8>, io::Error> {
- // Accumulate input chunks
- self.accumulated_input.extend_from_slice(chunk);
-
- if !chunk.is_empty() {
- log::debug!(
- "Buffering chunk: {} bytes, total buffered: {} bytes",
- chunk.len(),
- self.accumulated_input.len()
- );
+ if let Some(rewriter) = &mut self.rewriter {
+ if !chunk.is_empty() {
+ rewriter.write(chunk).map_err(|e| {
+ log::error!("Failed to process HTML chunk: {e}");
+ io::Error::other(format!("HTML processing failed: {e}"))
+ })?;
+ }
}
- // Only process when we have all the input
if is_last {
- log::info!(
- "Processing complete document: {} bytes",
- self.accumulated_input.len()
- );
-
- // Process all accumulated input at once
- let mut output = Vec::new();
-
- // Create rewriter with output sink
- let mut rewriter = lol_html::HtmlRewriter::new(
- std::mem::take(&mut self.settings),
- |chunk: &[u8]| {
- output.extend_from_slice(chunk);
- },
- );
-
- // Process the entire document
- rewriter.write(&self.accumulated_input).map_err(|e| {
- log::error!("Failed to process HTML: {}", e);
- io::Error::other(format!("HTML processing failed: {}", e))
- })?;
-
- // Finalize the rewriter
- rewriter.end().map_err(|e| {
- log::error!("Failed to finalize: {}", e);
- io::Error::other(format!("HTML finalization failed: {}", e))
- })?;
-
- log::debug!("Output size: {} bytes", output.len());
- self.accumulated_input.clear();
- Ok(output)
- } else {
- // Return empty until we have all input
- // This is a limitation of lol_html's API
- Ok(Vec::new())
+ if let Some(rewriter) = self.rewriter.take() {
+ rewriter.end().map_err(|e| {
+ log::error!("Failed to finalize HTML: {e}");
+ io::Error::other(format!("HTML finalization failed: {e}"))
+ })?;
+ }
}
+
+ // Drain whatever lol_html produced since the last call
+ Ok(std::mem::take(&mut *self.output.borrow_mut()))
}
fn reset(&mut self) {
- self.accumulated_input.clear();
+ // No-op: the rewriter consumed its Settings on construction.
+ // Single-use by design (one adapter per request).
}
}
@@ -530,7 +522,7 @@ mod tests {
}
#[test]
- fn test_html_rewriter_adapter_accumulates_until_last() {
+ fn test_html_rewriter_adapter_streams_incrementally() {
use lol_html::{element, Settings};
// Create a simple HTML rewriter that replaces text
@@ -544,32 +536,40 @@ mod tests {
let mut adapter = HtmlRewriterAdapter::new(settings);
- // Test that intermediate chunks return empty
let chunk1 = b"<html><body>";
let result1 = adapter
.process_chunk(chunk1, false)
.expect("should process chunk1");
- assert_eq!(result1.len(), 0, "Should return empty for non-last chunk");
let chunk2 = b"<p>original</p>";
let result2 = adapter
.process_chunk(chunk2, false)
.expect("should process chunk2");
- assert_eq!(result2.len(), 0, "Should return empty for non-last chunk");
- // Test that last chunk processes everything
let chunk3 = b"</body></html>";
let result3 = adapter
.process_chunk(chunk3, true)
.expect("should process final chunk");
+
+ // Concatenate all outputs and verify the final HTML is correct
+ let mut all_output = result1;
+ all_output.extend_from_slice(&result2);
+ all_output.extend_from_slice(&result3);
+
assert!(
- !result3.is_empty(),
- "Should return processed content for last chunk"
+ !all_output.is_empty(),
+ "should produce non-empty concatenated output"
);
- let output = String::from_utf8(result3).expect("output should be valid UTF-8");
- assert!(output.contains("replaced"), "Should have replaced content");
- assert!(output.contains(""), "Should have complete HTML");
+ let output = String::from_utf8(all_output).expect("output should be valid UTF-8");
+ assert!(
+ output.contains("replaced"),
+ "should have replaced content in concatenated output"
+ );
+ assert!(
+ output.contains("</html>"),
+ "should have complete HTML in concatenated output"
+ );
}
#[test]
@@ -586,59 +586,59 @@ mod tests {
}
large_html.push_str("");
- // Process in chunks
+ // Process in chunks and collect all output
let chunk_size = 1024;
let bytes = large_html.as_bytes();
- let mut chunks = bytes.chunks(chunk_size);
- let mut last_chunk = chunks.next().unwrap_or(&[]);
+ let mut chunks = bytes.chunks(chunk_size).peekable();
+ let mut all_output = Vec::new();
- for chunk in chunks {
+ while let Some(chunk) = chunks.next() {
+ let is_last = chunks.peek().is_none();
let result = adapter
- .process_chunk(last_chunk, false)
- .expect("should process intermediate chunk");
- assert_eq!(result.len(), 0, "Intermediate chunks should return empty");
- last_chunk = chunk;
+ .process_chunk(chunk, is_last)
+ .expect("should process chunk");
+ all_output.extend_from_slice(&result);
}
- // Process last chunk
- let result = adapter
- .process_chunk(last_chunk, true)
- .expect("should process last chunk");
- assert!(!result.is_empty(), "Last chunk should return content");
+ assert!(
+ !all_output.is_empty(),
+ "should produce non-empty output for large document"
+ );
- let output = String::from_utf8(result).expect("output should be valid UTF-8");
+ let output = String::from_utf8(all_output).expect("output should be valid UTF-8");
assert!(
output.contains("Paragraph 999"),
- "Should contain all content"
+ "should contain all content from large document"
);
}
#[test]
- fn test_html_rewriter_adapter_reset() {
+ fn test_html_rewriter_adapter_reset_is_noop() {
use lol_html::Settings;
let settings = Settings::default();
let mut adapter = HtmlRewriterAdapter::new(settings);
// Process some content
- adapter
- .process_chunk(b"", false)
- .expect("should process html tag");
- adapter
- .process_chunk(b"test", false)
- .expect("should process body");
-
- // Reset should clear accumulated input
+ let result1 = adapter
+ .process_chunk(b"test", false)
+ .expect("should process html");
+
+ // Reset is a no-op — the adapter is single-use by design
adapter.reset();
- // After reset, adapter should be ready for new input
- let result = adapter
- .process_chunk(b"new
", true)
- .expect("should process new content after reset");
- let output = String::from_utf8(result).expect("output should be valid UTF-8");
- assert_eq!(
- output, "new
",
- "Should only contain new input after reset"
+ // The rewriter is still alive; finalize it
+ let result2 = adapter
+ .process_chunk(b"", true)
+ .expect("should finalize after reset");
+
+ let mut all_output = result1;
+ all_output.extend_from_slice(&result2);
+
+ let output = String::from_utf8(all_output).expect("output should be valid UTF-8");
+ assert!(
+ output.contains("test"),
+ "should still produce output after no-op reset"
);
}
@@ -789,6 +789,53 @@ mod tests {
);
}
+ #[test]
+ fn test_html_rewriter_adapter_emits_output_per_chunk() {
+ use lol_html::Settings;
+
+ let settings = Settings::default();
+ let mut adapter = HtmlRewriterAdapter::new(settings);
+
+ // Send three chunks
+ let chunk1 = b"<html>";
+ let result1 = adapter
+ .process_chunk(chunk1, false)
+ .expect("should process chunk1");
+ assert!(
+ !result1.is_empty(),
+ "should emit output for first chunk, got empty"
+ );
+
+ let chunk2 = b"<p>hello</p>";
+ let result2 = adapter
+ .process_chunk(chunk2, false)
+ .expect("should process chunk2");
+
+ let chunk3 = b"</html>";
+ let result3 = adapter
+ .process_chunk(chunk3, true)
+ .expect("should process final chunk");
+
+ // Concatenate all outputs and verify correctness
+ let mut all_output = result1;
+ all_output.extend_from_slice(&result2);
+ all_output.extend_from_slice(&result3);
+
+ let output = String::from_utf8(all_output).expect("output should be valid UTF-8");
+ assert!(
+ output.contains("<html>"),
+ "should contain html tag in concatenated output"
+ );
+ assert!(
+ output.contains("<p>hello</p>"),
+ "should contain paragraph in concatenated output"
+ );
+ assert!(
+ output.contains("</html>"),
+ "should contain closing html tag in concatenated output"
+ );
+ }
+
#[test]
fn test_streaming_pipeline_with_html_rewriter() {
use lol_html::{element, Settings};
From d72669c6c8057c411177692d8f4be4e0ab3d95a4 Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Thu, 26 Mar 2026 09:28:08 -0700
Subject: [PATCH 11/29] Unify compression paths into single process_chunks
method
---
.../src/streaming_processor.rs | 300 +++++-------------
1 file changed, 73 insertions(+), 227 deletions(-)
diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs
index 20171b6a..7062df93 100644
--- a/crates/trusted-server-core/src/streaming_processor.rs
+++ b/crates/trusted-server-core/src/streaming_processor.rs
@@ -94,6 +94,10 @@ impl StreamingPipeline {
/// Process a stream from input to output
///
+ /// Handles all supported compression transformations by wrapping the raw
+ /// reader/writer in the appropriate decoder/encoder, then delegating to
+ /// [`Self::process_chunks`].
+ ///
/// # Errors
///
/// Returns an error if the compression transformation is unsupported or if reading/writing fails.
@@ -106,253 +110,96 @@ impl StreamingPipeline {
self.config.input_compression,
self.config.output_compression,
) {
- (Compression::None, Compression::None) => self.process_uncompressed(input, output),
- (Compression::Gzip, Compression::Gzip) => self.process_gzip_to_gzip(input, output),
- (Compression::Gzip, Compression::None) => self.process_gzip_to_none(input, output),
+ (Compression::None, Compression::None) => self.process_chunks(input, output),
+ (Compression::Gzip, Compression::Gzip) => {
+ use flate2::read::GzDecoder;
+ use flate2::write::GzEncoder;
+
+ let decoder = GzDecoder::new(input);
+ let mut encoder = GzEncoder::new(output, flate2::Compression::default());
+ self.process_chunks(decoder, &mut encoder)?;
+ encoder.finish().change_context(TrustedServerError::Proxy {
+ message: "Failed to finalize gzip encoder".to_string(),
+ })?;
+ Ok(())
+ }
+ (Compression::Gzip, Compression::None) => {
+ use flate2::read::GzDecoder;
+
+ self.process_chunks(GzDecoder::new(input), output)
+ }
(Compression::Deflate, Compression::Deflate) => {
- self.process_deflate_to_deflate(input, output)
+ use flate2::read::ZlibDecoder;
+ use flate2::write::ZlibEncoder;
+
+ let decoder = ZlibDecoder::new(input);
+ let mut encoder = ZlibEncoder::new(output, flate2::Compression::default());
+ self.process_chunks(decoder, &mut encoder)?;
+ encoder.finish().change_context(TrustedServerError::Proxy {
+ message: "Failed to finalize deflate encoder".to_string(),
+ })?;
+ Ok(())
}
(Compression::Deflate, Compression::None) => {
- self.process_deflate_to_none(input, output)
+ use flate2::read::ZlibDecoder;
+
+ self.process_chunks(ZlibDecoder::new(input), output)
}
(Compression::Brotli, Compression::Brotli) => {
- self.process_brotli_to_brotli(input, output)
+ use brotli::enc::writer::CompressorWriter;
+ use brotli::enc::BrotliEncoderParams;
+ use brotli::Decompressor;
+
+ let decoder = Decompressor::new(input, 4096);
+ let params = BrotliEncoderParams {
+ quality: 4,
+ lgwin: 22,
+ ..Default::default()
+ };
+ let mut encoder = CompressorWriter::with_params(output, 4096, &params);
+ self.process_chunks(decoder, &mut encoder)?;
+ // CompressorWriter finalizes on flush (already called) and into_inner
+ encoder.into_inner();
+ Ok(())
+ }
+ (Compression::Brotli, Compression::None) => {
+ use brotli::Decompressor;
+
+ self.process_chunks(Decompressor::new(input, 4096), output)
}
- (Compression::Brotli, Compression::None) => self.process_brotli_to_none(input, output),
_ => Err(Report::new(TrustedServerError::Proxy {
message: "Unsupported compression transformation".to_string(),
})),
}
}
- /// Process uncompressed stream
- fn process_uncompressed(
- &mut self,
- mut input: R,
- mut output: W,
- ) -> Result<(), Report> {
- let mut buffer = vec![0u8; self.config.chunk_size];
-
- loop {
- match input.read(&mut buffer) {
- Ok(0) => {
- // End of stream - process any remaining data
- let final_chunk = self.processor.process_chunk(&[], true).change_context(
- TrustedServerError::Proxy {
- message: "Failed to process final chunk".to_string(),
- },
- )?;
- if !final_chunk.is_empty() {
- output.write_all(&final_chunk).change_context(
- TrustedServerError::Proxy {
- message: "Failed to write final chunk".to_string(),
- },
- )?;
- }
- break;
- }
- Ok(n) => {
- // Process this chunk
- let processed = self
- .processor
- .process_chunk(&buffer[..n], false)
- .change_context(TrustedServerError::Proxy {
- message: "Failed to process chunk".to_string(),
- })?;
- if !processed.is_empty() {
- output
- .write_all(&processed)
- .change_context(TrustedServerError::Proxy {
- message: "Failed to write processed chunk".to_string(),
- })?;
- }
- }
- Err(e) => {
- return Err(Report::new(TrustedServerError::Proxy {
- message: format!("Failed to read from input: {}", e),
- }));
- }
- }
- }
-
- output.flush().change_context(TrustedServerError::Proxy {
- message: "Failed to flush output".to_string(),
- })?;
-
- Ok(())
- }
-
- /// Process gzip compressed stream
- fn process_gzip_to_gzip(
- &mut self,
- input: R,
- output: W,
- ) -> Result<(), Report> {
- use flate2::read::GzDecoder;
- use flate2::write::GzEncoder;
-
- let decoder = GzDecoder::new(input);
- let mut encoder = GzEncoder::new(output, flate2::Compression::default());
- self.process_through_compression(decoder, &mut encoder)?;
- encoder.finish().change_context(TrustedServerError::Proxy {
- message: "Failed to finalize gzip encoder".to_string(),
- })?;
- Ok(())
- }
-
- /// Decompress input, process content in chunks, and write uncompressed output.
- fn decompress_and_process(
- &mut self,
- mut decoder: R,
- mut output: W,
- codec_name: &str,
- ) -> Result<(), Report> {
- let mut buffer = vec![0u8; self.config.chunk_size];
-
- loop {
- match decoder.read(&mut buffer) {
- Ok(0) => {
- let final_chunk = self.processor.process_chunk(&[], true).change_context(
- TrustedServerError::Proxy {
- message: format!("Failed to process final {codec_name} chunk"),
- },
- )?;
- if !final_chunk.is_empty() {
- output.write_all(&final_chunk).change_context(
- TrustedServerError::Proxy {
- message: format!("Failed to write final {codec_name} chunk"),
- },
- )?;
- }
- break;
- }
- Ok(n) => {
- let processed = self
- .processor
- .process_chunk(&buffer[..n], false)
- .change_context(TrustedServerError::Proxy {
- message: format!("Failed to process {codec_name} chunk"),
- })?;
- if !processed.is_empty() {
- output.write_all(&processed).change_context(
- TrustedServerError::Proxy {
- message: format!("Failed to write {codec_name} chunk"),
- },
- )?;
- }
- }
- Err(e) => {
- return Err(Report::new(TrustedServerError::Proxy {
- message: format!("Failed to read from {codec_name} decoder: {e}"),
- }));
- }
- }
- }
-
- output.flush().change_context(TrustedServerError::Proxy {
- message: format!("Failed to flush {codec_name} output"),
- })?;
-
- Ok(())
- }
-
- /// Process gzip compressed input to uncompressed output (decompression only)
- fn process_gzip_to_none(
- &mut self,
- input: R,
- output: W,
- ) -> Result<(), Report> {
- use flate2::read::GzDecoder;
-
- self.decompress_and_process(GzDecoder::new(input), output, "gzip")
- }
-
- /// Process deflate compressed stream
- fn process_deflate_to_deflate(
- &mut self,
- input: R,
- output: W,
- ) -> Result<(), Report> {
- use flate2::read::ZlibDecoder;
- use flate2::write::ZlibEncoder;
-
- let decoder = ZlibDecoder::new(input);
- let mut encoder = ZlibEncoder::new(output, flate2::Compression::default());
- self.process_through_compression(decoder, &mut encoder)?;
- encoder.finish().change_context(TrustedServerError::Proxy {
- message: "Failed to finalize deflate encoder".to_string(),
- })?;
- Ok(())
- }
-
- /// Process deflate compressed input to uncompressed output (decompression only)
- fn process_deflate_to_none(
- &mut self,
- input: R,
- output: W,
- ) -> Result<(), Report> {
- use flate2::read::ZlibDecoder;
-
- self.decompress_and_process(ZlibDecoder::new(input), output, "deflate")
- }
-
- /// Process brotli compressed stream
- fn process_brotli_to_brotli(
- &mut self,
- input: R,
- output: W,
- ) -> Result<(), Report> {
- use brotli::enc::writer::CompressorWriter;
- use brotli::enc::BrotliEncoderParams;
- use brotli::Decompressor;
-
- let decoder = Decompressor::new(input, 4096);
- let params = BrotliEncoderParams {
- quality: 4,
- lgwin: 22,
- ..Default::default()
- };
- let mut encoder = CompressorWriter::with_params(output, 4096, &params);
- self.process_through_compression(decoder, &mut encoder)?;
- // CompressorWriter finalizes on flush (already called) and into_inner
- encoder.into_inner();
- Ok(())
- }
-
- /// Process brotli compressed input to uncompressed output (decompression only)
- fn process_brotli_to_none(
- &mut self,
- input: R,
- output: W,
- ) -> Result<(), Report> {
- use brotli::Decompressor;
-
- self.decompress_and_process(Decompressor::new(input, 4096), output, "brotli")
- }
-
- /// Generic processing through compression layers
+ /// Read chunks from `reader`, pass each through the processor, and write output to `writer`.
///
- /// The caller retains ownership of `encoder` and must call its
- /// type-specific finalization method (e.g., `finish()` or `into_inner()`)
- /// after this function returns successfully.
- fn process_through_compression(
+ /// This is the single unified chunk loop used by all compression paths.
+ /// The caller is responsible for wrapping `reader`/`writer` in the appropriate
+ /// decoder/encoder and for finalizing the encoder (e.g., calling `finish()`)
+ /// after this method returns.
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if reading, processing, or writing any chunk fails.
+ fn process_chunks<R: Read, W: Write>(
&mut self,
- mut decoder: R,
- encoder: &mut W,
+ mut reader: R,
+ mut writer: W,
) -> Result<(), Report> {
let mut buffer = vec![0u8; self.config.chunk_size];
loop {
- match decoder.read(&mut buffer) {
+ match reader.read(&mut buffer) {
Ok(0) => {
- // End of stream
let final_chunk = self.processor.process_chunk(&[], true).change_context(
TrustedServerError::Proxy {
message: "Failed to process final chunk".to_string(),
},
)?;
if !final_chunk.is_empty() {
- encoder.write_all(&final_chunk).change_context(
+ writer.write_all(&final_chunk).change_context(
TrustedServerError::Proxy {
message: "Failed to write final chunk".to_string(),
},
@@ -368,7 +215,7 @@ impl StreamingPipeline {
message: "Failed to process chunk".to_string(),
})?;
if !processed.is_empty() {
- encoder.write_all(&processed).change_context(
+ writer.write_all(&processed).change_context(
TrustedServerError::Proxy {
message: "Failed to write processed chunk".to_string(),
},
@@ -377,17 +224,16 @@ impl StreamingPipeline {
}
Err(e) => {
return Err(Report::new(TrustedServerError::Proxy {
- message: format!("Failed to read from decoder: {}", e),
+ message: format!("Failed to read: {e}"),
}));
}
}
}
- encoder.flush().change_context(TrustedServerError::Proxy {
- message: "Failed to flush encoder".to_string(),
+ writer.flush().change_context(TrustedServerError::Proxy {
+ message: "Failed to flush output".to_string(),
})?;
- // Caller owns encoder and must call finish() after this returns.
Ok(())
}
}
From 80e51d4807411bc5d3bb77cc6a6971d2a5e4cebb Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Thu, 26 Mar 2026 09:30:06 -0700
Subject: [PATCH 12/29] Update plan with compression refactor implementation
note
---
docs/superpowers/plans/2026-03-25-streaming-response.md | 8 ++++++++
1 file changed, 8 insertions(+)
diff --git a/docs/superpowers/plans/2026-03-25-streaming-response.md b/docs/superpowers/plans/2026-03-25-streaming-response.md
index 268517b8..4afca7fe 100644
--- a/docs/superpowers/plans/2026-03-25-streaming-response.md
+++ b/docs/superpowers/plans/2026-03-25-streaming-response.md
@@ -35,6 +35,14 @@ rewriting), `flate2` (gzip/deflate), `brotli` (brotli compression).
## Phase 1: Make the Pipeline Chunk-Emitting
+> **Implementation note (2026-03-26):** Tasks 1-3 were implemented as planned,
+> then followed by a refactor that unified all 9 `process_*_to_*` methods into
+> a single `process_chunks` method with inline decoder/encoder creation in
+> `process()`. This eliminated ~150 lines of duplication. The refactor was
+> committed as "Unify compression paths into single process_chunks method".
+> Tasks 1-3 descriptions below reflect the original plan; the final code is
+> cleaner than described.
+
### Task 1: Fix encoder finalization in `process_through_compression`
This is the prerequisite for Task 2. The current code calls `flush()` then
From c505c00395efb034ef2dce6047f7adc2dcb11948 Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Thu, 26 Mar 2026 09:33:45 -0700
Subject: [PATCH 13/29] Accumulate output for post-processors in
HtmlWithPostProcessing
---
.../trusted-server-core/src/html_processor.rs | 29 +++++++++++++++----
1 file changed, 24 insertions(+), 5 deletions(-)
diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs
index 540ab29d..30550318 100644
--- a/crates/trusted-server-core/src/html_processor.rs
+++ b/crates/trusted-server-core/src/html_processor.rs
@@ -20,6 +20,9 @@ use crate::tsjs;
struct HtmlWithPostProcessing {
inner: HtmlRewriterAdapter,
post_processors: Vec>,
+ /// Buffer that accumulates all intermediate output when post-processors
+ /// need the full document. Left empty on the streaming-only path.
+ accumulated_output: Vec<u8>,
origin_host: String,
request_host: String,
request_scheme: String,
@@ -29,12 +32,26 @@ struct HtmlWithPostProcessing {
impl StreamProcessor for HtmlWithPostProcessing {
fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result<Vec<u8>, io::Error> {
let output = self.inner.process_chunk(chunk, is_last)?;
- if !is_last || output.is_empty() || self.post_processors.is_empty() {
+
+ // Streaming-optimized path: no post-processors, pass through immediately.
+ if self.post_processors.is_empty() {
return Ok(output);
}
- let Ok(output_str) = std::str::from_utf8(&output) else {
- return Ok(output);
+ // Post-processors need the full document. Accumulate until the last chunk.
+ self.accumulated_output.extend_from_slice(&output);
+ if !is_last {
+ return Ok(Vec::new());
+ }
+
+ // Final chunk: run post-processors on the full accumulated output.
+ let full_output = std::mem::take(&mut self.accumulated_output);
+ if full_output.is_empty() {
+ return Ok(full_output);
+ }
+
+ let Ok(output_str) = std::str::from_utf8(&full_output) else {
+ return Ok(full_output);
};
let ctx = IntegrationHtmlContext {
@@ -50,10 +67,10 @@ impl StreamProcessor for HtmlWithPostProcessing {
.iter()
.any(|p| p.should_process(output_str, &ctx))
{
- return Ok(output);
+ return Ok(full_output);
}
- let mut html = String::from_utf8(output).map_err(|e| {
+ let mut html = String::from_utf8(full_output).map_err(|e| {
io::Error::other(format!(
"HTML post-processing expected valid UTF-8 output: {e}"
))
@@ -79,6 +96,7 @@ impl StreamProcessor for HtmlWithPostProcessing {
fn reset(&mut self) {
self.inner.reset();
+ self.accumulated_output.clear();
self.document_state.clear();
}
}
@@ -467,6 +485,7 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso
HtmlWithPostProcessing {
inner: HtmlRewriterAdapter::new(rewriter_settings),
post_processors,
+ accumulated_output: Vec::new(),
origin_host: config.origin_host,
request_host: config.request_host,
request_scheme: config.request_scheme,
From 6cae7f9982c8a1a8d02793b58c1469b0e67f0d7b Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Wed, 25 Mar 2026 00:46:53 -0700
Subject: [PATCH 14/29] Add streaming response optimization spec for
non-Next.js paths
---
.../2026-03-25-streaming-response-design.md | 194 ++++++++++++++++++
1 file changed, 194 insertions(+)
create mode 100644 docs/superpowers/specs/2026-03-25-streaming-response-design.md
diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
new file mode 100644
index 00000000..7011dea6
--- /dev/null
+++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
@@ -0,0 +1,194 @@
+# Streaming Response Optimization (Next.js Disabled)
+
+## Problem
+
+When Next.js is disabled, the publisher proxy buffers the entire response body
+in memory before sending any bytes to the client. This creates two costs:
+
+1. **Latency** — The client receives zero bytes until the full response is
+ decompressed, rewritten, and recompressed. For a 222KB HTML page, this adds
+ hundreds of milliseconds to time-to-last-byte.
+2. **Memory** — Peak memory holds ~4x the response size simultaneously
+ (compressed input + decompressed + processed output + recompressed output).
+ With WASM's ~16MB heap, this limits the size of pages we can proxy.
+
+## Scope
+
+**In scope**: All content types flowing through the publisher proxy path — HTML,
+text/JSON, and binary pass-through. Only when Next.js is disabled (no
+post-processor requiring the full document).
+
+**Out of scope**: Concurrent origin+auction fetch, Next.js-enabled paths (these
+require full-document post-processing by design), non-publisher routes (static
+JS, auction, discovery).
+
+## Streaming Gate
+
+Before committing to `stream_to_client()`, check:
+
+1. Backend status is success (2xx).
+2. `html_post_processors()` is empty — no registered post-processors.
+
+If either check fails, fall back to the current buffered path. This keeps the
+optimization transparent: same behavior for all existing configurations,
+streaming only activates when safe.
+
+## Architecture
+
+Two implementation steps, each independently valuable and testable.
+
+### Step 1: Make the pipeline chunk-emitting
+
+Three changes to existing processors:
+
+#### A) `HtmlRewriterAdapter` — incremental streaming
+
+The current implementation accumulates the entire HTML document and processes it
+on `is_last`. This is unnecessary — `lol_html::HtmlRewriter` supports
+incremental `write()` calls and emits output via its `OutputSink` callback after
+each chunk.
+
+Fix: create the rewriter eagerly in the constructor, use
+`Rc>>` to share the output buffer between the sink and
+`process_chunk()`, drain the buffer on every call instead of only on `is_last`.
+
+#### B) `process_gzip_to_gzip` — chunk-based decompression
+
+Currently calls `read_to_end()` to decompress the entire body into memory. The
+deflate and brotli paths already use the chunk-based
+`process_through_compression()`.
+
+Fix: use the same `process_through_compression` pattern for gzip.
+
+#### C) `process_through_compression` finalization
+
+Currently uses `drop(encoder)` which silently swallows errors from the gzip
+trailer CRC32 checksum.
+
+Fix: call `encoder.finish()` explicitly and propagate errors.
+
+### Step 2: Stream response to client
+
+Change the publisher proxy path to use Fastly's `StreamingBody` API:
+
+1. Fetch from origin, receive response headers.
+2. Validate status — if backend error, return buffered error response via
+ `send_to_client()`.
+3. Check streaming gate — if `html_post_processors()` is non-empty, fall back
+ to buffered path.
+4. Finalize all response headers (cookies, synthetic ID, geo, version).
+5. Call `response.stream_to_client()` — headers sent to client immediately.
+6. Pipe origin body through the streaming pipeline, writing chunks directly to
+ `StreamingBody`.
+7. Call `finish()` on success; on error, log and drop (client sees truncated
+ response).
+
+For binary/non-text content: use `StreamingBody::append(body)` for zero-copy
+pass-through, bypassing the pipeline entirely.
+
+#### Entry point change
+
+Migrate `main.rs` from `#[fastly::main]` to raw `main()` with `fastly::init()`
++ `Request::from_client()`. This is required because `stream_to_client()` /
+`send_to_client()` are incompatible with `#[fastly::main]`'s return-based model.
+
+Non-streaming routes (static, auction, discovery) use `send_to_client()` as
+before.
+
+## Data Flow
+
+### Streaming path (HTML, text/JSON with processing)
+
+```
+Origin body (gzip)
+ → Read 8KB chunk from GzDecoder
+ → StreamProcessor::process_chunk(chunk, is_last)
+ → HtmlRewriterAdapter: lol_html.write(chunk) → sink emits rewritten bytes
+ → OR StreamingReplacer: URL replacement with overlap buffer
+ → GzEncoder::write(processed_chunk) → compressed bytes
+ → StreamingBody::write(compressed) → chunk sent to client
+ → repeat until EOF
+ → StreamingBody::finish()
+```
+
+Memory at steady state: ~8KB input chunk buffer + lol_html internal parser state
++ gzip encoder window + overlap buffer for replacer. Roughly constant regardless
+of document size, versus the current ~4x document size.
+
+### Pass-through path (binary, images, fonts, etc.)
+
+```
+Origin body
+ → StreamingBody::append(body) → zero-copy transfer
+```
+
+No decompression, no processing, no buffering.
+
+### Buffered fallback path (error responses or post-processors present)
+
+```
+Origin returns 4xx/5xx OR html_post_processors() is non-empty
+ → Current buffered path unchanged
+ → send_to_client() with proper status and full body
+```
+
+## Error Handling
+
+**Backend returns error status**: Detected before calling `stream_to_client()`.
+Return the backend response as-is via `send_to_client()`. Client sees the
+correct error status code. No change from current behavior.
+
+**Processing fails mid-stream**: `lol_html` parse error, decompression
+corruption, I/O error. Headers (200 OK) are already sent. Log the error
+server-side, drop the `StreamingBody`. Client sees a truncated response and the
+connection closes. Standard reverse proxy behavior.
+
+**Compression finalization fails**: The gzip trailer CRC32 write fails. With the
+fix, `encoder.finish()` is called explicitly and errors propagate. Same
+mid-stream handling — log and truncate.
+
+No retry logic. No fallback to buffered after streaming has started — once
+headers are sent, we are committed.
+
+## Files Changed
+
+| File | Change | Risk |
+|------|--------|------|
+| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally; fix `process_gzip_to_gzip` to use chunk-based processing; fix `process_through_compression` to call `finish()` explicitly | Medium |
+| `crates/trusted-server-core/src/publisher.rs` | Split `handle_publisher_request` into streaming vs buffered paths based on `html_post_processors().is_empty()` | Medium |
+| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to raw `main()` with `fastly::init()` + `Request::from_client()`; route results to `send_to_client()` or let streaming path handle its own output | Medium |
+
+**Not changed**: `html_processor.rs` (builds lol_html `Settings` passed to
+`HtmlRewriterAdapter`, works as-is), integration registration, JS build
+pipeline, tsjs module serving, auction handler, cookie/synthetic ID logic.
+
+## Testing Strategy
+
+### Unit tests (streaming_processor.rs)
+
+- `HtmlRewriterAdapter` emits output on every `process_chunk()` call, not just
+ `is_last`.
+- `process_gzip_to_gzip` produces correct output without `read_to_end`.
+- `encoder.finish()` errors propagate (not swallowed by `drop`).
+- Multi-chunk HTML produces identical output to single-chunk processing.
+
+### Integration tests (publisher.rs)
+
+- Streaming gate: when `html_post_processors()` is non-empty, response is
+ buffered.
+- Streaming gate: when `html_post_processors()` is empty, response streams.
+- Backend error (4xx/5xx) returns buffered error response with correct status.
+- Binary content passes through without processing.
+
+### End-to-end validation (Viceroy)
+
+- `cargo test --workspace` — all existing tests pass.
+- Manual verification via `fastly compute serve` against a real origin.
+- Compare response bodies before/after to confirm byte-identical output for
+ HTML, text, and binary.
+
+### Measurement (post-deploy)
+
+- Compare TTFB and time-to-last-byte on staging before and after.
+- Monitor WASM heap usage via Fastly dashboard.
+- Verify no regressions on static endpoints or auction.
From 930a584e102692a55f1c0de9bcd84588f7d8955c Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Wed, 25 Mar 2026 00:50:17 -0700
Subject: [PATCH 15/29] Address spec review: Content-Length, streaming gate,
finalization order, rollback
---
.../2026-03-25-streaming-response-design.md | 58 ++++++++++++++-----
1 file changed, 44 insertions(+), 14 deletions(-)
diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
index 7011dea6..f745f3dd 100644
--- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md
+++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
@@ -15,8 +15,8 @@ in memory before sending any bytes to the client. This creates two costs:
## Scope
**In scope**: All content types flowing through the publisher proxy path — HTML,
-text/JSON, and binary pass-through. Only when Next.js is disabled (no
-post-processor requiring the full document).
+text/JSON, RSC Flight (`text/x-component`), and binary pass-through. Only when
+Next.js is disabled (no post-processor requiring the full document).
**Out of scope**: Concurrent origin+auction fetch, Next.js-enabled paths (these
require full-document post-processing by design), non-publisher routes (static
@@ -27,11 +27,14 @@ JS, auction, discovery).
Before committing to `stream_to_client()`, check:
1. Backend status is success (2xx).
-2. `html_post_processors()` is empty — no registered post-processors.
+2. For HTML content: `html_post_processors()` is empty — no registered
+ post-processors. Non-HTML content types (text/JSON, RSC Flight, binary) can
+ always stream regardless of post-processor registration, since
+ post-processors only apply to HTML.
-If either check fails, fall back to the current buffered path. This keeps the
-optimization transparent: same behavior for all existing configurations,
-streaming only activates when safe.
+If either check fails for the given content type, fall back to the current
+buffered path. This keeps the optimization transparent: same behavior for all
+existing configurations, streaming only activates when safe.
## Architecture
@@ -51,6 +54,12 @@ each chunk.
Fix: create the rewriter eagerly in the constructor, use
`Rc<RefCell<Vec<u8>>>` to share the output buffer between the sink and
`process_chunk()`, drain the buffer on every call instead of only on `is_last`.
+The output buffer is drained *after* each `rewriter.write()` returns, so the
+`RefCell` borrow in the sink closure never overlaps with the drain borrow.
+
+Note: this makes `HtmlRewriterAdapter` single-use — `reset()` becomes a no-op
+since the `Settings` are consumed by the rewriter constructor. This matches
+actual usage (one adapter per request).
#### B) `process_gzip_to_gzip` — chunk-based decompression
@@ -60,12 +69,16 @@ deflate and brotli paths already use the chunk-based
Fix: use the same `process_through_compression` pattern for gzip.
-#### C) `process_through_compression` finalization
+#### C) `process_through_compression` finalization — prerequisite for B
-Currently uses `drop(encoder)` which silently swallows errors from the gzip
-trailer CRC32 checksum.
+`process_through_compression` currently uses `drop(encoder)` which silently
+swallows errors. For gzip specifically, the trailer contains a CRC32 checksum —
+if `finish()` fails, corrupted responses are served silently. Today this affects
+deflate and brotli (which already use `process_through_compression`); after Step
+1B moves gzip to this path, it will affect gzip too.
-Fix: call `encoder.finish()` explicitly and propagate errors.
+Fix: call `encoder.finish()` explicitly and propagate errors. This must land
+before or with Step 1B.
### Step 2: Stream response to client
@@ -77,10 +90,13 @@ Change the publisher proxy path to use Fastly's `StreamingBody` API:
3. Check streaming gate — if `html_post_processors()` is non-empty, fall back
to buffered path.
4. Finalize all response headers (cookies, synthetic ID, geo, version).
-5. Call `response.stream_to_client()` — headers sent to client immediately.
-6. Pipe origin body through the streaming pipeline, writing chunks directly to
+5. Remove `Content-Length` header — the final size is unknown after processing.
+ Fastly's `StreamingBody` sends the response using chunked transfer encoding
+ automatically.
+6. Call `response.stream_to_client()` — headers sent to client immediately.
+7. Pipe origin body through the streaming pipeline, writing chunks directly to
`StreamingBody`.
-7. Call `finish()` on success; on error, log and drop (client sees truncated
+8. Call `finish()` on success; on error, log and drop (client sees truncated
response).
For binary/non-text content: use `StreamingBody::append(body)` for zero-copy
@@ -154,7 +170,7 @@ headers are sent, we are committed.
| File | Change | Risk |
|------|--------|------|
-| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally; fix `process_gzip_to_gzip` to use chunk-based processing; fix `process_through_compression` to call `finish()` explicitly | Medium |
+| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); fix `process_gzip_to_gzip` to use chunk-based processing; fix `process_through_compression` to call `finish()` explicitly | High |
| `crates/trusted-server-core/src/publisher.rs` | Split `handle_publisher_request` into streaming vs buffered paths based on `html_post_processors().is_empty()` | Medium |
| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to raw `main()` with `fastly::init()` + `Request::from_client()`; route results to `send_to_client()` or let streaming path handle its own output | Medium |
@@ -162,6 +178,20 @@ headers are sent, we are committed.
`HtmlRewriterAdapter`, works as-is), integration registration, JS build
pipeline, tsjs module serving, auction handler, cookie/synthetic ID logic.
+Note: `HtmlWithPostProcessing` wraps `HtmlRewriterAdapter` and applies
+post-processors on `is_last`. In the streaming path the post-processor list is
+empty (that's the gate condition), so the wrapper is a no-op passthrough. It
+remains in place — no need to bypass it.
+
+## Rollback Strategy
+
+The `#[fastly::main]` to raw `main()` migration is a structural change. If
+streaming causes issues in production, the fastest rollback is reverting the
+`main.rs` change — the buffered path still exists and the pipeline improvements
+(Step 1) are safe to keep regardless. No feature flag needed; a git revert of
+the Step 2 commit restores buffered behavior while retaining Step 1 memory
+improvements.
+
## Testing Strategy
### Unit tests (streaming_processor.rs)
From a2b71bf53be89be5167eaa4a605767c50b3afb67 Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Wed, 25 Mar 2026 01:13:06 -0700
Subject: [PATCH 16/29] Address deep review: header timing, error phases,
process_response_streaming refactor
---
.../2026-03-25-streaming-response-design.md | 38 ++++++++++++-------
1 file changed, 25 insertions(+), 13 deletions(-)
diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
index f745f3dd..dd31097d 100644
--- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md
+++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
@@ -54,7 +54,7 @@ each chunk.
Fix: create the rewriter eagerly in the constructor, use
`Rc<RefCell<Vec<u8>>>` to share the output buffer between the sink and
`process_chunk()`, drain the buffer on every call instead of only on `is_last`.
-The output buffer is drained *after* each `rewriter.write()` returns, so the
+The output buffer is drained _after_ each `rewriter.write()` returns, so the
`RefCell` borrow in the sink closure never overlaps with the drain borrow.
Note: this makes `HtmlRewriterAdapter` single-use — `reset()` becomes a no-op
@@ -90,6 +90,10 @@ Change the publisher proxy path to use Fastly's `StreamingBody` API:
3. Check streaming gate — if `html_post_processors()` is non-empty, fall back
to buffered path.
4. Finalize all response headers (cookies, synthetic ID, geo, version).
+ Today, synthetic ID/cookie headers are set _after_ body processing in
+ `handle_publisher_request`. Since they are body-independent (computed from
+ request cookies and consent context), they must be reordered to run _before_
+ `stream_to_client()` so headers are complete before streaming begins.
5. Remove `Content-Length` header — the final size is unknown after processing.
Fastly's `StreamingBody` sends the response using chunked transfer encoding
automatically.
@@ -99,13 +103,16 @@ Change the publisher proxy path to use Fastly's `StreamingBody` API:
8. Call `finish()` on success; on error, log and drop (client sees truncated
response).
-For binary/non-text content: use `StreamingBody::append(body)` for zero-copy
-pass-through, bypassing the pipeline entirely.
+For binary/non-text content: call `response.take_body()` then
+`StreamingBody::append(body)` for zero-copy pass-through, bypassing the pipeline
+entirely. Today binary responses skip `take_body()` and return the response
+as-is — the streaming path needs to explicitly take the body to hand it to
+`append()`.
#### Entry point change
Migrate `main.rs` from `#[fastly::main]` to raw `main()` with `fastly::init()`
-+ `Request::from_client()`. This is required because `stream_to_client()` /
+\+ `Request::from_client()`. This is required because `stream_to_client()` /
`send_to_client()` are incompatible with `#[fastly::main]`'s return-based model.
Non-streaming routes (static, auction, discovery) use `send_to_client()` as
@@ -128,7 +135,7 @@ Origin body (gzip)
```
Memory at steady state: ~8KB input chunk buffer + lol_html internal parser state
-+ gzip encoder window + overlap buffer for replacer. Roughly constant regardless
+\+ gzip encoder window + overlap buffer for replacer. Roughly constant regardless
of document size, versus the current ~4x document size.
### Pass-through path (binary, images, fonts, etc.)
@@ -154,10 +161,15 @@ Origin returns 4xx/5xx OR html_post_processors() is non-empty
Return the backend response as-is via `send_to_client()`. Client sees the
correct error status code. No change from current behavior.
+**Processor creation fails**: `create_html_stream_processor()` or pipeline
+construction errors happen _before_ `stream_to_client()` is called. Since
+headers have not been sent yet, return a proper error response via
+`send_to_client()`. Same as current behavior.
+
**Processing fails mid-stream**: `lol_html` parse error, decompression
-corruption, I/O error. Headers (200 OK) are already sent. Log the error
-server-side, drop the `StreamingBody`. Client sees a truncated response and the
-connection closes. Standard reverse proxy behavior.
+corruption, I/O error during chunk processing. Headers (200 OK) are already
+sent. Log the error server-side, drop the `StreamingBody`. Client sees a
+truncated response and the connection closes. Standard reverse proxy behavior.
**Compression finalization fails**: The gzip trailer CRC32 write fails. With the
fix, `encoder.finish()` is called explicitly and errors propagate. Same
@@ -168,11 +180,11 @@ headers are sent, we are committed.
## Files Changed
-| File | Change | Risk |
-|------|--------|------|
-| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); fix `process_gzip_to_gzip` to use chunk-based processing; fix `process_through_compression` to call `finish()` explicitly | High |
-| `crates/trusted-server-core/src/publisher.rs` | Split `handle_publisher_request` into streaming vs buffered paths based on `html_post_processors().is_empty()` | Medium |
-| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to raw `main()` with `fastly::init()` + `Request::from_client()`; route results to `send_to_client()` or let streaming path handle its own output | Medium |
+| File | Change | Risk |
+| ------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ |
+| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); fix `process_gzip_to_gzip` to use chunk-based processing; fix `process_through_compression` to call `finish()` explicitly | High |
+| `crates/trusted-server-core/src/publisher.rs` | Refactor `process_response_streaming` to accept `W: Write` instead of hardcoding `Vec<u8>`; split `handle_publisher_request` into streaming vs buffered paths; reorder synthetic ID/cookie logic before streaming | Medium |
+| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to raw `main()` with `fastly::init()` + `Request::from_client()`; route results to `send_to_client()` or let streaming path handle its own output | Medium |
**Not changed**: `html_processor.rs` (builds lol_html `Settings` passed to
`HtmlRewriterAdapter`, works as-is), integration registration, JS build
From b363e562ac105b114ad5562f04245507393d80e8 Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Wed, 25 Mar 2026 07:59:42 -0700
Subject: [PATCH 17/29] Address deep review: remove fastly::init, fix API
assumptions, add missing paths
---
.../2026-03-25-streaming-response-design.md | 100 +++++++++++++-----
1 file changed, 71 insertions(+), 29 deletions(-)
diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
index dd31097d..80c49ed8 100644
--- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md
+++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
@@ -61,21 +61,32 @@ Note: this makes `HtmlRewriterAdapter` single-use — `reset()` becomes a no-op
since the `Settings` are consumed by the rewriter constructor. This matches
actual usage (one adapter per request).
-#### B) `process_gzip_to_gzip` — chunk-based decompression
+#### B) Chunk-based decompression for all compression paths
-Currently calls `read_to_end()` to decompress the entire body into memory. The
-deflate and brotli paths already use the chunk-based
+`process_gzip_to_gzip` calls `read_to_end()` to decompress the entire body into
+memory. The deflate and brotli keep-compression paths already use chunk-based
`process_through_compression()`.
Fix: use the same `process_through_compression` pattern for gzip.
+Additionally, `decompress_and_process()` (used by `process_gzip_to_none`,
+`process_deflate_to_none`, `process_brotli_to_none`) also calls
+`read_to_end()`. These strip-compression paths must be converted to chunk-based
+processing too — read decompressed chunks, process each, write uncompressed
+output directly.
+
+Reference: `process_uncompressed` already implements the correct chunk-based
+pattern (read loop → `process_chunk()` per chunk → `write_all()` → flush). The
+compressed paths should follow the same structure.
+
#### C) `process_through_compression` finalization — prerequisite for B
`process_through_compression` currently uses `drop(encoder)` which silently
-swallows errors. For gzip specifically, the trailer contains a CRC32 checksum —
-if `finish()` fails, corrupted responses are served silently. Today this affects
-deflate and brotli (which already use `process_through_compression`); after Step
-1B moves gzip to this path, it will affect gzip too.
+swallows errors. Today this affects deflate and brotli (which already use this
+path). The current `process_gzip_to_gzip` calls `encoder.finish()` explicitly —
+but Step 1B moves gzip to `process_through_compression`, which would **regress**
+gzip from working `finish()` to broken `drop()`. This fix prevents that
+regression and also fixes the pre-existing issue for deflate/brotli.
Fix: call `encoder.finish()` explicitly and propagate errors. This must land
before or with Step 1B.
@@ -89,11 +100,14 @@ Change the publisher proxy path to use Fastly's `StreamingBody` API:
`send_to_client()`.
3. Check streaming gate — if `html_post_processors()` is non-empty, fall back
to buffered path.
-4. Finalize all response headers (cookies, synthetic ID, geo, version).
- Today, synthetic ID/cookie headers are set _after_ body processing in
- `handle_publisher_request`. Since they are body-independent (computed from
- request cookies and consent context), they must be reordered to run _before_
- `stream_to_client()` so headers are complete before streaming begins.
+4. Finalize all response headers. This requires reordering two things:
+ - **Synthetic ID/cookie headers**: today set _after_ body processing in
+ `handle_publisher_request`. Since they are body-independent (computed from
+ request cookies and consent context), move them _before_ streaming.
+ - **`finalize_response()`** (main.rs): today called _after_ `route_request`
+ returns, adding geo, version, staging, and operator headers. In the
+ streaming path, this must run _before_ `stream_to_client()` since the
+ publisher handler sends the response directly instead of returning it.
5. Remove `Content-Length` header — the final size is unknown after processing.
Fastly's `StreamingBody` sends the response using chunked transfer encoding
automatically.
@@ -103,17 +117,36 @@ Change the publisher proxy path to use Fastly's `StreamingBody` API:
8. Call `finish()` on success; on error, log and drop (client sees truncated
response).
-For binary/non-text content: call `response.take_body()` then
-`StreamingBody::append(body)` for zero-copy pass-through, bypassing the pipeline
-entirely. Today binary responses skip `take_body()` and return the response
-as-is — the streaming path needs to explicitly take the body to hand it to
-`append()`.
+For binary/non-text content: call `response.take_body()` then stream via
+`io::copy(&mut body, &mut streaming_body)`. The `Body` type implements `Read`
+and `StreamingBody` implements `Write`, so this streams the backend body to the
+client without buffering the full content. Today binary responses skip
+`take_body()` and return the response as-is — the streaming path needs to
+explicitly take the body to pipe it through.
#### Entry point change
-Migrate `main.rs` from `#[fastly::main]` to raw `main()` with `fastly::init()`
-\+ `Request::from_client()`. This is required because `stream_to_client()` /
-`send_to_client()` are incompatible with `#[fastly::main]`'s return-based model.
+Migrate `main.rs` from `#[fastly::main]` to an undecorated `main()` with
+`Request::from_client()`. No separate initialization call is needed —
+`#[fastly::main]` is just syntactic sugar for `Request::from_client()` +
+`Response::send_to_client()`. The migration is required because
+`stream_to_client()` / `send_to_client()` are incompatible with
+`#[fastly::main]`'s return-based model.
+
+```rust
+fn main() {
+ let req = Request::from_client();
+ match handle(req) {
+ Ok(()) => {}
+ Err(e) => to_error_response(&e).send_to_client(),
+ }
+}
+```
+
+Note: `main`'s return type changes from `Result` to `()` (or
+`Result<(), Error>`). Errors that currently propagate to `main`'s `Result` must
+now be caught explicitly and sent via `send_to_client()` with
+`to_error_response()`.
Non-streaming routes (static, auction, discovery) use `send_to_client()` as
before.
@@ -134,18 +167,19 @@ Origin body (gzip)
→ StreamingBody::finish()
```
-Memory at steady state: ~8KB input chunk buffer + lol_html internal parser state
-\+ gzip encoder window + overlap buffer for replacer. Roughly constant regardless
+Memory at steady state: ~8KB input chunk buffer, lol_html internal parser state,
+gzip encoder window, and overlap buffer for replacer. Roughly constant regardless
of document size, versus the current ~4x document size.
### Pass-through path (binary, images, fonts, etc.)
```
-Origin body
- → StreamingBody::append(body) → zero-copy transfer
+Origin body (via take_body())
+ → io::copy(&mut body, &mut streaming_body) → streamed transfer
+ → StreamingBody::finish()
```
-No decompression, no processing, no buffering.
+No decompression, no processing. Body streams through as read.
### Buffered fallback path (error responses or post-processors present)
@@ -168,8 +202,10 @@ headers have not been sent yet, return a proper error response via
**Processing fails mid-stream**: `lol_html` parse error, decompression
corruption, I/O error during chunk processing. Headers (200 OK) are already
-sent. Log the error server-side, drop the `StreamingBody`. Client sees a
-truncated response and the connection closes. Standard reverse proxy behavior.
+sent. Log the error server-side, drop the `StreamingBody`. Per the Fastly SDK,
+`StreamingBody` automatically aborts the response if dropped without calling
+`finish()` — the client sees a connection reset / truncated response. This is
+standard reverse proxy behavior.
**Compression finalization fails**: The gzip trailer CRC32 write fails. With the
fix, `encoder.finish()` is called explicitly and errors propagate. Same
@@ -182,9 +218,9 @@ headers are sent, we are committed.
| File | Change | Risk |
| ------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ |
-| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); fix `process_gzip_to_gzip` to use chunk-based processing; fix `process_through_compression` to call `finish()` explicitly | High |
+| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); convert all compression paths to chunk-based processing (`process_gzip_to_gzip` and `decompress_and_process`); fix `process_through_compression` to call `finish()` explicitly | High |
| `crates/trusted-server-core/src/publisher.rs` | Refactor `process_response_streaming` to accept `W: Write` instead of hardcoding `Vec`; split `handle_publisher_request` into streaming vs buffered paths; reorder synthetic ID/cookie logic before streaming | Medium |
-| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to raw `main()` with `fastly::init()` + `Request::from_client()`; route results to `send_to_client()` or let streaming path handle its own output | Medium |
+| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to undecorated `main()` with `Request::from_client()`; explicit error handling via `to_error_response().send_to_client()`; call `finalize_response()` before streaming | Medium |
**Not changed**: `html_processor.rs` (builds lol_html `Settings` passed to
`HtmlRewriterAdapter`, works as-is), integration registration, JS build
@@ -195,6 +231,12 @@ post-processors on `is_last`. In the streaming path the post-processor list is
empty (that's the gate condition), so the wrapper is a no-op passthrough. It
remains in place — no need to bypass it.
+Clarification: `script_rewriters` (used by Next.js and GTM) are distinct from
+`html_post_processors`. Script rewriters run inside `lol_html` element handlers
+during streaming — they do not require buffering and are unaffected by this
+change. The streaming gate checks only `html_post_processors().is_empty()`, not
+script rewriters. Currently only Next.js registers a post-processor.
+
## Rollback Strategy
The `#[fastly::main]` to raw `main()` migration is a structural change. If
From b83f61c4a5c8eab14fc34e766d48d82b25e719aa Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Thu, 26 Mar 2026 10:37:39 -0700
Subject: [PATCH 18/29] Apply rustfmt formatting to streaming_processor
---
.../src/streaming_processor.rs | 20 ++++++++-----------
1 file changed, 8 insertions(+), 12 deletions(-)
diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs
index 7062df93..40ec51cb 100644
--- a/crates/trusted-server-core/src/streaming_processor.rs
+++ b/crates/trusted-server-core/src/streaming_processor.rs
@@ -215,11 +215,11 @@ impl StreamingPipeline {
message: "Failed to process chunk".to_string(),
})?;
if !processed.is_empty() {
- writer.write_all(&processed).change_context(
- TrustedServerError::Proxy {
+ writer
+ .write_all(&processed)
+ .change_context(TrustedServerError::Proxy {
message: "Failed to write processed chunk".to_string(),
- },
- )?;
+ })?;
}
}
Err(e) => {
@@ -502,8 +502,7 @@ mod tests {
// Compress input
let mut compressed_input = Vec::new();
{
- let mut enc =
- ZlibEncoder::new(&mut compressed_input, flate2::Compression::default());
+ let mut enc = ZlibEncoder::new(&mut compressed_input, flate2::Compression::default());
enc.write_all(input_data)
.expect("should compress test input");
enc.finish().expect("should finish compression");
@@ -551,8 +550,7 @@ mod tests {
let mut compressed_input = Vec::new();
{
- let mut enc =
- GzEncoder::new(&mut compressed_input, flate2::Compression::default());
+ let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default());
enc.write_all(input_data)
.expect("should compress test input");
enc.finish().expect("should finish compression");
@@ -600,8 +598,7 @@ mod tests {
let mut compressed_input = Vec::new();
{
- let mut enc =
- GzEncoder::new(&mut compressed_input, flate2::Compression::default());
+ let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default());
enc.write_all(input_data)
.expect("should compress test input");
enc.finish().expect("should finish compression");
@@ -627,8 +624,7 @@ mod tests {
.expect("should process gzip-to-none");
// Assert
- let result =
- String::from_utf8(output).expect("should be valid UTF-8 uncompressed output");
+ let result = String::from_utf8(output).expect("should be valid UTF-8 uncompressed output");
assert_eq!(
result, "
hi world",
"should have replaced content after gzip decompression"
From aeca9f6479c33d87263da7a24f61d37a04f72b64 Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Thu, 26 Mar 2026 10:47:24 -0700
Subject: [PATCH 19/29] Add debug logging, brotli round-trip test, and
post-processor accumulation test
- Add debug-level logging to process_chunks showing total bytes
read and written per pipeline invocation
- Add brotli-to-brotli round-trip test to cover the into_inner()
finalization path
- Add test proving HtmlWithPostProcessing accumulates output when
post-processors are registered while streaming path passes through
---
.../trusted-server-core/src/html_processor.rs | 85 +++++++++++++++++++
.../src/streaming_processor.rs | 57 +++++++++++++
2 files changed, 142 insertions(+)
diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs
index 30550318..95ccf9c3 100644
--- a/crates/trusted-server-core/src/html_processor.rs
+++ b/crates/trusted-server-core/src/html_processor.rs
@@ -1010,4 +1010,89 @@ mod tests {
.collect::()
);
}
+
+ #[test]
+ fn post_processors_accumulate_while_streaming_path_passes_through() {
+ use crate::streaming_processor::{HtmlRewriterAdapter, StreamProcessor};
+ use lol_html::Settings;
+
+ // --- Streaming path: no post-processors → output emitted per chunk ---
+ let mut streaming = HtmlWithPostProcessing {
+ inner: HtmlRewriterAdapter::new(Settings::default()),
+ post_processors: Vec::new(),
+ accumulated_output: Vec::new(),
+ origin_host: String::new(),
+ request_host: String::new(),
+ request_scheme: String::new(),
+ document_state: IntegrationDocumentState::default(),
+ };
+
+ let chunk1 = streaming
+ .process_chunk(b"", false)
+ .expect("should process chunk1");
+ let chunk2 = streaming
+ .process_chunk(b"hello
", false)
+ .expect("should process chunk2");
+ let chunk3 = streaming
+ .process_chunk(b"", true)
+ .expect("should process final chunk");
+
+ assert!(
+ !chunk1.is_empty() || !chunk2.is_empty(),
+ "should emit intermediate output on streaming path"
+ );
+
+ let mut streaming_all = chunk1;
+ streaming_all.extend_from_slice(&chunk2);
+ streaming_all.extend_from_slice(&chunk3);
+
+ // --- Buffered path: post-processor registered → accumulates until is_last ---
+ struct NoopPostProcessor;
+ impl IntegrationHtmlPostProcessor for NoopPostProcessor {
+ fn integration_id(&self) -> &'static str {
+ "test-noop"
+ }
+ fn post_process(&self, _html: &mut String, _ctx: &IntegrationHtmlContext<'_>) -> bool {
+ false
+ }
+ }
+
+ let mut buffered = HtmlWithPostProcessing {
+ inner: HtmlRewriterAdapter::new(Settings::default()),
+ post_processors: vec![Arc::new(NoopPostProcessor)],
+ accumulated_output: Vec::new(),
+ origin_host: String::new(),
+ request_host: String::new(),
+ request_scheme: String::new(),
+ document_state: IntegrationDocumentState::default(),
+ };
+
+ let buf1 = buffered
+ .process_chunk(b"", false)
+ .expect("should process chunk1");
+ let buf2 = buffered
+ .process_chunk(b"hello
", false)
+ .expect("should process chunk2");
+ let buf3 = buffered
+ .process_chunk(b"", true)
+ .expect("should process final chunk");
+
+ assert!(
+ buf1.is_empty() && buf2.is_empty(),
+ "should return empty for intermediate chunks when post-processors are registered"
+ );
+ assert!(
+ !buf3.is_empty(),
+ "should emit all output in final chunk when post-processors are registered"
+ );
+
+ // Both paths should produce identical output
+ let streaming_str =
+ String::from_utf8(streaming_all).expect("streaming output should be valid UTF-8");
+ let buffered_str = String::from_utf8(buf3).expect("buffered output should be valid UTF-8");
+ assert_eq!(
+ streaming_str, buffered_str,
+ "streaming and buffered paths should produce identical output"
+ );
+ }
}
diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs
index 40ec51cb..4f189926 100644
--- a/crates/trusted-server-core/src/streaming_processor.rs
+++ b/crates/trusted-server-core/src/streaming_processor.rs
@@ -189,6 +189,8 @@ impl StreamingPipeline {
mut writer: W,
) -> Result<(), Report> {
let mut buffer = vec![0u8; self.config.chunk_size];
+ let mut total_read: u64 = 0;
+ let mut total_written: u64 = 0;
loop {
match reader.read(&mut buffer) {
@@ -199,6 +201,7 @@ impl StreamingPipeline {
},
)?;
if !final_chunk.is_empty() {
+ total_written += final_chunk.len() as u64;
writer.write_all(&final_chunk).change_context(
TrustedServerError::Proxy {
message: "Failed to write final chunk".to_string(),
@@ -208,6 +211,7 @@ impl StreamingPipeline {
break;
}
Ok(n) => {
+ total_read += n as u64;
let processed = self
.processor
.process_chunk(&buffer[..n], false)
@@ -215,6 +219,7 @@ impl StreamingPipeline {
message: "Failed to process chunk".to_string(),
})?;
if !processed.is_empty() {
+ total_written += processed.len() as u64;
writer
.write_all(&processed)
.change_context(TrustedServerError::Proxy {
@@ -234,6 +239,10 @@ impl StreamingPipeline {
message: "Failed to flush output".to_string(),
})?;
+ log::debug!(
+ "Streaming pipeline complete: read {total_read} bytes, wrote {total_written} bytes"
+ );
+
Ok(())
}
}
@@ -631,6 +640,54 @@ mod tests {
);
}
+ #[test]
+ fn test_brotli_round_trip_produces_valid_output() {
+ use brotli::enc::writer::CompressorWriter;
+ use brotli::Decompressor;
+ use std::io::{Read as _, Write as _};
+
+ let input_data = b"
hello world";
+
+ // Compress input with brotli
+ let mut compressed_input = Vec::new();
+ {
+ let mut enc = CompressorWriter::new(&mut compressed_input, 4096, 4, 22);
+ enc.write_all(input_data)
+ .expect("should compress test input");
+ enc.flush().expect("should flush brotli encoder");
+ }
+
+ let replacer = StreamingReplacer::new(vec![Replacement {
+ find: "hello".to_string(),
+ replace_with: "hi".to_string(),
+ }]);
+
+ let config = PipelineConfig {
+ input_compression: Compression::Brotli,
+ output_compression: Compression::Brotli,
+ chunk_size: 8192,
+ };
+
+ let mut pipeline = StreamingPipeline::new(config, replacer);
+ let mut output = Vec::new();
+
+ pipeline
+ .process(&compressed_input[..], &mut output)
+ .expect("should process brotli-to-brotli");
+
+ // Decompress output and verify correctness
+ let mut decompressed = Vec::new();
+ Decompressor::new(&output[..], 4096)
+ .read_to_end(&mut decompressed)
+ .expect("should decompress output — implies encoder was finalized correctly");
+
+ assert_eq!(
+ String::from_utf8(decompressed).expect("should be valid UTF-8"),
+ "hi world",
+ "should have replaced content through brotli round-trip"
+ );
+ }
+
#[test]
fn test_html_rewriter_adapter_emits_output_per_chunk() {
use lol_html::Settings;
From e1c6cb81e3c95bbb757a9bba67fa818969ea8658 Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Thu, 26 Mar 2026 11:05:07 -0700
Subject: [PATCH 20/29] Address deep review: imports, stale comments, brotli
finalization, tests
- Group std imports together (cell, io, rc) before external crates
- Document supported compression combinations on PipelineConfig
- Remove dead-weight byte counters from process_chunks hot loop
- Fix stale comment referencing removed process_through_compression
- Fix brotli finalization: use drop(encoder) instead of into_inner()
to make the intent clear (CompressorWriter writes trailer on drop)
- Document reset() as no-op on HtmlRewriterAdapter (single-use)
- Brotli round-trip test now covers the drop-based finalization path
- Add gzip HTML rewriter pipeline test (compressed round-trip with
lol_html, not just StreamingReplacer)
- Add HtmlWithPostProcessing accumulation vs streaming behavior test
---
.../trusted-server-core/src/html_processor.rs | 3 +-
.../src/streaming_processor.rs | 126 +++++++++++++-----
2 files changed, 94 insertions(+), 35 deletions(-)
diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs
index 95ccf9c3..52fba915 100644
--- a/crates/trusted-server-core/src/html_processor.rs
+++ b/crates/trusted-server-core/src/html_processor.rs
@@ -94,8 +94,9 @@ impl StreamProcessor for HtmlWithPostProcessing {
Ok(html.into_bytes())
}
+ /// No-op. `HtmlWithPostProcessing` wraps a single-use
+ /// [`HtmlRewriterAdapter`] and cannot be meaningfully reset.
fn reset(&mut self) {
- self.inner.reset();
self.accumulated_output.clear();
self.document_state.clear();
}
diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs
index 4f189926..6e915737 100644
--- a/crates/trusted-server-core/src/streaming_processor.rs
+++ b/crates/trusted-server-core/src/streaming_processor.rs
@@ -7,10 +7,10 @@
//! - UTF-8 boundary handling
use std::cell::RefCell;
+use std::io::{self, Read, Write};
use std::rc::Rc;
use error_stack::{Report, ResultExt};
-use std::io::{self, Read, Write};
use crate::error::TrustedServerError;
@@ -56,7 +56,21 @@ impl Compression {
}
}
-/// Configuration for the streaming pipeline
+/// Configuration for the streaming pipeline.
+///
+/// # Supported compression combinations
+///
+/// | Input | Output | Behavior |
+/// |-------|--------|----------|
+/// | None | None | Pass-through processing |
+/// | Gzip | Gzip | Decompress → process → recompress |
+/// | Gzip | None | Decompress → process |
+/// | Deflate | Deflate | Decompress → process → recompress |
+/// | Deflate | None | Decompress → process |
+/// | Brotli | Brotli | Decompress → process → recompress |
+/// | Brotli | None | Decompress → process |
+///
+/// All other combinations return an error at runtime.
pub struct PipelineConfig {
/// Input compression type
pub input_compression: Compression,
@@ -158,8 +172,9 @@ impl StreamingPipeline {
};
let mut encoder = CompressorWriter::with_params(output, 4096, ¶ms);
self.process_chunks(decoder, &mut encoder)?;
- // CompressorWriter finalizes on flush (already called) and into_inner
- encoder.into_inner();
+ // CompressorWriter writes the brotli stream trailer on drop.
+ // process_chunks already called flush(), so drop finalizes cleanly.
+ drop(encoder);
Ok(())
}
(Compression::Brotli, Compression::None) => {
@@ -189,8 +204,6 @@ impl StreamingPipeline {
mut writer: W,
) -> Result<(), Report> {
let mut buffer = vec![0u8; self.config.chunk_size];
- let mut total_read: u64 = 0;
- let mut total_written: u64 = 0;
loop {
match reader.read(&mut buffer) {
@@ -201,7 +214,6 @@ impl StreamingPipeline {
},
)?;
if !final_chunk.is_empty() {
- total_written += final_chunk.len() as u64;
writer.write_all(&final_chunk).change_context(
TrustedServerError::Proxy {
message: "Failed to write final chunk".to_string(),
@@ -211,7 +223,6 @@ impl StreamingPipeline {
break;
}
Ok(n) => {
- total_read += n as u64;
let processed = self
.processor
.process_chunk(&buffer[..n], false)
@@ -219,7 +230,6 @@ impl StreamingPipeline {
message: "Failed to process chunk".to_string(),
})?;
if !processed.is_empty() {
- total_written += processed.len() as u64;
writer
.write_all(&processed)
.change_context(TrustedServerError::Proxy {
@@ -239,10 +249,6 @@ impl StreamingPipeline {
message: "Failed to flush output".to_string(),
})?;
- log::debug!(
- "Streaming pipeline complete: read {total_read} bytes, wrote {total_written} bytes"
- );
-
Ok(())
}
}
@@ -308,10 +314,12 @@ impl StreamProcessor for HtmlRewriterAdapter {
Ok(std::mem::take(&mut *self.output.borrow_mut()))
}
- fn reset(&mut self) {
- // No-op: the rewriter consumed its Settings on construction.
- // Single-use by design (one adapter per request).
- }
+ /// No-op. `HtmlRewriterAdapter` is single-use: the rewriter consumes its
+ /// [`Settings`](lol_html::Settings) on construction and cannot be recreated.
+ /// Calling [`process_chunk`](StreamProcessor::process_chunk) again after a
+ /// call that passed `is_last = true` (which finalizes the rewriter)
+ /// will produce empty output.
+ fn reset(&mut self) {}
}
/// Adapter to use our existing `StreamingReplacer` as a `StreamProcessor`
@@ -468,40 +476,33 @@ mod tests {
}
#[test]
- fn test_html_rewriter_adapter_reset_is_noop() {
+ fn test_html_rewriter_adapter_reset_then_finalize() {
use lol_html::Settings;
let settings = Settings::default();
let mut adapter = HtmlRewriterAdapter::new(settings);
- // Process some content
- let result1 = adapter
+ adapter
.process_chunk(b"
test", false)
.expect("should process html");
- // Reset is a no-op — the adapter is single-use by design
+ // reset() is a documented no-op — adapter is single-use
adapter.reset();
- // The rewriter is still alive; finalize it
- let result2 = adapter
+ // Finalize still works; the rewriter is still alive
+ let final_output = adapter
.process_chunk(b"", true)
.expect("should finalize after reset");
- let mut all_output = result1;
- all_output.extend_from_slice(&result2);
-
- let output = String::from_utf8(all_output).expect("output should be valid UTF-8");
- assert!(
- output.contains("test"),
- "should still produce output after no-op reset"
- );
+ // Output may or may not be empty depending on lol_html buffering,
+ // but it should not error
+ let _ = final_output;
}
#[test]
fn test_deflate_round_trip_produces_valid_output() {
- // Verify that deflate-to-deflate (which uses process_through_compression)
- // produces valid output that decompresses correctly. This establishes the
- // correctness contract before we change the finalization path.
+ // Verify that deflate-to-deflate produces valid output that decompresses
+ // correctly, confirming that encoder finalization works.
use flate2::read::ZlibDecoder;
use flate2::write::ZlibEncoder;
use std::io::{Read as _, Write as _};
@@ -772,4 +773,61 @@ mod tests {
"Should not contain original URL"
);
}
+
+ #[test]
+ fn test_gzip_pipeline_with_html_rewriter() {
+ use flate2::read::GzDecoder;
+ use flate2::write::GzEncoder;
+ use lol_html::{element, Settings};
+ use std::io::{Read as _, Write as _};
+
+ let settings = Settings {
+ element_content_handlers: vec![element!("a[href]", |el| {
+ if let Some(href) = el.get_attribute("href") {
+ if href.contains("example.com") {
+ el.set_attribute("href", &href.replace("example.com", "test.com"))?;
+ }
+ }
+ Ok(())
+ })],
+ ..Settings::default()
+ };
+
+ let input = b"<html><body><a href=\"https://example.com/page\">Link</a></body></html>";
+
+ let mut compressed_input = Vec::new();
+ {
+ let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default());
+ enc.write_all(input).expect("should compress test input");
+ enc.finish().expect("should finish compression");
+ }
+
+ let adapter = HtmlRewriterAdapter::new(settings);
+ let config = PipelineConfig {
+ input_compression: Compression::Gzip,
+ output_compression: Compression::Gzip,
+ chunk_size: 8192,
+ };
+ let mut pipeline = StreamingPipeline::new(config, adapter);
+ let mut output = Vec::new();
+
+ pipeline
+ .process(&compressed_input[..], &mut output)
+ .expect("pipeline should process gzip HTML");
+
+ let mut decompressed = Vec::new();
+ GzDecoder::new(&output[..])
+ .read_to_end(&mut decompressed)
+ .expect("should decompress output");
+
+ let result = String::from_utf8(decompressed).expect("output should be valid UTF-8");
+ assert!(
+ result.contains("https://test.com"),
+ "should have replaced URL through gzip HTML pipeline"
+ );
+ assert!(
+ !result.contains("example.com"),
+ "should not contain original URL after gzip HTML pipeline"
+ );
+ }
}
From 9753026afc8a81e2f1ddee452bab727df08f05b5 Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Thu, 26 Mar 2026 11:12:42 -0700
Subject: [PATCH 21/29] Address second deep review: correctness, docs, and test
robustness
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- Add Eq derive to Compression enum (all unit variants, trivially correct)
- Brotli finalization: use into_inner() instead of drop() to skip
redundant flush and make finalization explicit
- Document process_chunks flush semantics: callers must still call
encoder-specific finalization after this method returns
- Warn when HtmlRewriterAdapter receives data after finalization
(rewriter already consumed, data would be silently lost)
- Make HtmlWithPostProcessing::reset() a true no-op matching its doc
(clearing auxiliary state without resetting rewriter is inconsistent)
- Document extra copying overhead on post-processor path vs streaming
- Assert output content in reset-then-finalize test (was discarded)
- Relax per-chunk emission test to not depend on lol_html internal
buffering behavior — assert concatenated correctness + at least one
intermediate chunk emitted
---
.../trusted-server-core/src/html_processor.rs | 19 +++--
.../src/streaming_processor.rs | 82 +++++++++++--------
2 files changed, 64 insertions(+), 37 deletions(-)
diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs
index 52fba915..d9840cfb 100644
--- a/crates/trusted-server-core/src/html_processor.rs
+++ b/crates/trusted-server-core/src/html_processor.rs
@@ -17,6 +17,16 @@ use crate::settings::Settings;
use crate::streaming_processor::{HtmlRewriterAdapter, StreamProcessor};
use crate::tsjs;
+/// Wraps [`HtmlRewriterAdapter`] with optional post-processing.
+///
+/// When `post_processors` is empty (the common streaming path), chunks pass
+/// through immediately with no extra copying. When post-processors are
+/// registered, intermediate output is accumulated in `accumulated_output`
+/// until `is_last`, then post-processors run on the full document. This adds
+/// an extra copy per chunk compared to the pre-streaming adapter (which
+/// accumulated raw input instead of rewriter output). The overhead is
+/// acceptable because the post-processor path is already fully buffered —
+/// the real streaming win comes from the empty-post-processor path in Phase 2.
struct HtmlWithPostProcessing {
inner: HtmlRewriterAdapter,
post_processors: Vec<Arc<dyn IntegrationHtmlPostProcessor>>,
@@ -95,11 +105,10 @@ impl StreamProcessor for HtmlWithPostProcessing {
}
/// No-op. `HtmlWithPostProcessing` wraps a single-use
- /// [`HtmlRewriterAdapter`] and cannot be meaningfully reset.
- fn reset(&mut self) {
- self.accumulated_output.clear();
- self.document_state.clear();
- }
+ /// [`HtmlRewriterAdapter`] that cannot be reset. Clearing auxiliary
+ /// state without resetting the rewriter would leave the processor
+ /// in an inconsistent state, so this method intentionally does nothing.
+ fn reset(&mut self) {}
}
/// Configuration for HTML processing
diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs
index 6e915737..3915494c 100644
--- a/crates/trusted-server-core/src/streaming_processor.rs
+++ b/crates/trusted-server-core/src/streaming_processor.rs
@@ -35,7 +35,7 @@ pub trait StreamProcessor {
}
/// Compression type for the stream
-#[derive(Debug, Clone, Copy, PartialEq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
None,
Gzip,
@@ -172,9 +172,12 @@ impl StreamingPipeline {
};
let mut encoder = CompressorWriter::with_params(output, 4096, ¶ms);
self.process_chunks(decoder, &mut encoder)?;
- // CompressorWriter writes the brotli stream trailer on drop.
- // process_chunks already called flush(), so drop finalizes cleanly.
- drop(encoder);
+ // CompressorWriter emits the brotli stream trailer via flush(),
+ // which process_chunks already called. into_inner() avoids a
+ // redundant flush on drop and makes finalization explicit.
+ // Note: unlike flate2's finish(), CompressorWriter has no
+ // fallible finalization method — flush() is the only option.
+ let _ = encoder.into_inner();
Ok(())
}
(Compression::Brotli, Compression::None) => {
@@ -191,9 +194,11 @@ impl StreamingPipeline {
/// Read chunks from `reader`, pass each through the processor, and write output to `writer`.
///
/// This is the single unified chunk loop used by all compression paths.
- /// The caller is responsible for wrapping `reader`/`writer` in the appropriate
- /// decoder/encoder and for finalizing the encoder (e.g., calling `finish()`)
- /// after this method returns.
+ /// The method calls `writer.flush()` before returning. For the `None → None`
+ /// path this is the only finalization needed. For compressed paths, the caller
+ /// must still call the encoder's type-specific finalization (e.g., `finish()`
+ /// for flate2, `into_inner()` for brotli) — `flush()` alone does not write
+ /// compression trailers for all codecs.
///
/// # Errors
///
@@ -292,13 +297,22 @@ impl HtmlRewriterAdapter {
impl StreamProcessor for HtmlRewriterAdapter {
fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result, io::Error> {
- if let Some(rewriter) = &mut self.rewriter {
- if !chunk.is_empty() {
- rewriter.write(chunk).map_err(|e| {
- log::error!("Failed to process HTML chunk: {e}");
- io::Error::other(format!("HTML processing failed: {e}"))
- })?;
+ match &mut self.rewriter {
+ Some(rewriter) => {
+ if !chunk.is_empty() {
+ rewriter.write(chunk).map_err(|e| {
+ log::error!("Failed to process HTML chunk: {e}");
+ io::Error::other(format!("HTML processing failed: {e}"))
+ })?;
+ }
+ }
+ None if !chunk.is_empty() => {
+ log::warn!(
+ "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost",
+ chunk.len()
+ );
}
+ None => {}
}
if is_last {
@@ -482,7 +496,7 @@ mod tests {
let settings = Settings::default();
let mut adapter = HtmlRewriterAdapter::new(settings);
- adapter
+ let result1 = adapter
.process_chunk(b"test", false)
.expect("should process html");
@@ -490,13 +504,17 @@ mod tests {
adapter.reset();
// Finalize still works; the rewriter is still alive
- let final_output = adapter
+ let result2 = adapter
.process_chunk(b"", true)
.expect("should finalize after reset");
- // Output may or may not be empty depending on lol_html buffering,
- // but it should not error
- let _ = final_output;
+ let mut all_output = result1;
+ all_output.extend_from_slice(&result2);
+ let output = String::from_utf8(all_output).expect("output should be valid UTF-8");
+ assert!(
+ output.contains("test"),
+ "should produce correct output despite no-op reset"
+ );
}
#[test]
@@ -696,27 +714,27 @@ mod tests {
let settings = Settings::default();
let mut adapter = HtmlRewriterAdapter::new(settings);
- // Send three chunks
- let chunk1 = b"";
+ // Send three chunks — lol_html may buffer internally, so individual
+ // chunk outputs may vary by version. The contract is that concatenated
+ // output is correct, and that output is not deferred entirely to is_last.
let result1 = adapter
- .process_chunk(chunk1, false)
+ .process_chunk(b"", false)
.expect("should process chunk1");
- assert!(
- !result1.is_empty(),
- "should emit output for first chunk, got empty"
- );
-
- let chunk2 = b"hello
";
let result2 = adapter
- .process_chunk(chunk2, false)
+ .process_chunk(b"hello
", false)
.expect("should process chunk2");
-
- let chunk3 = b"";
let result3 = adapter
- .process_chunk(chunk3, true)
+ .process_chunk(b"", true)
.expect("should process final chunk");
- // Concatenate all outputs and verify correctness
+ // At least one intermediate chunk should produce output (verifies
+ // we're not deferring everything to is_last like the old adapter).
+ assert!(
+ !result1.is_empty() || !result2.is_empty(),
+ "should emit some output before is_last"
+ );
+
+ // Concatenated output must be correct
let mut all_output = result1;
all_output.extend_from_slice(&result2);
all_output.extend_from_slice(&result3);
From 0a4ece7c82480df7e07ef6ace4ea5773dcd0ac02 Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Thu, 26 Mar 2026 11:32:43 -0700
Subject: [PATCH 22/29] Add active post-processor test and precise flush docs
per codec
- Add test that feeds multiple chunks through HtmlWithPostProcessing
with an active post-processor (should_process returns true, mutates
HTML). Verifies the post-processor receives the complete accumulated
document and its mutations appear in the output.
- Make flush semantics per-codec explicit in process_chunks doc:
flate2 needs finish() after flush, brotli is finalized by flush
---
.../trusted-server-core/src/html_processor.rs | 62 +++++++++++++++++++
.../src/streaming_processor.rs | 8 ++-
2 files changed, 67 insertions(+), 3 deletions(-)
diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs
index d9840cfb..9e6efafb 100644
--- a/crates/trusted-server-core/src/html_processor.rs
+++ b/crates/trusted-server-core/src/html_processor.rs
@@ -1105,4 +1105,66 @@ mod tests {
"streaming and buffered paths should produce identical output"
);
}
+
+ #[test]
+ fn active_post_processor_receives_full_document_and_mutates_output() {
+ use crate::streaming_processor::{HtmlRewriterAdapter, StreamProcessor};
+ use lol_html::Settings;
+
+ struct AppendCommentProcessor;
+ impl IntegrationHtmlPostProcessor for AppendCommentProcessor {
+ fn integration_id(&self) -> &'static str {
+ "test-append"
+ }
+ fn should_process(&self, html: &str, _ctx: &IntegrationHtmlContext<'_>) -> bool {
+ html.contains("</html>")
+ }
+ fn post_process(&self, html: &mut String, _ctx: &IntegrationHtmlContext<'_>) -> bool {
+ html.push_str("<!-- appended-by-test -->");
+ true
+ }
+ }
+
+ let mut processor = HtmlWithPostProcessing {
+ inner: HtmlRewriterAdapter::new(Settings::default()),
+ post_processors: vec![Arc::new(AppendCommentProcessor)],
+ accumulated_output: Vec::new(),
+ origin_host: String::new(),
+ request_host: String::new(),
+ request_scheme: String::new(),
+ document_state: IntegrationDocumentState::default(),
+ };
+
+ // Feed multiple chunks
+ let r1 = processor
+ .process_chunk(b"<html><body>", false)
+ .expect("should process chunk1");
+ let r2 = processor
+ .process_chunk(b"<p>content</p>", false)
+ .expect("should process chunk2");
+ let r3 = processor
+ .process_chunk(b"</body></html>", true)
+ .expect("should process final chunk");
+
+ // Intermediate chunks return empty (buffered for post-processor)
+ assert!(
+ r1.is_empty() && r2.is_empty(),
+ "should buffer intermediate chunks"
+ );
+
+ // Final chunk contains the full document with post-processor mutation
+ let output = String::from_utf8(r3).expect("should be valid UTF-8");
+ assert!(
+ output.contains("content"),
+ "should contain original content"
+ );
+ assert!(
+ output.contains("</html>"),
+ "should contain complete document"
+ );
+ assert!(
+ output.contains("<!-- appended-by-test -->"),
+ "should contain post-processor mutation"
+ );
+ }
}
diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs
index 3915494c..ac226d95 100644
--- a/crates/trusted-server-core/src/streaming_processor.rs
+++ b/crates/trusted-server-core/src/streaming_processor.rs
@@ -196,9 +196,11 @@ impl StreamingPipeline {
/// This is the single unified chunk loop used by all compression paths.
/// The method calls `writer.flush()` before returning. For the `None → None`
/// path this is the only finalization needed. For compressed paths, the caller
- /// must still call the encoder's type-specific finalization (e.g., `finish()`
- /// for flate2, `into_inner()` for brotli) — `flush()` alone does not write
- /// compression trailers for all codecs.
+ /// must still call the encoder's type-specific finalization after this returns:
+ /// - **flate2** (`GzEncoder`, `ZlibEncoder`): call `finish()` — `flush()` does
+ /// not write the gzip/deflate trailer.
+ /// - **brotli** (`CompressorWriter`): `flush()` does finalize the stream, so
+ /// the caller only needs `into_inner()` to reclaim the writer.
///
/// # Errors
///
From 68d11e875754623892bf66b1730f73716d9cea30 Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Thu, 26 Mar 2026 11:46:21 -0700
Subject: [PATCH 23/29] Fix text node fragmentation regression for script
rewriters
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
lol_html fragments text nodes across chunk boundaries when fed
incrementally. This breaks script rewriters (NextJS __NEXT_DATA__,
GTM) that expect complete text content — a split domain like
"google" + "tagmanager.com" would silently miss the rewrite.
Add dual-mode HtmlRewriterAdapter:
- new(): streaming mode, emits output per chunk (no script rewriters)
- new_buffered(): accumulates input, feeds lol_html in one write()
call on is_last (script rewriters registered)
create_html_processor selects the mode based on whether
script_rewriters is non-empty. This preserves the old behavior
(single-pass processing) when rewriters need it, while enabling
streaming when they don't.
Add regression test proving lol_html does fragment text across
chunk boundaries, confirming the issue is real.
---
.../trusted-server-core/src/html_processor.rs | 15 +-
.../src/streaming_processor.rs | 135 ++++++++++++++++--
2 files changed, 137 insertions(+), 13 deletions(-)
diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs
index 9e6efafb..1839eb59 100644
--- a/crates/trusted-server-core/src/html_processor.rs
+++ b/crates/trusted-server-core/src/html_processor.rs
@@ -455,6 +455,7 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso
}),
];
+ let has_script_rewriters = !script_rewriters.is_empty();
for script_rewriter in script_rewriters {
let selector = script_rewriter.selector();
let rewriter = script_rewriter.clone();
@@ -492,8 +493,20 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso
..RewriterSettings::default()
};
+ // Use buffered mode when script rewriters are registered. lol_html fragments
+ // text nodes across chunk boundaries during streaming, which breaks rewriters
+ // that expect complete text content (e.g., __NEXT_DATA__, GTM inline scripts).
+ // Buffered mode feeds the entire document to lol_html in one write() call,
+ // preserving text node integrity. When no script rewriters are active,
+ // streaming mode emits output incrementally per chunk.
+ let inner = if has_script_rewriters {
+ HtmlRewriterAdapter::new_buffered(rewriter_settings)
+ } else {
+ HtmlRewriterAdapter::new(rewriter_settings)
+ };
+
HtmlWithPostProcessing {
- inner: HtmlRewriterAdapter::new(rewriter_settings),
+ inner,
post_processors,
accumulated_output: Vec::new(),
origin_host: config.origin_host,
diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs
index ac226d95..2ca71bc0 100644
--- a/crates/trusted-server-core/src/streaming_processor.rs
+++ b/crates/trusted-server-core/src/streaming_processor.rs
@@ -275,16 +275,33 @@ impl lol_html::OutputSink for RcVecSink {
/// Adapter to use `lol_html` [`HtmlRewriter`](lol_html::HtmlRewriter) as a [`StreamProcessor`].
///
-/// Output is emitted incrementally on every [`StreamProcessor::process_chunk`] call.
+/// Operates in one of two modes:
+///
+/// - **Streaming** (`buffered = false`): output is emitted incrementally on every
+/// [`StreamProcessor::process_chunk`] call. Use when no script rewriters are
+/// registered.
+/// - **Buffered** (`buffered = true`): input is accumulated and processed in a
+/// single `write()` call on `is_last`. Use when script rewriters are registered,
+/// because `lol_html` fragments text nodes across chunk boundaries and rewriters
+/// that expect complete text content (e.g., `__NEXT_DATA__`, GTM) would silently
+/// miss rewrites on split fragments.
+///
/// The adapter is single-use: one adapter per request. Calling [`StreamProcessor::reset`]
/// is a no-op because the rewriter consumes its settings on construction.
pub struct HtmlRewriterAdapter {
rewriter: Option<lol_html::HtmlRewriter<'static, RcVecSink>>,
output: Rc<RefCell<Vec<u8>>>,
+ /// When true, input is accumulated and fed to `lol_html` in one pass on `is_last`.
+ buffered: bool,
+ /// Accumulated input for the buffered path.
+ accumulated_input: Vec<u8>,
}
impl HtmlRewriterAdapter {
/// Create a new HTML rewriter adapter that streams output per chunk.
+ ///
+ /// Use [`Self::new_buffered`] when script rewriters are registered to
+ /// avoid text node fragmentation.
#[must_use]
pub fn new(settings: lol_html::Settings<'static, 'static>) -> Self {
let output = Rc::new(RefCell::new(Vec::new()));
@@ -293,28 +310,69 @@ impl HtmlRewriterAdapter {
Self {
rewriter: Some(rewriter),
output,
+ buffered: false,
+ accumulated_input: Vec::new(),
+ }
+ }
+
+ /// Create a new HTML rewriter adapter that buffers all input before processing.
+ ///
+ /// This avoids `lol_html` text node fragmentation that breaks script rewriters
+ /// expecting complete text content. The entire document is fed to the rewriter
+ /// in a single `write()` call when `is_last` is true.
+ #[must_use]
+ pub fn new_buffered(settings: lol_html::Settings<'static, 'static>) -> Self {
+ let output = Rc::new(RefCell::new(Vec::new()));
+ let sink = RcVecSink(Rc::clone(&output));
+ let rewriter = lol_html::HtmlRewriter::new(settings, sink);
+ Self {
+ rewriter: Some(rewriter),
+ output,
+ buffered: true,
+ accumulated_input: Vec::new(),
}
}
}
impl StreamProcessor for HtmlRewriterAdapter {
fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result, io::Error> {
- match &mut self.rewriter {
- Some(rewriter) => {
- if !chunk.is_empty() {
- rewriter.write(chunk).map_err(|e| {
- log::error!("Failed to process HTML chunk: {e}");
+ if self.buffered {
+ // Buffered mode: accumulate input, process all at once on is_last.
+ if !chunk.is_empty() {
+ self.accumulated_input.extend_from_slice(chunk);
+ }
+ if !is_last {
+ return Ok(Vec::new());
+ }
+ // Feed entire document to lol_html in one pass
+ if let Some(rewriter) = &mut self.rewriter {
+ if !self.accumulated_input.is_empty() {
+ let input = std::mem::take(&mut self.accumulated_input);
+ rewriter.write(&input).map_err(|e| {
+ log::error!("Failed to process HTML: {e}");
io::Error::other(format!("HTML processing failed: {e}"))
})?;
}
}
- None if !chunk.is_empty() => {
- log::warn!(
- "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost",
- chunk.len()
- );
+ } else {
+ // Streaming mode: feed chunks to lol_html incrementally.
+ match &mut self.rewriter {
+ Some(rewriter) => {
+ if !chunk.is_empty() {
+ rewriter.write(chunk).map_err(|e| {
+ log::error!("Failed to process HTML chunk: {e}");
+ io::Error::other(format!("HTML processing failed: {e}"))
+ })?;
+ }
+ }
+ None if !chunk.is_empty() => {
+ log::warn!(
+ "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost",
+ chunk.len()
+ );
+ }
+ None => {}
}
- None => {}
}
if is_last {
@@ -352,6 +410,59 @@ mod tests {
use super::*;
use crate::streaming_replacer::{Replacement, StreamingReplacer};
+ /// Verify that `lol_html` fragments text nodes when input chunks split
+ /// mid-text-node. This is critical: if `lol_html` does fragment, then
+ /// script rewriters (`NextJS` `__NEXT_DATA__`, `GTM`) that expect full
+ /// text content will silently miss rewrites when the streaming adapter
+ /// feeds chunks incrementally.
+ #[test]
+ fn lol_html_fragments_text_across_chunk_boundaries() {
+ use std::cell::RefCell;
+ use std::rc::Rc;
+
+ let fragments: Rc<RefCell<Vec<(String, bool)>>> = Rc::new(RefCell::new(Vec::new()));
+ let fragments_clone = Rc::clone(&fragments);
+
+ let mut rewriter = lol_html::HtmlRewriter::new(
+ lol_html::Settings {
+ element_content_handlers: vec![lol_html::text!("script", move |text| {
+ fragments_clone
+ .borrow_mut()
+ .push((text.as_str().to_string(), text.last_in_text_node()));
+ Ok(())
+ })],
+ ..lol_html::Settings::default()
+ },
+ |_chunk: &[u8]| {},
+ );
+
+ // Split "googletagmanager.com/gtm.js" across two chunks
+ rewriter
+ .write(b"<html><head><script>var u = \"https://www.google")
+ .expect("should write chunk1");
+ rewriter
+ .write(b"tagmanager.com/gtm.js\";</script></head></html>")
+ .expect("should write chunk2");
+ rewriter.end().expect("should end");
+
+ let frags = fragments.borrow();
+ // lol_html should emit at least 2 text fragments since input was split
+ assert!(
+ frags.len() >= 2,
+ "should fragment text across chunk boundaries, got {} fragments: {:?}",
+ frags.len(),
+ *frags
+ );
+ // No single fragment should contain the full domain
+ assert!(
+ !frags
+ .iter()
+ .any(|(text, _)| text.contains("googletagmanager.com")),
+ "no individual fragment should contain the full domain when split across chunks: {:?}",
+ *frags
+ );
+ }
+
#[test]
fn test_uncompressed_pipeline() {
let replacer = StreamingReplacer::new(vec![Replacement {
From 6faeea0190099e7c347b25dfd64727d9639e18cb Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Thu, 26 Mar 2026 13:59:51 -0700
Subject: [PATCH 24/29] Gate streaming adapter on script rewriter presence
lol_html fragments text nodes across input chunk boundaries. Script
rewriters (NextJS __NEXT_DATA__, GTM) expect complete text content
and would silently miss rewrites on split fragments.
Add dual-mode HtmlRewriterAdapter:
- new(): streaming, emits output per chunk (no script rewriters)
- new_buffered(): accumulates input, single write() on is_last
create_html_processor selects mode based on script_rewriters. This
preserves correctness while enabling streaming for configs without
script rewriters. Phase 3 will make rewriters fragment-safe.
Add regression test proving lol_html does fragment text nodes.
---
.../trusted-server-core/src/html_processor.rs | 9 ++++-----
.../src/streaming_processor.rs | 19 +++++++++----------
2 files changed, 13 insertions(+), 15 deletions(-)
diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs
index 1839eb59..079681db 100644
--- a/crates/trusted-server-core/src/html_processor.rs
+++ b/crates/trusted-server-core/src/html_processor.rs
@@ -494,11 +494,10 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso
};
// Use buffered mode when script rewriters are registered. lol_html fragments
- // text nodes across chunk boundaries during streaming, which breaks rewriters
- // that expect complete text content (e.g., __NEXT_DATA__, GTM inline scripts).
- // Buffered mode feeds the entire document to lol_html in one write() call,
- // preserving text node integrity. When no script rewriters are active,
- // streaming mode emits output incrementally per chunk.
+ // text nodes across input chunk boundaries, breaking rewriters that expect
+ // complete text (e.g., __NEXT_DATA__, GTM). Buffered mode feeds the entire
+ // document in one write() call, preserving text node integrity.
+ // Phase 3 will make rewriters fragment-safe, enabling streaming for all configs.
let inner = if has_script_rewriters {
HtmlRewriterAdapter::new_buffered(rewriter_settings)
} else {
diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs
index 2ca71bc0..a65958dc 100644
--- a/crates/trusted-server-core/src/streaming_processor.rs
+++ b/crates/trusted-server-core/src/streaming_processor.rs
@@ -277,14 +277,14 @@ impl lol_html::OutputSink for RcVecSink {
///
/// Operates in one of two modes:
///
-/// - **Streaming** (`buffered = false`): output is emitted incrementally on every
-/// [`StreamProcessor::process_chunk`] call. Use when no script rewriters are
-/// registered.
-/// - **Buffered** (`buffered = true`): input is accumulated and processed in a
-/// single `write()` call on `is_last`. Use when script rewriters are registered,
-/// because `lol_html` fragments text nodes across chunk boundaries and rewriters
-/// that expect complete text content (e.g., `__NEXT_DATA__`, GTM) would silently
-/// miss rewrites on split fragments.
+/// - **Streaming** ([`new`](Self::new)): output is emitted incrementally on every
+/// [`process_chunk`](StreamProcessor::process_chunk) call. Use when no script
+/// rewriters are registered.
+/// - **Buffered** ([`new_buffered`](Self::new_buffered)): input is accumulated and
+/// processed in a single `write()` call on `is_last`. Use when script rewriters
+/// are registered, because `lol_html` fragments text nodes across chunk boundaries
+/// and rewriters that expect complete text content would silently miss rewrites on
+/// split fragments. (See Phase 3 plan for making rewriters fragment-safe.)
///
/// The adapter is single-use: one adapter per request. Calling [`StreamProcessor::reset`]
/// is a no-op because the rewriter consumes its settings on construction.
@@ -344,7 +344,6 @@ impl StreamProcessor for HtmlRewriterAdapter {
if !is_last {
return Ok(Vec::new());
}
- // Feed entire document to lol_html in one pass
if let Some(rewriter) = &mut self.rewriter {
if !self.accumulated_input.is_empty() {
let input = std::mem::take(&mut self.accumulated_input);
@@ -355,7 +354,7 @@ impl StreamProcessor for HtmlRewriterAdapter {
}
}
} else {
- // Streaming mode: feed chunks to lol_html incrementally.
+ // Streaming mode: feed chunks to `lol_html` incrementally.
match &mut self.rewriter {
Some(rewriter) => {
if !chunk.is_empty() {
From 73c992e8b8b3fe13995858c140fabc570e624e32 Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Thu, 26 Mar 2026 14:01:49 -0700
Subject: [PATCH 25/29] Document text node fragmentation workaround and Phase 3
plan
Add section to spec explaining the lol_html text fragmentation issue,
the dual-mode HtmlRewriterAdapter workaround (Phase 1), and the
planned fix to make script rewriters fragment-safe (Phase 3, #584).
---
.../2026-03-25-streaming-response-design.md | 16 ++++++++++++++++
1 file changed, 16 insertions(+)
diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
index 72716b73..c42afd5c 100644
--- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md
+++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
@@ -239,6 +239,22 @@ during streaming — they do not require buffering and are unaffected by this
change. The streaming gate checks only `html_post_processors().is_empty()`, not
script rewriters. Currently only Next.js registers a post-processor.
+## Text Node Fragmentation (Phase 3)
+
+`lol_html` fragments text nodes across input chunk boundaries when processing
+HTML incrementally. Script rewriters (`NextJsNextDataRewriter`,
+`GoogleTagManagerIntegration`) expect complete text content — if a domain string
+is split across chunks, the rewrite silently fails.
+
+**Phase 1 workaround**: `HtmlRewriterAdapter` has two modes. `new()` streams
+per chunk (no script rewriters). `new_buffered()` accumulates input and
+processes in one `write()` call (script rewriters registered).
+`create_html_processor` selects the mode automatically.
+
+**Phase 3** will make each script rewriter fragment-safe by accumulating text
+fragments internally via `is_last_in_text_node`. This removes the buffered
+fallback and enables streaming for all configurations. See #584.
+
## Rollback Strategy
The `#[fastly::main]` to raw `main()` migration is a structural change. If
From 75f455acc37f7aebd23c0ba67639f1bdba443faa Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Thu, 26 Mar 2026 14:06:20 -0700
Subject: [PATCH 26/29] Add buffered mode guard, anti-fragmentation test, and
fix stale spec
- Add post-finalization warning to buffered path (was only in streaming)
- Add buffered_adapter_prevents_text_fragmentation test proving
new_buffered() delivers complete text to lol_html handlers
- Fix spec: html_processor.rs is changed (selects adapter mode), and
script_rewriters do require buffered mode (not "unaffected")
---
.../src/streaming_processor.rs | 60 ++++++++++++++++++-
.../2026-03-25-streaming-response-design.md | 16 +++--
2 files changed, 69 insertions(+), 7 deletions(-)
diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs
index a65958dc..5a4ea290 100644
--- a/crates/trusted-server-core/src/streaming_processor.rs
+++ b/crates/trusted-server-core/src/streaming_processor.rs
@@ -339,7 +339,14 @@ impl StreamProcessor for HtmlRewriterAdapter {
if self.buffered {
// Buffered mode: accumulate input, process all at once on is_last.
if !chunk.is_empty() {
- self.accumulated_input.extend_from_slice(chunk);
+ if self.rewriter.is_none() {
+ log::warn!(
+ "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost",
+ chunk.len()
+ );
+ } else {
+ self.accumulated_input.extend_from_slice(chunk);
+ }
}
if !is_last {
return Ok(Vec::new());
@@ -462,6 +469,57 @@ mod tests {
);
}
+ /// Companion to [`lol_html_fragments_text_across_chunk_boundaries`]:
+ /// proves that `new_buffered()` prevents fragmentation by feeding the
+ /// entire document to `lol_html` in one `write()` call.
+ #[test]
+ fn buffered_adapter_prevents_text_fragmentation() {
+ use std::cell::RefCell;
+ use std::rc::Rc;
+
+ let fragments: Rc<RefCell<Vec<(String, bool)>>> = Rc::new(RefCell::new(Vec::new()));
+ let fragments_clone = Rc::clone(&fragments);
+
+ let settings = lol_html::Settings {
+ element_content_handlers: vec![lol_html::text!("script", move |text| {
+ fragments_clone
+ .borrow_mut()
+ .push((text.as_str().to_string(), text.last_in_text_node()));
+ Ok(())
+ })],
+ ..lol_html::Settings::default()
+ };
+
+ let mut adapter = HtmlRewriterAdapter::new_buffered(settings);
+
+ // Feed the same split chunks as the fragmentation test
+ let r1 = adapter
+ .process_chunk(b"<html><head><script>var u = \"https://www.google", false)
+ .expect("should process chunk1");
+ assert!(r1.is_empty(), "buffered adapter should not emit before is_last");
+ let r2 = adapter
+ .process_chunk(b"tagmanager.com/gtm.js\";</script></head></html>", true)
+ .expect("should process chunk2");
+ assert!(
+ !r2.is_empty(),
+ "buffered adapter should emit output on is_last"
+ );
+
+ let frags = fragments.borrow();
+ // With buffered mode, the text handler should see the complete string
+ assert!(
+ frags
+ .iter()
+ .any(|(text, _)| text.contains("googletagmanager.com")),
+ "buffered adapter should deliver complete text to handler, got: {:?}",
+ *frags
+ );
+ }
+
#[test]
fn test_uncompressed_pipeline() {
let replacer = StreamingReplacer::new(vec![Replacement {
diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
index c42afd5c..034624b5 100644
--- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md
+++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
@@ -224,9 +224,10 @@ headers are sent, we are committed.
| `crates/trusted-server-core/src/publisher.rs` | Refactor `process_response_streaming` to accept `W: Write` instead of hardcoding `Vec`; split `handle_publisher_request` into streaming vs buffered paths; reorder synthetic ID/cookie logic before streaming | Medium |
| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to undecorated `main()` with `Request::from_client()`; explicit error handling via `to_error_response().send_to_client()`; call `finalize_response()` before streaming | Medium |
-**Not changed**: `html_processor.rs` (builds lol_html `Settings` passed to
-`HtmlRewriterAdapter`, works as-is), integration registration, JS build
-pipeline, tsjs module serving, auction handler, cookie/synthetic ID logic.
+**Minimal changes**: `html_processor.rs` now selects `HtmlRewriterAdapter` mode
+based on script rewriter presence (see [Text Node Fragmentation](#text-node-fragmentation-phase-3)),
+but is otherwise unchanged. Integration registration, JS build pipeline, tsjs
+module serving, auction handler, cookie/synthetic ID logic are not changed.
Note: `HtmlWithPostProcessing` wraps `HtmlRewriterAdapter` and applies
post-processors on `is_last`. In the streaming path the post-processor list is
@@ -235,9 +236,12 @@ remains in place — no need to bypass it.
Clarification: `script_rewriters` (used by Next.js and GTM) are distinct from
`html_post_processors`. Script rewriters run inside `lol_html` element handlers
-during streaming — they do not require buffering and are unaffected by this
-change. The streaming gate checks only `html_post_processors().is_empty()`, not
-script rewriters. Currently only Next.js registers a post-processor.
+and currently require buffered mode because `lol_html` fragments text nodes
+across chunk boundaries (see [Phase 3](#text-node-fragmentation-phase-3)).
+`html_post_processors` require the full document for post-processing.
+The streaming gate checks `html_post_processors().is_empty()` for the
+post-processor path; `create_html_processor` separately gates the adapter mode
+on `script_rewriters`. Currently only Next.js registers a post-processor.
## Text Node Fragmentation (Phase 3)
From 94f238a337f22d0fb9b98170204c41815f79a154 Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Tue, 31 Mar 2026 13:38:29 -0700
Subject: [PATCH 27/29] Address PR review feedback on streaming response spec
- Replace html_post_processors().is_empty() with has_html_post_processors()
to avoid allocating Vec> in the streaming gate check
- Add step to implement has_html_post_processors() on IntegrationRegistry
- Add EC implementation coordination note on handle_publisher_request
restructuring step
- Renumber Phase 2 Task 8 steps accordingly
---
.../plans/2026-03-25-streaming-response.md | 33 +++++++++++++++----
.../2026-03-25-streaming-response-design.md | 24 +++++++-------
2 files changed, 39 insertions(+), 18 deletions(-)
diff --git a/docs/superpowers/plans/2026-03-25-streaming-response.md b/docs/superpowers/plans/2026-03-25-streaming-response.md
index 268517b8..1c547565 100644
--- a/docs/superpowers/plans/2026-03-25-streaming-response.md
+++ b/docs/superpowers/plans/2026-03-25-streaming-response.md
@@ -821,7 +821,20 @@ In `main.rs`, make `finalize_response` callable from the publisher path.
Either make it `pub` and move to `trusted-server-core`, or pass a
pre-finalized response to the streaming path.
-- [ ] **Step 2: Add streaming gate check**
+- [ ] **Step 2: Add `has_html_post_processors()` to `IntegrationRegistry`**
+
+Add a method that returns `bool` to avoid the allocation that
+`html_post_processors()` incurs (cloning `Vec>`):
+
+```rust
+pub fn has_html_post_processors(&self) -> bool {
+ !self.inner.html_post_processors.is_empty()
+}
+```
+
+**File:** `crates/trusted-server-core/src/integrations/registry.rs`
+
+- [ ] **Step 3: Add streaming gate check**
Add a helper in `publisher.rs`:
@@ -834,19 +847,25 @@ fn should_stream(
if !(200..300).contains(&status) {
return false;
}
+ // Use has_html_post_processors() to avoid allocating a Vec>
+ // just to check emptiness.
// Only html_post_processors gate streaming — NOT script_rewriters.
// Script rewriters (Next.js, GTM) run inside lol_html element handlers
// during streaming and do not require full-document buffering.
// Currently only Next.js registers a post-processor.
let is_html = content_type.contains("text/html");
- if is_html && !integration_registry.html_post_processors().is_empty() {
+ if is_html && integration_registry.has_html_post_processors() {
return false;
}
true
}
```
-- [ ] **Step 3: Restructure `handle_publisher_request` to support streaming**
+- [ ] **Step 4: Restructure `handle_publisher_request` to support streaming**
+
+> **Note:** This step may need adjustment to align with the EC (Edge Compute)
+> implementation. Coordinate with the EC work before finalizing the
+> restructuring approach.
Split the function into:
1. Pre-processing: request info, cookies, synthetic ID, consent, backend
@@ -879,7 +898,7 @@ if should_stream {
}
```
-- [ ] **Step 4: Handle binary pass-through in streaming path**
+- [ ] **Step 5: Handle binary pass-through in streaming path**
For non-text content when streaming is enabled:
@@ -895,19 +914,19 @@ if !should_process {
}
```
-- [ ] **Step 5: Run all tests**
+- [ ] **Step 6: Run all tests**
Run: `cargo test --workspace`
Expected: All tests pass.
-- [ ] **Step 6: Build for WASM target**
+- [ ] **Step 7: Build for WASM target**
Run: `cargo build --package trusted-server-adapter-fastly --release --target wasm32-wasip1`
Expected: Builds successfully.
-- [ ] **Step 7: Commit**
+- [ ] **Step 8: Commit**
```
git add crates/trusted-server-core/src/publisher.rs \
diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
index 72716b73..f132136f 100644
--- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md
+++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
@@ -27,10 +27,12 @@ JS, auction, discovery).
Before committing to `stream_to_client()`, check:
1. Backend status is success (2xx).
-2. For HTML content: `html_post_processors()` is empty — no registered
- post-processors. Non-HTML content types (text/JSON, RSC Flight, binary) can
- always stream regardless of post-processor registration, since
- post-processors only apply to HTML.
+2. For HTML content: `has_html_post_processors()` returns false — no registered
+ post-processors. This method returns a `bool` directly, avoiding the
+ allocation of cloning the `Vec>` that
+ `html_post_processors()` performs. Non-HTML content types (text/JSON, RSC
+ Flight, binary) can always stream regardless of post-processor registration,
+ since post-processors only apply to HTML.
If either check fails for the given content type, fall back to the current
buffered path. This keeps the optimization transparent: same behavior for all
@@ -100,8 +102,8 @@ Change the publisher proxy path to use Fastly's `StreamingBody` API:
1. Fetch from origin, receive response headers.
2. Validate status — if backend error, return buffered error response via
`send_to_client()`.
-3. Check streaming gate — if `html_post_processors()` is non-empty, fall back
- to buffered path.
+3. Check streaming gate — if `has_html_post_processors()` returns true, fall
+ back to buffered path.
4. Finalize all response headers. This requires reordering two things:
- **Synthetic ID/cookie headers**: today set _after_ body processing in
`handle_publisher_request`. Since they are body-independent (computed from
@@ -186,7 +188,7 @@ No decompression, no processing. Body streams through as read.
### Buffered fallback path (error responses or post-processors present)
```
-Origin returns 4xx/5xx OR html_post_processors() is non-empty
+Origin returns 4xx/5xx OR has_html_post_processors() is true
→ Current buffered path unchanged
→ send_to_client() with proper status and full body
```
@@ -236,8 +238,8 @@ remains in place — no need to bypass it.
Clarification: `script_rewriters` (used by Next.js and GTM) are distinct from
`html_post_processors`. Script rewriters run inside `lol_html` element handlers
during streaming — they do not require buffering and are unaffected by this
-change. The streaming gate checks only `html_post_processors().is_empty()`, not
-script rewriters. Currently only Next.js registers a post-processor.
+change. The streaming gate checks only `has_html_post_processors()`, not script
+rewriters. Currently only Next.js registers a post-processor.
## Rollback Strategy
@@ -260,9 +262,9 @@ improvements.
### Integration tests (publisher.rs)
-- Streaming gate: when `html_post_processors()` is non-empty, response is
+- Streaming gate: when `has_html_post_processors()` is true, response is
buffered.
-- Streaming gate: when `html_post_processors()` is empty, response streams.
+- Streaming gate: when `has_html_post_processors()` is false, response streams.
- Backend error (4xx/5xx) returns buffered error response with correct status.
- Binary content passes through without processing.
From 1f2091dc8e43c69cd88b94a83017828227322d40 Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Tue, 31 Mar 2026 13:39:27 -0700
Subject: [PATCH 28/29] Move EC coordination note to Phase 2 / Step 2 level
Both review comments apply to Phase 2 as a whole, not individual steps.
Move the EC implementation note to the Phase 2 header in the plan and
the Step 2 header in the spec.
---
docs/superpowers/plans/2026-03-25-streaming-response.md | 7 +++----
.../specs/2026-03-25-streaming-response-design.md | 3 +++
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git a/docs/superpowers/plans/2026-03-25-streaming-response.md b/docs/superpowers/plans/2026-03-25-streaming-response.md
index 1c547565..b9545813 100644
--- a/docs/superpowers/plans/2026-03-25-streaming-response.md
+++ b/docs/superpowers/plans/2026-03-25-streaming-response.md
@@ -652,6 +652,9 @@ Expected: Builds successfully.
## Phase 2: Stream Response to Client
+> **Note:** Phase 2 may need adjustment to align with the EC (Edge Compute)
+> implementation. Coordinate with the EC work before finalizing the approach.
+
### Task 6: Migrate entry point from `#[fastly::main]` to raw `main()`
**Files:**
@@ -863,10 +866,6 @@ fn should_stream(
- [ ] **Step 4: Restructure `handle_publisher_request` to support streaming**
-> **Note:** This step may need adjustment to align with the EC (Edge Compute)
-> implementation. Coordinate with the EC work before finalizing the
-> restructuring approach.
-
Split the function into:
1. Pre-processing: request info, cookies, synthetic ID, consent, backend
request — everything before `response.take_body()`
diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
index f132136f..9465f87d 100644
--- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md
+++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
@@ -97,6 +97,9 @@ before or with Step 1B.
### Step 2: Stream response to client
+> **Note:** Step 2 may need adjustment to align with the EC (Edge Compute)
+> implementation. Coordinate with the EC work before finalizing the approach.
+
Change the publisher proxy path to use Fastly's `StreamingBody` API:
1. Fetch from origin, receive response headers.
From d00fc5db80636bdd3739a27acfb8bf3ebc51632f Mon Sep 17 00:00:00 2001
From: Aram Grigoryan <132480+aram356@users.noreply.github.com>
Date: Tue, 31 Mar 2026 13:43:18 -0700
Subject: [PATCH 29/29] Formatting
---
.../plans/2026-03-25-streaming-response.md | 21 +++++++---
.../2026-03-25-streaming-response-design.md | 38 ++++++++++---------
2 files changed, 36 insertions(+), 23 deletions(-)
diff --git a/docs/superpowers/plans/2026-03-25-streaming-response.md b/docs/superpowers/plans/2026-03-25-streaming-response.md
index b9545813..3311b28f 100644
--- a/docs/superpowers/plans/2026-03-25-streaming-response.md
+++ b/docs/superpowers/plans/2026-03-25-streaming-response.md
@@ -25,11 +25,11 @@ rewriting), `flate2` (gzip/deflate), `brotli` (brotli compression).
## File Map
-| File | Role | Phase |
-|------|------|-------|
-| `crates/trusted-server-core/src/streaming_processor.rs` | `HtmlRewriterAdapter` rewrite, compression path fixes, encoder finalization | 1 |
-| `crates/trusted-server-core/src/publisher.rs` | `process_response_streaming` refactor to `W: Write`, streaming gate, header reordering | 2 |
-| `crates/trusted-server-adapter-fastly/src/main.rs` | Entry point migration from `#[fastly::main]` to raw `main()`, response routing | 2 |
+| File | Role | Phase |
+| ------------------------------------------------------- | -------------------------------------------------------------------------------------- | ----- |
+| `crates/trusted-server-core/src/streaming_processor.rs` | `HtmlRewriterAdapter` rewrite, compression path fixes, encoder finalization | 1 |
+| `crates/trusted-server-core/src/publisher.rs` | `process_response_streaming` refactor to `W: Write`, streaming gate, header reordering | 2 |
+| `crates/trusted-server-adapter-fastly/src/main.rs` | Entry point migration from `#[fastly::main]` to raw `main()`, response routing | 2 |
---
@@ -42,6 +42,7 @@ This is the prerequisite for Task 2. The current code calls `flush()` then
moving gzip to this path.
**Files:**
+
- Modify: `crates/trusted-server-core/src/streaming_processor.rs:334-393`
- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module)
@@ -202,6 +203,7 @@ git commit -m "Fix encoder finalization: explicit finish instead of drop"
### Task 2: Convert `process_gzip_to_gzip` to chunk-based processing
**Files:**
+
- Modify: `crates/trusted-server-core/src/streaming_processor.rs:183-225`
- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module)
@@ -303,6 +305,7 @@ git commit -m "Convert process_gzip_to_gzip to chunk-based processing"
### Task 3: Convert `decompress_and_process` to chunk-based processing
**Files:**
+
- Modify: `crates/trusted-server-core/src/streaming_processor.rs:227-262`
- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module)
@@ -441,6 +444,7 @@ git commit -m "Convert decompress_and_process to chunk-based processing"
### Task 4: Rewrite `HtmlRewriterAdapter` for incremental streaming
**Files:**
+
- Modify: `crates/trusted-server-core/src/streaming_processor.rs:396-472`
- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module)
@@ -658,6 +662,7 @@ Expected: Builds successfully.
### Task 6: Migrate entry point from `#[fastly::main]` to raw `main()`
**Files:**
+
- Modify: `crates/trusted-server-adapter-fastly/src/main.rs:32-68`
- [ ] **Step 1: Rewrite `main` function**
@@ -737,6 +742,7 @@ git commit -m "Migrate entry point from #[fastly::main] to raw main()"
### Task 7: Refactor `process_response_streaming` to accept `W: Write`
**Files:**
+
- Modify: `crates/trusted-server-core/src/publisher.rs:97-180`
- [ ] **Step 1: Change signature to accept generic writer**
@@ -792,6 +798,7 @@ git commit -m "Refactor process_response_streaming to accept generic writer"
### Task 8: Add streaming path to publisher proxy
**Files:**
+
- Modify: `crates/trusted-server-core/src/publisher.rs`
- Modify: `crates/trusted-server-adapter-fastly/src/main.rs`
@@ -867,6 +874,7 @@ fn should_stream(
- [ ] **Step 4: Restructure `handle_publisher_request` to support streaming**
Split the function into:
+
1. Pre-processing: request info, cookies, synthetic ID, consent, backend
request — everything before `response.take_body()`
2. Header finalization: synthetic ID/cookie headers, `finalize_response()`
@@ -875,6 +883,7 @@ Split the function into:
(`StreamingBody`)
The streaming path in the fastly adapter:
+
```rust
// After header finalization, before body processing:
if should_stream {
@@ -964,6 +973,7 @@ Expected: Builds.
Run: `fastly compute serve`
Test:
+
- `curl -s http://localhost:7676/ | sha256sum` — compare with baseline
- `curl -sI http://localhost:7676/` — verify headers present (geo, version,
synthetic ID cookie if consent configured)
@@ -995,6 +1005,7 @@ Repeat the same measurements after building the feature branch.
Create a comparison table and save to PR description or a results file.
Check for:
+
- TTLB improvement (primary goal)
- No TTFB regression
- Identical response body hash (correctness)
diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
index 9465f87d..b0a8d9c2 100644
--- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md
+++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md
@@ -223,11 +223,11 @@ headers are sent, we are committed.
## Files Changed
-| File | Change | Risk |
-| ------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ |
-| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); convert all compression paths to chunk-based processing (`process_gzip_to_gzip` and `decompress_and_process`); fix `process_through_compression` to call `finish()` explicitly | High |
-| `crates/trusted-server-core/src/publisher.rs` | Refactor `process_response_streaming` to accept `W: Write` instead of hardcoding `Vec`; split `handle_publisher_request` into streaming vs buffered paths; reorder synthetic ID/cookie logic before streaming | Medium |
-| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to undecorated `main()` with `Request::from_client()`; explicit error handling via `to_error_response().send_to_client()`; call `finalize_response()` before streaming | Medium |
+| File | Change | Risk |
+| ------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ |
+| `crates/trusted-server-core/src/streaming_processor.rs` | Rewrite `HtmlRewriterAdapter` to stream incrementally (becomes single-use); convert all compression paths to chunk-based processing (`process_gzip_to_gzip` and `decompress_and_process`); fix `process_through_compression` to call `finish()` explicitly | High |
+| `crates/trusted-server-core/src/publisher.rs` | Refactor `process_response_streaming` to accept `W: Write` instead of hardcoding `Vec`; split `handle_publisher_request` into streaming vs buffered paths; reorder synthetic ID/cookie logic before streaming | Medium |
+| `crates/trusted-server-adapter-fastly/src/main.rs` | Migrate from `#[fastly::main]` to undecorated `main()` with `Request::from_client()`; explicit error handling via `to_error_response().send_to_client()`; call `finalize_response()` before streaming | Medium |
**Not changed**: `html_processor.rs` (builds lol_html `Settings` passed to
`HtmlRewriterAdapter`, works as-is), integration registration, JS build
@@ -309,14 +309,14 @@ branch, then compare.
Repeat the same steps on the feature branch. Compare:
-| Metric | Source | Expected change |
-|--------|--------|-----------------|
-| TTFB (document) | Network timing | Minimal change (gated by backend response time) |
-| Time to last byte | Network timing (`responseEnd`) | Reduced — body streams incrementally |
-| LCP | Lighthouse | Improved — browser receives `<head>` resources sooner |
-| Speed Index | Lighthouse | Improved — progressive rendering starts earlier |
-| Transfer size | Network timing | Unchanged (same content, same compression) |
-| Response body hash | `evaluate_script` with hash | Identical — correctness check |
+| Metric | Source | Expected change |
+| ------------------ | ------------------------------ | ----------------------------------------------------- |
+| TTFB (document) | Network timing | Minimal change (gated by backend response time) |
+| Time to last byte | Network timing (`responseEnd`) | Reduced — body streams incrementally |
+| LCP                | Lighthouse                     | Improved — browser receives `<head>` resources sooner |
+| Speed Index | Lighthouse | Improved — progressive rendering starts earlier |
+| Transfer size | Network timing | Unchanged (same content, same compression) |
+| Response body hash | `evaluate_script` with hash | Identical — correctness check |
#### Automated comparison script
@@ -325,11 +325,13 @@ correctness verification:
```js
// Run via evaluate_script after page load
-const response = await fetch(location.href);
-const buffer = await response.arrayBuffer();
-const hash = await crypto.subtle.digest('SHA-256', buffer);
-const hex = [...new Uint8Array(hash)].map(b => b.toString(16).padStart(2, '0')).join('');
-hex; // compare this between baseline and feature branch
+const response = await fetch(location.href)
+const buffer = await response.arrayBuffer()
+const hash = await crypto.subtle.digest('SHA-256', buffer)
+const hex = [...new Uint8Array(hash)]
+ .map((b) => b.toString(16).padStart(2, '0'))
+ .join('')
+hex // compare this between baseline and feature branch
```
#### What to watch for