diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs index 079681db..3b9e882f 100644 --- a/crates/trusted-server-core/src/html_processor.rs +++ b/crates/trusted-server-core/src/html_processor.rs @@ -455,7 +455,6 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso }), ]; - let has_script_rewriters = !script_rewriters.is_empty(); for script_rewriter in script_rewriters { let selector = script_rewriter.selector(); let rewriter = script_rewriter.clone(); @@ -493,16 +492,7 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso ..RewriterSettings::default() }; - // Use buffered mode when script rewriters are registered. lol_html fragments - // text nodes across input chunk boundaries, breaking rewriters that expect - // complete text (e.g., __NEXT_DATA__, GTM). Buffered mode feeds the entire - // document in one write() call, preserving text node integrity. - // Phase 3 will make rewriters fragment-safe, enabling streaming for all configs. - let inner = if has_script_rewriters { - HtmlRewriterAdapter::new_buffered(rewriter_settings) - } else { - HtmlRewriterAdapter::new(rewriter_settings) - }; + let inner = HtmlRewriterAdapter::new(rewriter_settings); HtmlWithPostProcessing { inner, diff --git a/crates/trusted-server-core/src/integrations/google_tag_manager.rs b/crates/trusted-server-core/src/integrations/google_tag_manager.rs index 64dc6cd5..79ede2c0 100644 --- a/crates/trusted-server-core/src/integrations/google_tag_manager.rs +++ b/crates/trusted-server-core/src/integrations/google_tag_manager.rs @@ -12,7 +12,7 @@ //! | `GET/POST` | `.../collect` | Proxies GA analytics beacons | //! 
| `GET/POST` | `.../g/collect` | Proxies GA4 analytics beacons | -use std::sync::{Arc, LazyLock}; +use std::sync::{Arc, LazyLock, Mutex}; use async_trait::async_trait; use error_stack::{Report, ResultExt}; @@ -132,11 +132,22 @@ fn validate_container_id(container_id: &str) -> Result<(), validator::Validation pub struct GoogleTagManagerIntegration { config: GoogleTagManagerConfig, + /// Accumulates text fragments when `lol_html` splits a text node across + /// chunk boundaries. Drained on `is_last_in_text_node`. + /// + /// Uses `Mutex` to satisfy the `Sync` bound on `IntegrationScriptRewriter`. + /// The pipeline is single-threaded (`lol_html::HtmlRewriter` is `!Send`), + /// so the lock is uncontended. `lol_html` delivers text chunks sequentially + /// per element — the buffer is always empty when a new element's text begins. + accumulated_text: Mutex, } impl GoogleTagManagerIntegration { fn new(config: GoogleTagManagerConfig) -> Arc { - Arc::new(Self { config }) + Arc::new(Self { + config, + accumulated_text: Mutex::new(String::new()), + }) } fn error(message: impl Into) -> TrustedServerError { @@ -488,14 +499,40 @@ impl IntegrationScriptRewriter for GoogleTagManagerIntegration { "script" // Match all scripts to find inline GTM snippets } - fn rewrite(&self, content: &str, _ctx: &IntegrationScriptContext<'_>) -> ScriptRewriteAction { + fn rewrite(&self, content: &str, ctx: &IntegrationScriptContext<'_>) -> ScriptRewriteAction { + let mut buf = self + .accumulated_text + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + + if !ctx.is_last_in_text_node { + // Intermediate fragment — accumulate and suppress output. + buf.push_str(content); + return ScriptRewriteAction::RemoveNode; + } + + // Last fragment. If we accumulated prior fragments, combine them. 
+ let full_content: Option = if buf.is_empty() { + None + } else { + buf.push_str(content); + Some(std::mem::take(&mut *buf)) + }; + let text = full_content.as_deref().unwrap_or(content); + // Look for the GTM snippet pattern. // Standard snippet contains: "googletagmanager.com/gtm.js" // Note: analytics.google.com is intentionally excluded — gtag.js stores // that domain as a bare string and constructs URLs dynamically, so // rewriting it in scripts produces broken URLs. - if content.contains("googletagmanager.com") || content.contains("google-analytics.com") { - return ScriptRewriteAction::replace(Self::rewrite_gtm_urls(content)); + if text.contains("googletagmanager.com") || text.contains("google-analytics.com") { + return ScriptRewriteAction::replace(Self::rewrite_gtm_urls(text)); + } + + // No GTM content — if we accumulated fragments, emit them unchanged. + // Intermediate fragments were already suppressed via RemoveNode. + if full_content.is_some() { + return ScriptRewriteAction::replace(text.to_string()); } ScriptRewriteAction::keep() @@ -1632,4 +1669,224 @@ container_id = "GTM-DEFAULT" other => panic!("Expected Integration error, got {:?}", other), } } + + #[test] + fn fragmented_gtm_snippet_is_accumulated_and_rewritten() { + let config = GoogleTagManagerConfig { + enabled: true, + container_id: "GTM-FRAG1".to_string(), + upstream_url: "https://www.googletagmanager.com".to_string(), + cache_max_age: default_cache_max_age(), + max_beacon_body_size: default_max_beacon_body_size(), + }; + let integration = GoogleTagManagerIntegration::new(config); + + let document_state = IntegrationDocumentState::default(); + + // Simulate lol_html splitting the GTM snippet mid-domain. 
+ let fragment1 = r#"(function(w,d,s,l,i){j.src='https://www.google"#; + let fragment2 = r#"tagmanager.com/gtm.js?id='+i;f.parentNode.insertBefore(j,f);})(window,document,'script','dataLayer','GTM-FRAG1');"#; + + let ctx_intermediate = IntegrationScriptContext { + selector: "script", + request_host: "publisher.example.com", + request_scheme: "https", + origin_host: "origin.example.com", + is_last_in_text_node: false, + document_state: &document_state, + }; + let ctx_last = IntegrationScriptContext { + is_last_in_text_node: true, + ..ctx_intermediate + }; + + // Intermediate fragment: should be suppressed. + let action1 = + IntegrationScriptRewriter::rewrite(&*integration, fragment1, &ctx_intermediate); + assert_eq!( + action1, + ScriptRewriteAction::RemoveNode, + "should suppress intermediate fragment" + ); + + // Last fragment: should emit full rewritten content. + let action2 = IntegrationScriptRewriter::rewrite(&*integration, fragment2, &ctx_last); + match action2 { + ScriptRewriteAction::Replace(rewritten) => { + assert!( + rewritten.contains("/integrations/google_tag_manager/gtm.js"), + "should rewrite GTM URL. Got: {rewritten}" + ); + assert!( + !rewritten.contains("googletagmanager.com"), + "should not contain original GTM domain. 
Got: {rewritten}" + ); + } + other => panic!("expected Replace for fragmented GTM, got {other:?}"), + } + } + + #[test] + fn non_gtm_fragmented_script_is_passed_through() { + let config = GoogleTagManagerConfig { + enabled: true, + container_id: "GTM-PASS1".to_string(), + upstream_url: "https://www.googletagmanager.com".to_string(), + cache_max_age: default_cache_max_age(), + max_beacon_body_size: default_max_beacon_body_size(), + }; + let integration = GoogleTagManagerIntegration::new(config); + + let document_state = IntegrationDocumentState::default(); + + let fragment1 = "console.log('hel"; + let fragment2 = "lo world');"; + + let ctx_intermediate = IntegrationScriptContext { + selector: "script", + request_host: "publisher.example.com", + request_scheme: "https", + origin_host: "origin.example.com", + is_last_in_text_node: false, + document_state: &document_state, + }; + let ctx_last = IntegrationScriptContext { + is_last_in_text_node: true, + ..ctx_intermediate + }; + + let action1 = + IntegrationScriptRewriter::rewrite(&*integration, fragment1, &ctx_intermediate); + assert_eq!( + action1, + ScriptRewriteAction::RemoveNode, + "should suppress intermediate" + ); + + // Last fragment: should emit full unchanged content since it's not GTM. 
+ let action2 = IntegrationScriptRewriter::rewrite(&*integration, fragment2, &ctx_last); + match action2 { + ScriptRewriteAction::Replace(content) => { + assert_eq!( + content, "console.log('hello world');", + "should emit full accumulated non-GTM content" + ); + } + other => panic!("expected Replace with passthrough, got {other:?}"), + } + } + + /// Verify the accumulation buffer drains correctly between two consecutive + /// `"#; + + let mut output = Vec::new(); + pipeline + .process(Cursor::new(html_input.as_bytes()), &mut output) + .expect("should process with small chunks"); + let processed = String::from_utf8_lossy(&output); + + assert!( + processed.contains("/integrations/google_tag_manager/gtm.js"), + "should rewrite fragmented GTM URL. Got: {processed}" + ); + assert!( + !processed.contains("googletagmanager.com"), + "should not contain original GTM domain. Got: {processed}" + ); + } } diff --git a/crates/trusted-server-core/src/integrations/nextjs/mod.rs b/crates/trusted-server-core/src/integrations/nextjs/mod.rs index 50244438..6524ee58 100644 --- a/crates/trusted-server-core/src/integrations/nextjs/mod.rs +++ b/crates/trusted-server-core/src/integrations/nextjs/mod.rs @@ -599,4 +599,55 @@ mod tests { final_html ); } + + /// Regression test: with a small chunk size, `lol_html` fragments the + /// `__NEXT_DATA__` text node across chunks. The rewriter must accumulate + /// fragments and produce correct output. + #[test] + fn small_chunk_next_data_rewrite_survives_fragmentation() { + // Build a __NEXT_DATA__ payload large enough to cross a 32-byte chunk boundary. 
+ let html = r#""#; + + let mut settings = create_test_settings(); + settings + .integrations + .insert_config( + "nextjs", + &json!({ + "enabled": true, + "rewrite_attributes": ["href", "link", "url"], + }), + ) + .expect("should update nextjs config"); + let registry = IntegrationRegistry::new(&settings).expect("should create registry"); + let config = config_from_settings(&settings, ®istry); + let processor = create_html_processor(config); + + // Use a very small chunk size to force fragmentation. + let pipeline_config = PipelineConfig { + input_compression: Compression::None, + output_compression: Compression::None, + chunk_size: 32, + }; + let mut pipeline = StreamingPipeline::new(pipeline_config, processor); + + let mut output = Vec::new(); + pipeline + .process(Cursor::new(html.as_bytes()), &mut output) + .expect("should process with small chunks"); + + let processed = String::from_utf8_lossy(&output); + assert!( + processed.contains("test.example.com") && processed.contains("/reviews"), + "should rewrite fragmented __NEXT_DATA__ href. Got: {processed}" + ); + assert!( + !processed.contains("origin.example.com/reviews"), + "should not contain original origin href. Got: {processed}" + ); + assert!( + processed.contains("Hello World"), + "should preserve non-URL content. Got: {processed}" + ); + } } diff --git a/crates/trusted-server-core/src/integrations/nextjs/rsc_placeholders.rs b/crates/trusted-server-core/src/integrations/nextjs/rsc_placeholders.rs index 1aa0b391..10101a70 100644 --- a/crates/trusted-server-core/src/integrations/nextjs/rsc_placeholders.rs +++ b/crates/trusted-server-core/src/integrations/nextjs/rsc_placeholders.rs @@ -54,12 +54,13 @@ impl IntegrationScriptRewriter for NextJsRscPlaceholderRewriter { return ScriptRewriteAction::keep(); } - // Only process complete (unfragmented) scripts during streaming. - // Fragmented scripts are handled by the post-processor which re-parses the final HTML. 
- // This avoids corrupting non-RSC scripts that happen to be fragmented during streaming. + // Deliberately does not accumulate fragments (unlike NextJsNextDataRewriter + // and GoogleTagManagerIntegration which use Mutex buffers). RSC + // placeholder processing has a post-processor fallback that re-parses + // the final HTML at end-of-document, so fragmented scripts are safely + // deferred. Accumulation here would also risk corrupting non-RSC scripts + // that happen to be fragmented during streaming. if !ctx.is_last_in_text_node { - // Script is fragmented - skip placeholder processing. - // The post-processor will handle RSC scripts at end-of-document. return ScriptRewriteAction::keep(); } diff --git a/crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs b/crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs index 72617c3e..eaf00a16 100644 --- a/crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs +++ b/crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use error_stack::Report; use regex::{escape, Regex}; @@ -14,6 +14,14 @@ use super::{NextJsIntegrationConfig, NEXTJS_INTEGRATION_ID}; pub(super) struct NextJsNextDataRewriter { config: Arc, rewriter: UrlRewriter, + /// Accumulates text fragments when `lol_html` splits a text node across + /// chunk boundaries. Drained on `is_last_in_text_node`. + /// + /// Uses `Mutex` to satisfy the `Sync` bound on `IntegrationScriptRewriter`. + /// The pipeline is single-threaded (`lol_html::HtmlRewriter` is `!Send`), + /// so the lock is uncontended. `lol_html` delivers text chunks sequentially + /// per element — the buffer is always empty when a new element's text begins. 
+ accumulated_text: Mutex, } impl NextJsNextDataRewriter { @@ -23,6 +31,7 @@ impl NextJsNextDataRewriter { Ok(Self { rewriter: UrlRewriter::new(&config.rewrite_attributes)?, config, + accumulated_text: Mutex::new(String::new()), }) } @@ -65,7 +74,33 @@ impl IntegrationScriptRewriter for NextJsNextDataRewriter { return ScriptRewriteAction::keep(); } - self.rewrite_structured(content, ctx) + let mut buf = self + .accumulated_text + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + + if !ctx.is_last_in_text_node { + // Intermediate fragment — accumulate and suppress output. + buf.push_str(content); + return ScriptRewriteAction::RemoveNode; + } + + // Last fragment. If nothing was accumulated, process directly. + if buf.is_empty() { + return self.rewrite_structured(content, ctx); + } + + // Complete the accumulated text and process the full content. + // If rewrite_structured returns Keep, we must still emit the full + // accumulated text via Replace — intermediate fragments were already + // removed from lol_html's output via RemoveNode. 
+ buf.push_str(content); + let full_content = std::mem::take(&mut *buf); + let action = self.rewrite_structured(&full_content, ctx); + if matches!(action, ScriptRewriteAction::Keep) { + return ScriptRewriteAction::replace(full_content); + } + action } } @@ -464,4 +499,119 @@ mod tests { assert!(rewritten.contains("https://proxy.example.com/news")); assert!(rewritten.contains("//proxy.example.com/assets/logo.png")); } + + #[test] + fn fragmented_next_data_is_accumulated_and_rewritten() { + let rewriter = NextJsNextDataRewriter::new(test_config()).expect("should build rewriter"); + let document_state = IntegrationDocumentState::default(); + + let fragment1 = r#"{"props":{"pageProps":{"href":"https://origin."#; + let fragment2 = r#"example.com/reviews"}}}"#; + + let ctx_intermediate = IntegrationScriptContext { + selector: "script#__NEXT_DATA__", + request_host: "ts.example.com", + request_scheme: "https", + origin_host: "origin.example.com", + is_last_in_text_node: false, + document_state: &document_state, + }; + let ctx_last = IntegrationScriptContext { + is_last_in_text_node: true, + ..ctx_intermediate + }; + + let action1 = rewriter.rewrite(fragment1, &ctx_intermediate); + assert_eq!( + action1, + ScriptRewriteAction::RemoveNode, + "should suppress intermediate fragment" + ); + + let action2 = rewriter.rewrite(fragment2, &ctx_last); + match action2 { + ScriptRewriteAction::Replace(rewritten) => { + assert!( + rewritten.contains("ts.example.com"), + "should rewrite origin to proxy host. Got: {rewritten}" + ); + assert!( + rewritten.contains("/reviews"), + "should preserve path. Got: {rewritten}" + ); + assert!( + !rewritten.contains("origin.example.com"), + "should not contain original host. 
Got: {rewritten}" + ); + } + other => panic!("expected Replace, got {other:?}"), + } + } + + #[test] + fn unfragmented_next_data_works_without_accumulation() { + let rewriter = NextJsNextDataRewriter::new(test_config()).expect("should build rewriter"); + let document_state = IntegrationDocumentState::default(); + let payload = r#"{"props":{"pageProps":{"href":"https://origin.example.com/page"}}}"#; + + let ctx_single = IntegrationScriptContext { + selector: "script#__NEXT_DATA__", + request_host: "ts.example.com", + request_scheme: "https", + origin_host: "origin.example.com", + is_last_in_text_node: true, + document_state: &document_state, + }; + + let action = rewriter.rewrite(payload, &ctx_single); + match action { + ScriptRewriteAction::Replace(rewritten) => { + assert!( + rewritten.contains("ts.example.com"), + "should rewrite. Got: {rewritten}" + ); + } + other => panic!("expected Replace, got {other:?}"), + } + } + + #[test] + fn fragmented_next_data_without_rewritable_urls_preserves_content() { + let rewriter = NextJsNextDataRewriter::new(test_config()).expect("should build rewriter"); + let document_state = IntegrationDocumentState::default(); + + // __NEXT_DATA__ JSON with no origin URLs — rewrite_structured returns Keep. + let fragment1 = r#"{"props":{"pageProps":{"title":"Hello"#; + let fragment2 = r#" World","count":42}}}"#; + + let ctx_intermediate = IntegrationScriptContext { + selector: "script#__NEXT_DATA__", + request_host: "ts.example.com", + request_scheme: "https", + origin_host: "origin.example.com", + is_last_in_text_node: false, + document_state: &document_state, + }; + let ctx_last = IntegrationScriptContext { + is_last_in_text_node: true, + ..ctx_intermediate + }; + + let action1 = rewriter.rewrite(fragment1, &ctx_intermediate); + assert_eq!(action1, ScriptRewriteAction::RemoveNode); + + // Last fragment: even though no URLs to rewrite, must emit full content + // because intermediate fragments were removed. 
+ let action2 = rewriter.rewrite(fragment2, &ctx_last); + match action2 { + ScriptRewriteAction::Replace(content) => { + let expected = format!("{fragment1}{fragment2}"); + assert_eq!( + content, expected, + "should emit full accumulated content unchanged" + ); + } + other => panic!("expected Replace with passthrough, got {other:?}"), + } + } } diff --git a/crates/trusted-server-core/src/publisher.rs b/crates/trusted-server-core/src/publisher.rs index 459c0c35..2c3b2099 100644 --- a/crates/trusted-server-core/src/publisher.rs +++ b/crates/trusted-server-core/src/publisher.rs @@ -686,11 +686,52 @@ mod tests { } } - // Note: test_streaming_compressed_content removed as it directly tested private function - // process_response_streaming. The functionality is tested through handle_publisher_request. + #[test] + fn streaming_gate_allows_2xx_html_without_post_processors() { + let is_html = true; + let has_post_processors = false; + let encoding_supported = is_supported_content_encoding("gzip"); + assert!( + encoding_supported && (!is_html || !has_post_processors), + "should stream 2xx HTML without post-processors" + ); + } + + #[test] + fn streaming_gate_blocks_html_with_post_processors() { + let is_html = true; + let has_post_processors = true; + let encoding_supported = is_supported_content_encoding("gzip"); + let can_stream = encoding_supported && (!is_html || !has_post_processors); + assert!( + !can_stream, + "should not stream HTML when post-processors are registered" + ); + } - // Note: test_streaming_brotli_content removed as it directly tested private function - // process_response_streaming. The functionality is tested through handle_publisher_request. 
+ #[test] + fn streaming_gate_allows_non_html_with_post_processors() { + let is_html = false; + let has_post_processors = true; + let encoding_supported = is_supported_content_encoding("gzip"); + let can_stream = encoding_supported && (!is_html || !has_post_processors); + assert!( + can_stream, + "should stream non-HTML even with post-processors (they only apply to HTML)" + ); + } + + #[test] + fn streaming_gate_blocks_unsupported_encoding() { + let is_html = false; + let has_post_processors = false; + let encoding_supported = is_supported_content_encoding("zstd"); + let can_stream = encoding_supported && (!is_html || !has_post_processors); + assert!( + !can_stream, + "should not stream when content-encoding is unsupported" + ); + } #[test] fn test_content_encoding_detection() { @@ -940,4 +981,56 @@ mod tests { "should reject unknown module names" ); } + + #[test] + fn stream_publisher_body_preserves_gzip_round_trip() { + use flate2::write::GzEncoder; + use std::io::Write; + + let settings = create_test_settings(); + let registry = + IntegrationRegistry::new(&settings).expect("should create integration registry"); + + // Compress CSS containing an origin URL that should be rewritten. + // CSS uses the text URL replacer (not lol_html), so inline URLs are rewritten. 
+ let html = b"body { background: url('https://origin.example.com/page'); }"; + let mut compressed = Vec::new(); + { + let mut encoder = GzEncoder::new(&mut compressed, flate2::Compression::default()); + encoder.write_all(html).expect("should compress"); + encoder.finish().expect("should finish compression"); + } + + let body = Body::from(compressed); + let params = OwnedProcessResponseParams { + content_encoding: "gzip".to_string(), + origin_host: "origin.example.com".to_string(), + origin_url: "https://origin.example.com".to_string(), + request_host: "proxy.example.com".to_string(), + request_scheme: "https".to_string(), + content_type: "text/css".to_string(), + }; + + let mut output = Vec::new(); + stream_publisher_body(body, &mut output, ¶ms, &settings, ®istry) + .expect("should process gzip CSS"); + + // Decompress output + use flate2::read::GzDecoder; + use std::io::Read; + let mut decoder = GzDecoder::new(&output[..]); + let mut decompressed = String::new(); + decoder + .read_to_string(&mut decompressed) + .expect("should decompress output"); + + assert!( + decompressed.contains("proxy.example.com"), + "should rewrite origin to proxy. Got: {decompressed}" + ); + assert!( + !decompressed.contains("origin.example.com"), + "should not contain original host. Got: {decompressed}" + ); + } } diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs index 5a4ea290..20665d7a 100644 --- a/crates/trusted-server-core/src/streaming_processor.rs +++ b/crates/trusted-server-core/src/streaming_processor.rs @@ -275,33 +275,19 @@ impl lol_html::OutputSink for RcVecSink { /// Adapter to use `lol_html` [`HtmlRewriter`](lol_html::HtmlRewriter) as a [`StreamProcessor`]. /// -/// Operates in one of two modes: -/// -/// - **Streaming** ([`new`](Self::new)): output is emitted incrementally on every -/// [`process_chunk`](StreamProcessor::process_chunk) call. Use when no script -/// rewriters are registered. 
-/// - **Buffered** ([`new_buffered`](Self::new_buffered)): input is accumulated and -/// processed in a single `write()` call on `is_last`. Use when script rewriters -/// are registered, because `lol_html` fragments text nodes across chunk boundaries -/// and rewriters that expect complete text content would silently miss rewrites on -/// split fragments. (See Phase 3 plan for making rewriters fragment-safe.) +/// Output is emitted incrementally on every [`process_chunk`](StreamProcessor::process_chunk) +/// call. Script rewriters that receive text from `lol_html` must be fragment-safe — +/// they accumulate text fragments internally until `is_last_in_text_node` is true. /// /// The adapter is single-use: one adapter per request. Calling [`StreamProcessor::reset`] /// is a no-op because the rewriter consumes its settings on construction. pub struct HtmlRewriterAdapter { rewriter: Option>, output: Rc>>, - /// When true, input is accumulated and fed to `lol_html` in one pass on `is_last`. - buffered: bool, - /// Accumulated input for the buffered path. - accumulated_input: Vec, } impl HtmlRewriterAdapter { /// Create a new HTML rewriter adapter that streams output per chunk. - /// - /// Use [`Self::new_buffered`] when script rewriters are registered to - /// avoid text node fragmentation. #[must_use] pub fn new(settings: lol_html::Settings<'static, 'static>) -> Self { let output = Rc::new(RefCell::new(Vec::new())); @@ -310,75 +296,28 @@ impl HtmlRewriterAdapter { Self { rewriter: Some(rewriter), output, - buffered: false, - accumulated_input: Vec::new(), - } - } - - /// Create a new HTML rewriter adapter that buffers all input before processing. - /// - /// This avoids `lol_html` text node fragmentation that breaks script rewriters - /// expecting complete text content. The entire document is fed to the rewriter - /// in a single `write()` call when `is_last` is true. 
- #[must_use] - pub fn new_buffered(settings: lol_html::Settings<'static, 'static>) -> Self { - let output = Rc::new(RefCell::new(Vec::new())); - let sink = RcVecSink(Rc::clone(&output)); - let rewriter = lol_html::HtmlRewriter::new(settings, sink); - Self { - rewriter: Some(rewriter), - output, - buffered: true, - accumulated_input: Vec::new(), } } } impl StreamProcessor for HtmlRewriterAdapter { fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result, io::Error> { - if self.buffered { - // Buffered mode: accumulate input, process all at once on is_last. - if !chunk.is_empty() { - if self.rewriter.is_none() { - log::warn!( - "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost", - chunk.len() - ); - } else { - self.accumulated_input.extend_from_slice(chunk); - } - } - if !is_last { - return Ok(Vec::new()); - } - if let Some(rewriter) = &mut self.rewriter { - if !self.accumulated_input.is_empty() { - let input = std::mem::take(&mut self.accumulated_input); - rewriter.write(&input).map_err(|e| { - log::error!("Failed to process HTML: {e}"); + match &mut self.rewriter { + Some(rewriter) => { + if !chunk.is_empty() { + rewriter.write(chunk).map_err(|e| { + log::error!("Failed to process HTML chunk: {e}"); io::Error::other(format!("HTML processing failed: {e}")) })?; } } - } else { - // Streaming mode: feed chunks to `lol_html` incrementally. 
- match &mut self.rewriter { - Some(rewriter) => { - if !chunk.is_empty() { - rewriter.write(chunk).map_err(|e| { - log::error!("Failed to process HTML chunk: {e}"); - io::Error::other(format!("HTML processing failed: {e}")) - })?; - } - } - None if !chunk.is_empty() => { - log::warn!( - "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost", - chunk.len() - ); - } - None => {} + None if !chunk.is_empty() => { + log::warn!( + "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost", + chunk.len() + ); } + None => {} } if is_last { @@ -417,10 +356,8 @@ mod tests { use crate::streaming_replacer::{Replacement, StreamingReplacer}; /// Verify that `lol_html` fragments text nodes when input chunks split - /// mid-text-node. This is critical: if `lol_html` does fragment, then - /// script rewriters (`NextJS` `__NEXT_DATA__`, `GTM`) that expect full - /// text content will silently miss rewrites when the streaming adapter - /// feeds chunks incrementally. + /// mid-text-node. Script rewriters must be fragment-safe — they accumulate + /// text fragments internally until `is_last_in_text_node` is true. #[test] fn lol_html_fragments_text_across_chunk_boundaries() { use std::cell::RefCell; @@ -469,57 +406,6 @@ mod tests { ); } - /// Companion to [`lol_html_fragments_text_across_chunk_boundaries`]: - /// proves that `new_buffered()` prevents fragmentation by feeding the - /// entire document to `lol_html` in one `write()` call. 
- #[test] - fn buffered_adapter_prevents_text_fragmentation() { - use std::cell::RefCell; - use std::rc::Rc; - - let fragments: Rc>> = Rc::new(RefCell::new(Vec::new())); - let fragments_clone = Rc::clone(&fragments); - - let settings = lol_html::Settings { - element_content_handlers: vec![lol_html::text!("script", move |text| { - fragments_clone - .borrow_mut() - .push((text.as_str().to_string(), text.last_in_text_node())); - Ok(()) - })], - ..lol_html::Settings::default() - }; - - let mut adapter = HtmlRewriterAdapter::new_buffered(settings); - - // Feed the same split chunks as the fragmentation test - let r1 = adapter - .process_chunk(b"", true) - .expect("should process chunk2"); - assert!( - !r2.is_empty(), - "buffered adapter should emit output on is_last" - ); - - let frags = fragments.borrow(); - // With buffered mode, the text handler should see the complete string - assert!( - frags - .iter() - .any(|(text, _)| text.contains("googletagmanager.com")), - "buffered adapter should deliver complete text to handler, got: {:?}", - *frags - ); - } - #[test] fn test_uncompressed_pipeline() { let replacer = StreamingReplacer::new(vec![Replacement { diff --git a/docs/superpowers/plans/2026-03-25-streaming-response.md b/docs/superpowers/plans/2026-03-25-streaming-response.md index 8515ba32..39f9914a 100644 --- a/docs/superpowers/plans/2026-03-25-streaming-response.md +++ b/docs/superpowers/plans/2026-03-25-streaming-response.md @@ -1018,3 +1018,103 @@ Check for: - No TTFB regression - Identical response body hash (correctness) - LCP/Speed Index improvement (secondary) + +--- + +## Phase 3: Make Script Rewriters Fragment-Safe (PR #591) + +> **Implementation note (2026-03-27):** All tasks completed. Script rewriters +> accumulate text fragments via `Mutex` until `last_in_text_node` is +> true. Buffered mode removed from `HtmlRewriterAdapter`. 2xx streaming gate +> added. 
Small-chunk (32 byte) pipeline regression tests added for both +> NextJS `__NEXT_DATA__` and GTM inline scripts. + +### Task 11: Make `NextJsNextDataRewriter` fragment-safe + +**Files:** `crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs` + +- [x] Add `accumulated_text: Mutex` field +- [x] Accumulate intermediate fragments, return `RemoveNode` +- [x] On last fragment, process full accumulated text +- [x] Handle Keep-after-accumulation (emit `Replace(full_content)`) +- [x] Add regression tests + +### Task 12: Make `GoogleTagManagerIntegration` rewrite fragment-safe + +**Files:** `crates/trusted-server-core/src/integrations/google_tag_manager.rs` + +- [x] Add `accumulated_text: Mutex` field +- [x] Accumulate intermediate fragments, return `RemoveNode` +- [x] On last fragment, match and rewrite on complete text +- [x] Non-GTM accumulated scripts emitted unchanged via `Replace` +- [x] Add regression tests + +### Task 13: Remove buffered mode from `HtmlRewriterAdapter` + +**Files:** `crates/trusted-server-core/src/streaming_processor.rs` + +- [x] Delete `new_buffered()`, `buffered` flag, `accumulated_input` +- [x] Simplify `process_chunk` to streaming-only path +- [x] Remove `buffered_adapter_prevents_text_fragmentation` test +- [x] Update doc comments + +### Task 14: Always use streaming adapter in `create_html_processor` + +**Files:** `crates/trusted-server-core/src/html_processor.rs` + +- [x] Remove `has_script_rewriters` check +- [x] Always call `HtmlRewriterAdapter::new(settings)` + +### Task 15: Full verification, regression tests, and performance measurement + +- [x] Add 2xx streaming gate (`response.get_status().is_success()`) +- [x] Add streaming gate unit tests (5 tests) +- [x] Add `stream_publisher_body` gzip round-trip test +- [x] Add small-chunk (32 byte) pipeline tests for NextJS and GTM +- [x] `cargo test --workspace` — 766 passed +- [x] `cargo clippy` — clean +- [x] `cargo fmt --check` — clean +- [x] WASM release build — success +- 
[x] Staging performance comparison (see results below) + +### Performance Results (getpurpose.ai, median over 5 runs, Chrome 1440x900) + +| Metric | Production (v135, buffered) | Staging (v136, streaming) | Delta | +| -------------------------- | --------------------------- | ------------------------- | ------------------ | +| **TTFB** | 54 ms | 35 ms | **-19 ms (-35%)** | +| **First Paint** | 186 ms | 160 ms | -26 ms (-14%) | +| **First Contentful Paint** | 186 ms | 160 ms | -26 ms (-14%) | +| **DOM Content Loaded** | 286 ms | 282 ms | -4 ms (~same) | +| **DOM Complete** | 1060 ms | 663 ms | **-397 ms (-37%)** | + +--- + +## Phase 4: Stream Binary Pass-Through Responses + +Non-processable content (images, fonts, video, `application/octet-stream`) +currently passes through `handle_publisher_request` unchanged via the +`Buffered` path. This buffers the entire response body in memory — wasteful +for large binaries that need no processing. Phase 4 adds a `PassThrough` +variant that streams the body directly via `io::copy` into `StreamingBody`. 
+ +### Task 16: Stream binary pass-through responses via `io::copy` + +**Files:** + +- `crates/trusted-server-core/src/publisher.rs` +- `crates/trusted-server-adapter-fastly/src/main.rs` + +- [ ] Add `PublisherResponse::PassThrough { response, body }` variant +- [ ] Return `PassThrough` when `!should_process` and backend returned 2xx +- [ ] Handle in `main.rs`: `stream_to_client()` + `io::copy(body, &mut streaming_body)` +- [ ] Keep `Buffered` for non-2xx responses and `request_host.is_empty()` +- [ ] Preserve `Content-Length` for pass-through (body is unmodified) + +### Task 17: Binary pass-through tests and verification + +- [ ] Publisher-level test: image content type returns `PassThrough` +- [ ] Publisher-level test: 4xx image stays `Buffered` +- [ ] `cargo test --workspace` +- [ ] `cargo clippy` + `cargo fmt --check` +- [ ] WASM release build +- [ ] Staging performance comparison (DOM Complete for image-heavy pages) diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md index e92f0514..414c4954 100644 --- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md +++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md @@ -240,12 +240,11 @@ remains in place — no need to bypass it. Clarification: `script_rewriters` (used by Next.js and GTM) are distinct from `html_post_processors`. Script rewriters run inside `lol_html` element handlers -and currently require buffered mode because `lol_html` fragments text nodes -across chunk boundaries (see [Phase 3](#text-node-fragmentation-phase-3)). -`html_post_processors` require the full document for post-processing. -The streaming gate checks `has_html_post_processors()` for the -post-processor path; `create_html_processor` separately gates the adapter mode -on `script_rewriters`. Currently only Next.js registers a post-processor. 
+during streaming and are now fragment-safe (resolved in +[Phase 3](#text-node-fragmentation-phase-3)). `html_post_processors` require +the full document for post-processing. The streaming gate checks +`has_html_post_processors()` for the post-processor path. Currently only +Next.js registers a post-processor. ## Text Node Fragmentation (Phase 3) @@ -254,14 +253,16 @@ HTML incrementally. Script rewriters (`NextJsNextDataRewriter`, `GoogleTagManagerIntegration`) expect complete text content — if a domain string is split across chunks, the rewrite silently fails. -**Phase 1 workaround**: `HtmlRewriterAdapter` has two modes. `new()` streams -per chunk (no script rewriters). `new_buffered()` accumulates input and -processes in one `write()` call (script rewriters registered). -`create_html_processor` selects the mode automatically. +**Resolved in Phase 3**: Each script rewriter is now fragment-safe. They +accumulate text fragments internally via `Mutex<String>` until +`is_last_in_text_node` is true, then process the complete text. Intermediate +fragments return `RemoveNode` (suppressed from output); the final fragment +emits the full rewritten content via `Replace`. If no rewrite is needed, +the full accumulated content is still emitted via `Replace` (since +intermediate fragments were already removed from the output). -**Phase 3** will make each script rewriter fragment-safe by accumulating text -fragments internally via `is_last_in_text_node`. This removes the buffered -fallback and enables streaming for all configurations. See #584. +The `HtmlRewriterAdapter` buffered mode (`new_buffered()`) has been removed. +`create_html_processor` always uses the streaming adapter. ## Rollback Strategy @@ -371,3 +372,54 @@ hex // compare this between baseline and feature branch - Compare against Viceroy results to account for real network conditions. - Monitor WASM heap usage via Fastly dashboard. - Verify no regressions on static endpoints or auction.
+ +### Results (getpurpose.ai, median over 5 runs, Chrome 1440x900) + +Measured via Chrome DevTools Protocol against prod (v135, buffered) and +staging (v136, streaming). Chrome `--host-resolver-rules` used to route +`getpurpose.ai` to the staging Fastly edge (167.82.83.52). + +| Metric | Production (v135, buffered) | Staging (v136, streaming) | Delta | +| -------------------------- | --------------------------- | ------------------------- | ------------------ | +| **TTFB** | 54 ms | 35 ms | **-19 ms (-35%)** | +| **First Paint** | 186 ms | 160 ms | -26 ms (-14%) | +| **First Contentful Paint** | 186 ms | 160 ms | -26 ms (-14%) | +| **DOM Content Loaded** | 286 ms | 282 ms | -4 ms (~same) | +| **DOM Complete** | 1060 ms | 663 ms | **-397 ms (-37%)** | + +## Phase 4: Binary Pass-Through Streaming + +Non-processable content (images, fonts, video, `application/octet-stream`) +currently passes through `handle_publisher_request` unchanged via the +`Buffered` path, buffering the entire body in memory before sending. For +large binaries (1-10 MB images), this is wasteful. + +Phase 4 adds a `PublisherResponse::PassThrough` variant that signals the +adapter to stream the body directly via `io::copy` into `StreamingBody` +with no processing pipeline. This eliminates peak memory for binary +responses and improves DOM Complete for image-heavy pages. + +### Streaming gate (updated) + +``` +is_success (2xx) +├── should_process && (!is_html || !has_post_processors) → Stream (pipeline) +├── should_process && is_html && has_post_processors → Buffered (post-processors) +└── !should_process → PassThrough (io::copy) + +!is_success +└── any content type → Buffered (error page) +``` + +### `PublisherResponse` enum (updated) + +```rust +pub enum PublisherResponse { + Buffered(Response), + Stream { response, body, params }, + PassThrough { response, body }, +} +``` + +`Content-Length` is preserved for `PassThrough` since the body is +unmodified — no need for chunked transfer encoding.