diff --git a/crates/trusted-server-adapter-fastly/src/main.rs b/crates/trusted-server-adapter-fastly/src/main.rs index 72cae33e..6b7309fe 100644 --- a/crates/trusted-server-adapter-fastly/src/main.rs +++ b/crates/trusted-server-adapter-fastly/src/main.rs @@ -1,6 +1,6 @@ use error_stack::Report; use fastly::http::Method; -use fastly::{Error, Request, Response}; +use fastly::{Request, Response}; use log_fastly::Logger; use trusted_server_core::auction::endpoints::handle_auction; @@ -19,7 +19,9 @@ use trusted_server_core::proxy::{ handle_first_party_click, handle_first_party_proxy, handle_first_party_proxy_rebuild, handle_first_party_proxy_sign, }; -use trusted_server_core::publisher::{handle_publisher_request, handle_tsjs_dynamic}; +use trusted_server_core::publisher::{ + handle_publisher_request, handle_tsjs_dynamic, stream_publisher_body, PublisherResponse, +}; use trusted_server_core::request_signing::{ handle_deactivate_key, handle_rotate_key, handle_trusted_server_discovery, handle_verify_signature, @@ -35,31 +37,44 @@ mod route_tests; use crate::error::to_error_response; use crate::platform::{build_runtime_services, open_kv_store, UnavailableKvStore}; -#[fastly::main] -fn main(req: Request) -> Result { +/// Entry point for the Fastly Compute program. +/// +/// Uses an undecorated `main()` with `Request::from_client()` instead of +/// `#[fastly::main]` so we can call `stream_to_client()` or `send_to_client()` +/// explicitly. `#[fastly::main]` is syntactic sugar that auto-calls +/// `send_to_client()` on the returned `Response`, which is incompatible with +/// streaming. +fn main() { init_logger(); + let req = Request::from_client(); + // Keep the health probe independent from settings loading and routing so // readiness checks still get a cheap liveness response during startup. 
if req.get_method() == Method::GET && req.get_path() == "/health" { - return Ok(Response::from_status(200).with_body_text_plain("ok")); + Response::from_status(200) + .with_body_text_plain("ok") + .send_to_client(); + return; } let settings = match get_settings() { Ok(s) => s, Err(e) => { log::error!("Failed to load settings: {:?}", e); - return Ok(to_error_response(&e)); + to_error_response(&e).send_to_client(); + return; } }; log::debug!("Settings {settings:?}"); // Build the auction orchestrator once at startup let orchestrator = match build_orchestrator(&settings) { - Ok(orchestrator) => orchestrator, + Ok(o) => o, Err(e) => { log::error!("Failed to build auction orchestrator: {:?}", e); - return Ok(to_error_response(&e)); + to_error_response(&e).send_to_client(); + return; } }; @@ -67,7 +82,8 @@ fn main(req: Request) -> Result { Ok(r) => r, Err(e) => { log::error!("Failed to create integration registry: {:?}", e); - return Ok(to_error_response(&e)); + to_error_response(&e).send_to_client(); + return; } }; @@ -78,13 +94,17 @@ fn main(req: Request) -> Result { as std::sync::Arc; let runtime_services = build_runtime_services(&req, kv_store); - futures::executor::block_on(route_request( + // route_request may send the response directly (streaming path) or + // return it for us to send (buffered path). + if let Some(response) = futures::executor::block_on(route_request( &settings, &orchestrator, &integration_registry, &runtime_services, req, - )) + )) { + response.send_to_client(); + } } async fn route_request( @@ -93,7 +113,7 @@ async fn route_request( integration_registry: &IntegrationRegistry, runtime_services: &RuntimeServices, mut req: Request, -) -> Result { +) -> Option { // Strip client-spoofable forwarded headers at the edge. // On Fastly this service IS the first proxy — these headers from // clients are untrusted and can hijack URL rewriting (see #409). 
@@ -115,14 +135,14 @@ async fn route_request( match enforce_basic_auth(settings, &req) { Ok(Some(mut response)) => { finalize_response(settings, geo_info.as_ref(), &mut response); - return Ok(response); + return Some(response); } Ok(None) => {} Err(e) => { log::error!("Failed to evaluate basic auth: {:?}", e); let mut response = to_error_response(&e); finalize_response(settings, geo_info.as_ref(), &mut response); - return Ok(response); + return Some(response); } } @@ -193,7 +213,32 @@ async fn route_request( &publisher_services, req, ) { - Ok(response) => Ok(response), + Ok(PublisherResponse::Stream { + mut response, + body, + params, + }) => { + // Streaming path: finalize headers, then stream body to client. + finalize_response(settings, geo_info.as_ref(), &mut response); + let mut streaming_body = response.stream_to_client(); + if let Err(e) = stream_publisher_body( + body, + &mut streaming_body, + &params, + settings, + integration_registry, + ) { + // Headers already committed. Log and abort — client + // sees a truncated response. Standard proxy behavior. 
+ log::error!("Streaming processing failed: {e:?}"); + drop(streaming_body); + } else if let Err(e) = streaming_body.finish() { + log::error!("Failed to finish streaming body: {e}"); + } + // Response already sent via stream_to_client() + return None; + } + Ok(PublisherResponse::Buffered(response)) => Ok(response), Err(e) => { log::error!("Failed to proxy to publisher origin: {:?}", e); Err(e) @@ -210,7 +255,7 @@ async fn route_request( finalize_response(settings, geo_info.as_ref(), &mut response); - Ok(response) + Some(response) } fn runtime_services_for_consent_route( diff --git a/crates/trusted-server-core/src/integrations/registry.rs b/crates/trusted-server-core/src/integrations/registry.rs index 1e03937a..a0e27d7f 100644 --- a/crates/trusted-server-core/src/integrations/registry.rs +++ b/crates/trusted-server-core/src/integrations/registry.rs @@ -730,6 +730,15 @@ impl IntegrationRegistry { self.inner.script_rewriters.clone() } + /// Check whether any HTML post-processors are registered. + /// + /// Cheaper than [`html_post_processors()`](Self::html_post_processors) when + /// only the presence check is needed — avoids cloning `Vec>`. + #[must_use] + pub fn has_html_post_processors(&self) -> bool { + !self.inner.html_post_processors.is_empty() + } + /// Expose registered HTML post-processors. 
#[must_use] pub fn html_post_processors(&self) -> Vec> { diff --git a/crates/trusted-server-core/src/publisher.rs b/crates/trusted-server-core/src/publisher.rs index a642c60f..459c0c35 100644 --- a/crates/trusted-server-core/src/publisher.rs +++ b/crates/trusted-server-core/src/publisher.rs @@ -1,3 +1,5 @@ +use std::io::Write; + use error_stack::{Report, ResultExt}; use fastly::http::{header, StatusCode}; use fastly::{Body, Request, Response}; @@ -169,12 +171,21 @@ struct ProcessResponseParams<'a> { integration_registry: &'a IntegrationRegistry, } -/// Process response body in streaming fashion with compression preservation -fn process_response_streaming( +/// Process response body through the streaming pipeline. +/// +/// Selects the appropriate processor based on content type (HTML rewriter, +/// RSC Flight rewriter, or URL replacer) and pipes chunks from `body` +/// through it into `output`. The caller decides what `output` is — a +/// `Vec` for buffered responses, or a `StreamingBody` for streaming. +/// +/// # Errors +/// +/// Returns an error if processor creation or chunk processing fails. 
+fn process_response_streaming( body: Body, + output: &mut W, params: &ProcessResponseParams, -) -> Result> { - // Check if this is HTML content +) -> Result<(), Report> { let is_html = params.content_type.contains("text/html"); let is_rsc_flight = params.content_type.contains("text/x-component"); log::debug!( @@ -186,15 +197,14 @@ fn process_response_streaming( params.origin_host ); - // Determine compression type let compression = Compression::from_content_encoding(params.content_encoding); + let config = PipelineConfig { + input_compression: compression, + output_compression: compression, + chunk_size: 8192, + }; - // Create output body to collect results - let mut output = Vec::new(); - - // Choose processor based on content type if is_html { - // Use HTML rewriter for HTML content let processor = create_html_stream_processor( params.origin_host, params.request_host, @@ -202,57 +212,26 @@ fn process_response_streaming( params.settings, params.integration_registry, )?; - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, processor); - pipeline.process(body, &mut output)?; + StreamingPipeline::new(config, processor).process(body, output)?; } else if is_rsc_flight { - // RSC Flight responses are length-prefixed (T rows). A naive string replacement will - // corrupt the stream by changing byte lengths without updating the prefixes. 
let processor = RscFlightUrlRewriter::new( params.origin_host, params.origin_url, params.request_host, params.request_scheme, ); - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, processor); - pipeline.process(body, &mut output)?; + StreamingPipeline::new(config, processor).process(body, output)?; } else { - // Use simple text replacer for non-HTML content let replacer = create_url_replacer( params.origin_host, params.origin_url, params.request_host, params.request_scheme, ); - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, replacer); - pipeline.process(body, &mut output)?; + StreamingPipeline::new(config, replacer).process(body, output)?; } - log::debug!( - "Streaming processing complete - output size: {} bytes", - output.len() - ); - Ok(Body::from(output)) + Ok(()) } /// Create a unified HTML stream processor @@ -276,29 +255,89 @@ fn create_html_stream_processor( Ok(create_html_processor(config)) } +/// Result of publisher request handling, indicating whether the response +/// body should be streamed or has already been buffered. +pub enum PublisherResponse { + /// Response is fully buffered and ready to send via `send_to_client()`. + Buffered(Response), + /// Response headers are ready. The caller must: + /// 1. Call `finalize_response()` on the response + /// 2. Call `response.stream_to_client()` to get a `StreamingBody` + /// 3. Call `stream_publisher_body()` with the body and streaming writer + /// 4. Call `StreamingBody::finish()` + Stream { + /// Response with all headers set (EC ID, cookies, etc.) + /// but body not yet written. `Content-Length` already removed. + response: Response, + /// Origin body to be piped through the streaming pipeline. 
+ body: Body, + /// Parameters for `process_response_streaming`. + params: OwnedProcessResponseParams, + }, +} + +/// Owned version of [`ProcessResponseParams`] for returning from +/// `handle_publisher_request` without lifetime issues. +pub struct OwnedProcessResponseParams { + pub(crate) content_encoding: String, + pub(crate) origin_host: String, + pub(crate) origin_url: String, + pub(crate) request_host: String, + pub(crate) request_scheme: String, + pub(crate) content_type: String, +} + +/// Stream the publisher response body through the processing pipeline. +/// +/// Called by the adapter after `stream_to_client()` has committed the +/// response headers. Writes processed chunks directly to `output`. +/// +/// # Errors +/// +/// Returns an error if processing fails mid-stream. Since headers are +/// already committed, the caller should log the error and drop the +/// `StreamingBody` (client sees a truncated response). +pub fn stream_publisher_body( + body: Body, + output: &mut W, + params: &OwnedProcessResponseParams, + settings: &Settings, + integration_registry: &IntegrationRegistry, +) -> Result<(), Report> { + let borrowed = ProcessResponseParams { + content_encoding: &params.content_encoding, + origin_host: &params.origin_host, + origin_url: &params.origin_url, + request_host: &params.request_host, + request_scheme: &params.request_scheme, + settings, + content_type: &params.content_type, + integration_registry, + }; + process_response_streaming(body, output, &borrowed) +} + /// Proxies requests to the publisher's origin server. /// -/// This function forwards incoming requests to the configured origin URL, -/// preserving headers and request body. It's used as a fallback for routes -/// not explicitly handled by the trusted server. +/// Returns a [`PublisherResponse`] indicating whether the response can be +/// streamed or must be sent buffered. 
The streaming path is chosen when: +/// - The backend returns a 2xx status +/// - The response has a processable content type +/// - The response uses a supported `Content-Encoding` (gzip, deflate, br) +/// - No HTML post-processors are registered (the streaming gate) /// /// # Errors /// -/// Returns a [`TrustedServerError`] if: -/// - The proxy request fails -/// - The origin backend is unreachable +/// Returns a [`TrustedServerError`] if the proxy request fails or the +/// origin backend is unreachable. pub fn handle_publisher_request( settings: &Settings, integration_registry: &IntegrationRegistry, services: &RuntimeServices, mut req: Request, -) -> Result> { +) -> Result> { log::debug!("Proxying request to publisher_origin"); - // Prebid.js requests are not intercepted here anymore. The HTML processor removes - // publisher-supplied Prebid scripts; the unified TSJS bundle includes Prebid.js when enabled. - - // Extract request host and scheme (uses Host header and TLS detection after edge sanitization) let request_info = RequestInfo::from_request(&req); let request_host = &request_info.host; let request_scheme = &request_info.scheme; @@ -348,7 +387,7 @@ pub fn handle_publisher_request( .map(|_| services.kv_store()), }); let ec_allowed = allows_ec_creation(&consent_context); - log::trace!("Proxy EC ID: {}, ec_allowed: {}", ec_id, ec_allowed); + log::debug!("Proxy ec_allowed: {}", ec_allowed); let backend_name = BackendConfig::from_url( &settings.publisher.origin_url, @@ -371,99 +410,158 @@ pub fn handle_publisher_request( message: "Failed to proxy request to origin".to_string(), })?; - // Log all response headers for debugging log::debug!("Response headers:"); for (name, value) in response.get_headers() { log::debug!(" {}: {:?}", name, value); } - // Check if the response has a text-based content type that we should process + // Set EC ID / cookie headers BEFORE body processing. + // These are body-independent (computed from request cookies + consent). 
+ apply_ec_headers( + settings, + services, + &mut response, + &ec_id, + ec_allowed, + existing_ec_cookie.as_deref(), + &consent_context, + ); + let content_type = response .get_header(header::CONTENT_TYPE) .map(|h| h.to_str().unwrap_or_default()) .unwrap_or_default() .to_string(); - let should_process = content_type.contains("text/") - || content_type.contains("application/javascript") - || content_type.contains("application/json"); + let should_process = is_processable_content_type(&content_type); + let is_success = response.get_status().is_success(); - if should_process && !request_host.is_empty() { - // Check if the response is compressed - let content_encoding = response - .get_header(header::CONTENT_ENCODING) - .map(|h| h.to_str().unwrap_or_default()) - .unwrap_or_default() - .to_lowercase(); + if !should_process || request_host.is_empty() || !is_success { + log::debug!( + "Skipping response processing - should_process: {}, request_host: '{}', status: {}", + should_process, + request_host, + response.get_status(), + ); + return Ok(PublisherResponse::Buffered(response)); + } - // Log response details for debugging + let content_encoding = response + .get_header(header::CONTENT_ENCODING) + .map(|h| h.to_str().unwrap_or_default()) + .unwrap_or_default() + .to_lowercase(); + + // Streaming gate: can we stream this response? 
+ // - 2xx status (non-success already returned Buffered above) + // - Supported Content-Encoding (unsupported would fail mid-stream) + // - No HTML post-processors registered (they need the full document) + // - Non-HTML content always streams (post-processors only apply to HTML) + let is_html = content_type.contains("text/html"); + let has_post_processors = integration_registry.has_html_post_processors(); + let encoding_supported = is_supported_content_encoding(&content_encoding); + let can_stream = encoding_supported && (!is_html || !has_post_processors); + + if can_stream { log::debug!( - "Processing response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", + "Streaming response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", content_type, content_encoding, request_host, origin_host ); - // Take the response body for streaming processing let body = response.take_body(); + response.remove_header(header::CONTENT_LENGTH); + + return Ok(PublisherResponse::Stream { + response, + body, + params: OwnedProcessResponseParams { + content_encoding, + origin_host, + origin_url: settings.publisher.origin_url.clone(), + request_host: request_host.to_string(), + request_scheme: request_scheme.to_string(), + content_type, + }, + }); + } - // Process the body using streaming approach - let params = ProcessResponseParams { - content_encoding: &content_encoding, - origin_host: &origin_host, - origin_url: &settings.publisher.origin_url, - request_host, - request_scheme, - settings, - content_type: &content_type, - integration_registry, - }; - match process_response_streaming(body, &params) { - Ok(processed_body) => { - // Set the processed body back - response.set_body(processed_body); + // Buffered fallback: post-processors need the full document. 
+ log::debug!( + "Buffered response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", + content_type, content_encoding, request_host, origin_host + ); - // Remove Content-Length as the size has likely changed - response.remove_header(header::CONTENT_LENGTH); + let body = response.take_body(); + let params = ProcessResponseParams { + content_encoding: &content_encoding, + origin_host: &origin_host, + origin_url: &settings.publisher.origin_url, + request_host, + request_scheme, + settings, + content_type: &content_type, + integration_registry, + }; + let mut output = Vec::new(); + process_response_streaming(body, &mut output, &params)?; - // Keep Content-Encoding header since we're returning compressed content - log::debug!( - "Preserved Content-Encoding: {} for compressed response", - content_encoding - ); + response.set_header(header::CONTENT_LENGTH, output.len().to_string()); + response.set_body(Body::from(output)); - log::debug!("Completed streaming processing of response body"); - } - Err(e) => { - log::error!("Failed to process response body: {:?}", e); - // Return an error response - return Err(e); - } - } - } else { - log::debug!( - "Skipping response processing - should_process: {}, request_host: '{}'", - should_process, - request_host - ); - } + Ok(PublisherResponse::Buffered(response)) +} - // Consent-gated EC creation: - // - Consent given → set EC ID header + cookie. - // - Consent absent + existing cookie → revoke (expire cookie + delete KV entry). - // - Consent absent + no cookie → do nothing. +/// Whether the content type requires processing (URL rewriting, HTML injection). +/// +/// Text-based and JavaScript/JSON responses are processable; binary types +/// (images, fonts, video, etc.) pass through unchanged. 
+fn is_processable_content_type(content_type: &str) -> bool { + content_type.contains("text/") + || content_type.contains("application/javascript") + || content_type.contains("application/json") +} + +/// Whether the `Content-Encoding` is one the streaming pipeline can handle. +/// +/// Unsupported encodings (e.g. `zstd` from a misbehaving origin) must fall +/// back to buffered mode so a processing failure produces a proper error +/// response instead of a truncated stream. +fn is_supported_content_encoding(encoding: &str) -> bool { + matches!(encoding, "" | "identity" | "gzip" | "deflate" | "br") +} + +/// Apply EC ID and cookie headers to the response. +/// +/// Extracted so headers can be set before streaming begins (headers must +/// be finalized before `stream_to_client()` commits them). +/// +/// Consent-gated EC creation: +/// - Consent given → set EC ID header + cookie. +/// - Consent absent + existing cookie → revoke (expire cookie + delete KV entry). +/// - Consent absent + no cookie → do nothing. +fn apply_ec_headers( + settings: &Settings, + services: &RuntimeServices, + response: &mut Response, + ec_id: &str, + ec_allowed: bool, + existing_ec_cookie: Option<&str>, + consent_context: &crate::consent::ConsentContext, +) { if ec_allowed { // Fastly's HeaderValue API rejects \r, \n, and \0, so the EC ID // cannot inject additional response headers. - response.set_header(HEADER_X_TS_EC, ec_id.as_str()); + response.set_header(HEADER_X_TS_EC, ec_id); // Cookie persistence is skipped if the EC ID contains RFC 6265-illegal // characters. The header is still emitted when consent allows it. 
- set_ec_cookie(settings, &mut response, ec_id.as_str()); - } else if let Some(cookie_ec_id) = existing_ec_cookie.as_deref() { + set_ec_cookie(settings, response, ec_id); + } else if let Some(cookie_ec_id) = existing_ec_cookie { log::info!( "EC revoked for '{}': consent withdrawn (jurisdiction={})", cookie_ec_id, consent_context.jurisdiction, ); - expire_ec_cookie(settings, &mut response); + expire_ec_cookie(settings, response); if settings.consent.consent_store.is_some() { crate::consent::kv::delete_consent_from_kv(services.kv_store(), cookie_ec_id); } @@ -473,8 +571,6 @@ pub fn handle_publisher_request( consent_context.jurisdiction, ); } - - Ok(response) } #[cfg(test)] @@ -502,21 +598,43 @@ mod tests { ("application/octet-stream", false), ]; - for (content_type, should_process) in test_cases { - let result = content_type.contains("text/html") - || content_type.contains("text/css") - || content_type.contains("text/javascript") - || content_type.contains("application/javascript") - || content_type.contains("application/json"); - + for (content_type, expected) in test_cases { assert_eq!( - result, should_process, - "Content-Type '{}' should_process: expected {}, got {}", - content_type, should_process, result + is_processable_content_type(content_type), + expected, + "Content-Type '{content_type}' should_process: expected {expected}", ); } } + #[test] + fn supported_content_encoding_accepts_known_values() { + assert!(is_supported_content_encoding(""), "should accept empty"); + assert!( + is_supported_content_encoding("identity"), + "should accept identity" + ); + assert!(is_supported_content_encoding("gzip"), "should accept gzip"); + assert!( + is_supported_content_encoding("deflate"), + "should accept deflate" + ); + assert!(is_supported_content_encoding("br"), "should accept br"); + } + + #[test] + fn supported_content_encoding_rejects_unknown_values() { + assert!(!is_supported_content_encoding("zstd"), "should reject zstd"); + assert!( + 
!is_supported_content_encoding("compress"), + "should reject compress" + ); + assert!( + !is_supported_content_encoding("snappy"), + "should reject snappy" + ); + } + #[test] fn test_publisher_origin_host_extraction() { let settings = create_test_settings(); diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md index 364da0f4..e92f0514 100644 --- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md +++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md @@ -243,7 +243,7 @@ Clarification: `script_rewriters` (used by Next.js and GTM) are distinct from and currently require buffered mode because `lol_html` fragments text nodes across chunk boundaries (see [Phase 3](#text-node-fragmentation-phase-3)). `html_post_processors` require the full document for post-processing. -The streaming gate checks `html_post_processors().is_empty()` for the +The streaming gate checks `has_html_post_processors()` for the post-processor path; `create_html_processor` separately gates the adapter mode on `script_rewriters`. Currently only Next.js registers a post-processor.