diff --git a/crates/trusted-server-adapter-fastly/src/main.rs b/crates/trusted-server-adapter-fastly/src/main.rs index 58d163da..2ae86ff5 100644 --- a/crates/trusted-server-adapter-fastly/src/main.rs +++ b/crates/trusted-server-adapter-fastly/src/main.rs @@ -180,20 +180,13 @@ async fn route_request( handle_first_party_proxy_rebuild(settings, runtime_services, req).await } (m, path) if integration_registry.has_route(&m, path) => integration_registry - .handle_proxy( - &m, - path, - settings, - runtime_services, - compat::to_fastly_request(req), - ) + .handle_proxy(&m, path, settings, runtime_services, req) .await .unwrap_or_else(|| { Err(Report::new(TrustedServerError::BadRequest { message: format!("Unknown integration route: {path}"), })) - }) - .map(compat::from_fastly_response), + }), // No known route matched, proxy to publisher origin as fallback _ => { diff --git a/crates/trusted-server-core/src/auction/README.md b/crates/trusted-server-core/src/auction/README.md index 15bee0ca..4c265288 100644 --- a/crates/trusted-server-core/src/auction/README.md +++ b/crates/trusted-server-core/src/auction/README.md @@ -24,7 +24,8 @@ The auction orchestration system allows you to: ▼ ┌─────────────────────────────────────────────────────────┐ │ AuctionProvider Trait │ -│ - request_bids() │ +│ - request_bids() async │ +│ - parse_response() │ │ - provider_name() │ │ - timeout_ms() │ │ - is_enabled() │ @@ -54,7 +55,7 @@ When a request arrives at the `/auction` endpoint, it goes through the following ┌──────────────────────────────────────────────────────────────────────┐ │ 2. Route Matching (crates/trusted-server-adapter-fastly/src/main.rs:84) │ │ - Pattern: (Method::POST, "/auction") │ -│ - Handler: handle_auction(settings, &orchestrator, &storage, req)│ +│ - Handler: handle_auction(settings, &orchestrator, runtime_services, req)│ └──────────────────────────────────────────────────────────────────────┘ │ ▼ @@ -496,6 +497,7 @@ timeout_ms = 500 use async_trait::async_trait; use crate::auction::provider::AuctionProvider; use crate::auction::types::{AuctionContext, AuctionRequest, AuctionResponse}; +use crate::platform::{PlatformPendingRequest, PlatformResponse}; pub struct YourAuctionProvider { config: YourConfig, @@ -511,11 +513,19 @@ impl AuctionProvider for YourAuctionProvider { &self, request: &AuctionRequest, _context: &AuctionContext<'_>, - ) -> Result> { + ) -> Result> { // 1. Transform AuctionRequest to your provider's format - // 2. Make HTTP request to your provider - // 3. Parse response - // 4. Return AuctionResponse with bids + // 2. Launch HTTP request through services.http_client().send_async(...) + // 3. Return PlatformPendingRequest for the orchestrator to await + todo!() + } + + fn parse_response( + &self, + response: PlatformResponse, + response_time_ms: u64, + ) -> Result> { + // 4. Parse PlatformResponse into AuctionResponse todo!() } @@ -551,7 +561,7 @@ let orchestrator = AuctionOrchestrator::new(config); orchestrator.register_provider(Arc::new(PrebidAuctionProvider::new(prebid_config))); orchestrator.register_provider(Arc::new(ApsAuctionProvider::new(aps_config))); -let result = orchestrator.run_auction(&request, &context).await?; +let result = orchestrator.run_auction(&request, &context, &services).await?; // Check results assert_eq!(result.winning_bids.len(), 2); diff --git a/crates/trusted-server-core/src/auction/endpoints.rs b/crates/trusted-server-core/src/auction/endpoints.rs index 2b0c3171..6fd6d1d3 100644 --- a/crates/trusted-server-core/src/auction/endpoints.rs +++ b/crates/trusted-server-core/src/auction/endpoints.rs @@ -5,7 +5,6 @@ use error_stack::{Report, ResultExt}; use http::{Request, Response}; use crate::auction::formats::AdRequest; -use crate::compat; use crate::consent; use crate::cookies::handle_request_cookies; use crate::error::TrustedServerError; @@ -87,13 +86,11 @@ pub async fn handle_auction( geo, )?; - let fastly_req = compat::to_fastly_request_ref(&http_req); - // Create auction context let context = AuctionContext { settings, - request: &fastly_req, - client_info: &services.client_info, + request: &http_req, + client_info: services.client_info(), timeout_ms: settings.auction.timeout_ms, provider_responses: None, services, diff --git a/crates/trusted-server-core/src/auction/orchestrator.rs b/crates/trusted-server-core/src/auction/orchestrator.rs index 077a8763..b66ef9b8 100644 --- a/crates/trusted-server-core/src/auction/orchestrator.rs +++ b/crates/trusted-server-core/src/auction/orchestrator.rs @@ -12,32 +12,6 @@ use super::config::AuctionConfig; use super::provider::AuctionProvider; use super::types::{AuctionContext, AuctionRequest, AuctionResponse, Bid, BidStatus}; -// # PR 15 removal target -// -// Mirrors compat::to_fastly_response — both should stay in sync until PR 15 -// removes the compat layer entirely. -fn platform_response_to_fastly( - platform_resp: crate::platform::PlatformResponse, -) -> fastly::Response { - let (parts, body) = platform_resp.response.into_parts(); - debug_assert!( - matches!(&body, edgezero_core::body::Body::Once(_)), - "unexpected Body::Stream in platform response conversion: body will be empty" - ); - let body_bytes = match body { - edgezero_core::body::Body::Once(bytes) => bytes.to_vec(), - edgezero_core::body::Body::Stream(_) => { - log::warn!("streaming platform response body; body will be empty"); - vec![] - } - }; - let mut resp = fastly::Response::from_status(parts.status.as_u16()); - for (name, value) in parts.headers.iter() { - resp.append_header(name.as_str(), value.as_bytes()); - } - resp.set_body(body_bytes); - resp -} /// Compute the remaining time budget from a deadline. /// @@ -180,13 +154,14 @@ impl AuctionOrchestrator { let start_time = Instant::now(); let pending = mediator .request_bids(request, &mediator_context) + .await .change_context(TrustedServerError::Auction { message: format!("Mediator {} failed to launch", mediator.provider_name()), })?; let select_result = services .http_client() - .select(vec![PlatformPendingRequest::new(pending)]) + .select(vec![pending]) .await .change_context(TrustedServerError::Auction { message: format!("Mediator {} request failed", mediator.provider_name()), @@ -197,11 +172,10 @@ impl AuctionOrchestrator { .change_context(TrustedServerError::Auction { message: format!("Mediator {} request failed", mediator.provider_name()), })?; - let backend_response = platform_response_to_fastly(platform_resp); let response_time_ms = start_time.elapsed().as_millis() as u64; let mediator_resp = mediator - .parse_response(backend_response, response_time_ms) + .parse_response(platform_resp, response_time_ms) .change_context(TrustedServerError::Auction { message: format!("Mediator {} parse failed", mediator.provider_name()), })?; @@ -269,7 +243,7 @@ impl AuctionOrchestrator { /// Run all providers in parallel and collect responses. /// - /// Uses `fastly::http::request::select()` to process responses as they + /// Uses `services.http_client().select(...)` to process responses as they /// become ready, rather than waiting for each response sequentially. async fn run_providers_parallel( &self, @@ -363,14 +337,17 @@ impl AuctionOrchestrator { ); let start_time = Instant::now(); - match provider.request_bids(request, &provider_context) { + match provider.request_bids(request, &provider_context).await { Ok(pending) => { + let request_backend_name = pending + .backend_name() + .map(str::to_string) + .unwrap_or_else(|| backend_name.clone()); backend_to_provider.insert( - backend_name.clone(), + request_backend_name.clone(), (provider.provider_name(), start_time, provider.as_ref()), ); - pending_requests - .push(PlatformPendingRequest::new(pending).with_backend_name(backend_name)); + pending_requests.push(pending); log::debug!( "Request to '{}' launched successfully", provider.provider_name() @@ -418,14 +395,13 @@ impl AuctionOrchestrator { Ok(platform_response) => { // Identify the provider from the backend name let backend_name = platform_response.backend_name.clone().unwrap_or_default(); - let response = platform_response_to_fastly(platform_response); if let Some((provider_name, start_time, provider)) = backend_to_provider.remove(&backend_name) { let response_time_ms = start_time.elapsed().as_millis() as u64; - match provider.parse_response(response, response_time_ms) { + match provider.parse_response(platform_response, response_time_ms) { Ok(auction_response) => { log::info!( "Provider '{}' returned {} bids (status: {:?}, time: {}ms)", @@ -648,7 +624,6 @@ mod tests { }; use crate::platform::test_support::noop_services; use crate::test_support::tests::crate_test_settings_str; - use fastly::Request; use std::collections::{HashMap, HashSet}; use super::AuctionOrchestrator; @@ -702,7 +677,7 @@ mod tests { fn create_test_context<'a>( settings: &'a crate::settings::Settings, - req: &'a Request, + req: &'a http::Request, client_info: &'a crate::platform::ClientInfo, ) -> AuctionContext<'a> { let services: &'static crate::platform::RuntimeServices = @@ -774,8 +749,9 @@ mod tests { } // TODO: Re-enable provider integration tests after implementing mock support - // for send_async(). Mock providers can't create PendingRequest without real - // Fastly backends. + // for `PlatformHttpClient::send_async()`. Mock providers currently cannot + // create realistic pending requests for the select loop without real + // platform-backed transport handles. // // Untested timeout enforcement paths (require real backends): // - Deadline check in select() loop (drops remaining requests) @@ -783,9 +759,9 @@ mod tests { // - Provider skip when effective_timeout == 0 (budget exhausted before launch) // - Provider context receives reduced timeout_ms per remaining budget // - // Follow-up: introduce a thin abstraction over `select()` (e.g. a trait) + // Follow-up: introduce a thin abstraction over `PlatformHttpClient::select()` // so the deadline/drop logic can be unit-tested with mock futures instead - // of requiring real Fastly backends. An `#[ignore]` integration test + // of requiring real platform backends. An `#[ignore]` integration test // exercising the full path via Viceroy would also catch regressions. #[tokio::test] @@ -803,7 +779,11 @@ mod tests { let request = create_test_auction_request(); let settings = create_test_settings(); - let req = Request::get("https://test.com/test"); + let req = http::Request::builder() + .method(http::Method::GET) + .uri("https://test.com/test") + .body(edgezero_core::body::Body::empty()) + .expect("should build request"); let context = create_test_context( &settings, &req, diff --git a/crates/trusted-server-core/src/auction/provider.rs b/crates/trusted-server-core/src/auction/provider.rs index cd3fcfc3..8a925fa7 100644 --- a/crates/trusted-server-core/src/auction/provider.rs +++ b/crates/trusted-server-core/src/auction/provider.rs @@ -1,13 +1,15 @@ //! Trait definition for auction providers. +use async_trait::async_trait; use error_stack::Report; -use fastly::http::request::PendingRequest; use crate::error::TrustedServerError; +use crate::platform::{PlatformPendingRequest, PlatformResponse}; use super::types::{AuctionContext, AuctionRequest, AuctionResponse}; /// Trait implemented by all auction providers (Prebid, APS, GAM, etc.). +#[async_trait(?Send)] pub trait AuctionProvider: Send + Sync { /// Unique identifier for this provider (e.g., "prebid", "aps", "gam"). fn provider_name(&self) -> &'static str; @@ -16,31 +18,32 @@ pub trait AuctionProvider: Send + Sync { /// /// Implementations should: /// - Transform `AuctionRequest` to provider-specific format - /// - Make HTTP call to provider endpoint using `send_async()` - /// - Return `PendingRequest` for orchestrator to await + /// - Make an HTTP call through `context.services.http_client().send_async(...)` + /// - Return [`PlatformPendingRequest`] for the orchestrator to await /// /// The orchestrator will handle waiting for responses and parsing them. /// /// # Errors /// /// Returns an error if the request cannot be created or if the provider endpoint - /// cannot be reached (though usually network errors happen during `PendingRequest` await). - fn request_bids( + /// cannot be reached (though usually network errors happen while the returned + /// [`PlatformPendingRequest`] is polled). + async fn request_bids( &self, request: &AuctionRequest, context: &AuctionContext<'_>, - ) -> Result>; + ) -> Result>; /// Parse the response from the provider into an `AuctionResponse`. /// - /// Called by the orchestrator after the `PendingRequest` completes. + /// Called by the orchestrator after the [`PlatformPendingRequest`] completes. /// /// # Errors /// /// Returns an error if the response cannot be parsed into a valid `AuctionResponse`. fn parse_response( &self, - response: fastly::Response, + response: PlatformResponse, response_time_ms: u64, ) -> Result>; @@ -62,7 +65,7 @@ pub trait AuctionProvider: Send + Sync { /// /// `timeout_ms` is the effective timeout that will be used when the backend /// is registered in [`request_bids`](Self::request_bids). It must be - /// forwarded to [`BackendConfig::backend_name_for_url()`] so the predicted + /// forwarded to [`crate::backend::BackendConfig::backend_name_for_url`] so the predicted /// name matches the actual registration (the timeout is part of the name). fn backend_name(&self, _timeout_ms: u32) -> Option { None diff --git a/crates/trusted-server-core/src/auction/types.rs b/crates/trusted-server-core/src/auction/types.rs index 1882fe62..bcecf144 100644 --- a/crates/trusted-server-core/src/auction/types.rs +++ b/crates/trusted-server-core/src/auction/types.rs @@ -1,6 +1,7 @@ //! Core types for auction requests and responses. -use fastly::Request; +use edgezero_core::body::Body as EdgeBody; +use http::Request; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -102,7 +103,7 @@ pub struct SiteInfo { /// Context passed to auction providers. pub struct AuctionContext<'a> { pub settings: &'a Settings, - pub request: &'a Request, + pub request: &'a Request, pub client_info: &'a ClientInfo, pub timeout_ms: u32, /// Provider responses from the bidding phase, used by mediators. diff --git a/crates/trusted-server-core/src/integrations/adserver_mock.rs b/crates/trusted-server-core/src/integrations/adserver_mock.rs index f4ce745d..6bf8355d 100644 --- a/crates/trusted-server-core/src/integrations/adserver_mock.rs +++ b/crates/trusted-server-core/src/integrations/adserver_mock.rs @@ -4,9 +4,10 @@ //! This integration acts as a mediator in the auction flow, selecting winning bids //! based on price (highest price wins). +use async_trait::async_trait; +use edgezero_core::body::Body as EdgeBody; use error_stack::{Report, ResultExt}; -use fastly::http::Method; -use fastly::Request; +use http::{header, Method}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value as Json}; use std::collections::{BTreeMap, HashMap}; @@ -21,6 +22,7 @@ use crate::auction::types::{ }; use crate::backend::BackendConfig; use crate::error::TrustedServerError; +use crate::platform::{PlatformHttpRequest, PlatformPendingRequest, PlatformResponse}; use crate::settings::{IntegrationConfig, Settings}; // ============================================================================ @@ -269,16 +271,17 @@ impl AdServerMockProvider { } } +#[async_trait(?Send)] impl AuctionProvider for AdServerMockProvider { fn provider_name(&self) -> &'static str { "adserver_mock" } - fn request_bids( + async fn request_bids( &self, request: &AuctionRequest, context: &AuctionContext<'_>, - ) -> Result> { + ) -> Result> { // Get bidder responses from context (passed by orchestrator for mediation) let bidder_responses = context.provider_responses.unwrap_or(&[]); @@ -301,7 +304,18 @@ impl AuctionProvider for AdServerMockProvider { let endpoint_url = self.build_endpoint_url(request); // Create HTTP POST request - let mut req = Request::new(Method::POST, &endpoint_url); + let mediation_body = + serde_json::to_vec(&mediation_req).change_context(TrustedServerError::Auction { + message: "Failed to serialize mediation request".to_string(), + })?; + let mut req = http::Request::builder() + .method(Method::POST) + .uri(&endpoint_url) + .header(header::CONTENT_TYPE, "application/json") + .body(EdgeBody::from(mediation_body)) + .change_context(TrustedServerError::Auction { + message: "Failed to build mediation request".to_string(), + })?; // Set Host header with port to ensure mocktioneer generates correct iframe URLs if let Ok(url) = url::Url::parse(&self.config.endpoint) { @@ -311,15 +325,14 @@ impl AuctionProvider for AdServerMockProvider { } else { host.to_string() }; - req.set_header("Host", &host_with_port); + req.headers_mut().insert( + header::HOST, + header::HeaderValue::from_str(&host_with_port) + .expect("should build host header"), + ); } } - req.set_body_json(&mediation_req) - .change_context(TrustedServerError::Auction { - message: "Failed to set mediation request body".to_string(), - })?; - // Send async with auction-scoped timeout let backend_name = BackendConfig::from_url_with_first_byte_timeout( &self.config.endpoint, @@ -333,8 +346,11 @@ impl AuctionProvider for AdServerMockProvider { ), })?; - let pending = req - .send_async(backend_name) + let pending = context + .services + .http_client() + .send_async(PlatformHttpRequest::new(req, backend_name)) + .await .change_context(TrustedServerError::Auction { message: "Failed to send mediation request".to_string(), })?; @@ -344,18 +360,17 @@ impl AuctionProvider for AdServerMockProvider { fn parse_response( &self, - mut response: fastly::Response, + response: PlatformResponse, response_time_ms: u64, ) -> Result> { - if !response.get_status().is_success() { - log::warn!( - "AdServer Mock returned non-success: {}", - response.get_status() - ); + let response = response.response; + + if !response.status().is_success() { + log::warn!("AdServer Mock returned non-success: {}", response.status()); return Ok(AuctionResponse::error("adserver_mock", response_time_ms)); } - let body_bytes = response.take_body_bytes(); + let body_bytes = response.into_body().into_bytes(); let response_json: Json = serde_json::from_slice(&body_bytes).change_context(TrustedServerError::Auction { message: "Failed to parse mediation response".to_string(), diff --git a/crates/trusted-server-core/src/integrations/aps.rs b/crates/trusted-server-core/src/integrations/aps.rs index af66dd06..9b0f1996 100644 --- a/crates/trusted-server-core/src/integrations/aps.rs +++ b/crates/trusted-server-core/src/integrations/aps.rs @@ -2,9 +2,10 @@ //! //! This module provides the APS auction provider for server-side bidding. +use async_trait::async_trait; +use edgezero_core::body::Body as EdgeBody; use error_stack::{Report, ResultExt}; -use fastly::http::Method; -use fastly::Request; +use http::{header, Method}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value as Json}; use std::collections::HashMap; @@ -15,6 +16,7 @@ use crate::auction::provider::AuctionProvider; use crate::auction::types::{AuctionContext, AuctionRequest, AuctionResponse, Bid, MediaType}; use crate::backend::BackendConfig; use crate::error::TrustedServerError; +use crate::platform::{PlatformHttpRequest, PlatformPendingRequest, PlatformResponse}; use crate::settings::IntegrationConfig; // ============================================================================ @@ -468,16 +470,17 @@ impl ApsAuctionProvider { } } +#[async_trait(?Send)] impl AuctionProvider for ApsAuctionProvider { fn provider_name(&self) -> &'static str { "aps" } - fn request_bids( + async fn request_bids( &self, request: &AuctionRequest, context: &AuctionContext<'_>, - ) -> Result> { + ) -> Result> { log::info!( "APS: requesting bids for {} slots (pub_id: {})", request.slots.len(), @@ -496,12 +499,17 @@ impl AuctionProvider for ApsAuctionProvider { log::trace!("APS: sending bid request: {:?}", aps_json); // Create HTTP POST request - let mut aps_req = Request::new(Method::POST, &self.config.endpoint); - - aps_req - .set_body_json(&aps_json) + let aps_body = + serde_json::to_vec(&aps_json).change_context(TrustedServerError::Auction { + message: "Failed to serialize APS request body".to_string(), + })?; + let aps_req = http::Request::builder() + .method(Method::POST) + .uri(&self.config.endpoint) + .header(header::CONTENT_TYPE, "application/json") + .body(EdgeBody::from(aps_body)) .change_context(TrustedServerError::Auction { - message: "Failed to set APS request body".to_string(), + message: "Failed to build APS request".to_string(), })?; // Send request asynchronously with auction-scoped timeout @@ -517,29 +525,33 @@ impl AuctionProvider for ApsAuctionProvider { ), })?; - let pending = - aps_req - .send_async(backend_name) - .change_context(TrustedServerError::Auction { - message: "Failed to send async request to APS".to_string(), - })?; + let pending = context + .services + .http_client() + .send_async(PlatformHttpRequest::new(aps_req, backend_name)) + .await + .change_context(TrustedServerError::Auction { + message: "Failed to send async request to APS".to_string(), + })?; Ok(pending) } fn parse_response( &self, - mut response: fastly::Response, + response: PlatformResponse, response_time_ms: u64, ) -> Result> { + let response = response.response; + // Check status code - if !response.get_status().is_success() { - log::warn!("APS returned non-success status: {}", response.get_status()); + if !response.status().is_success() { + log::warn!("APS returned non-success status: {}", response.status()); return Ok(AuctionResponse::error("aps", response_time_ms)); } // Parse response body - let body_bytes = response.take_body_bytes(); + let body_bytes = response.into_body().into_bytes(); let response_json: Json = serde_json::from_slice(&body_bytes).change_context(TrustedServerError::Auction { message: "Failed to parse APS response JSON".to_string(), diff --git a/crates/trusted-server-core/src/integrations/datadome.rs b/crates/trusted-server-core/src/integrations/datadome.rs index 0ce83b2f..74f7eaf7 100644 --- a/crates/trusted-server-core/src/integrations/datadome.rs +++ b/crates/trusted-server-core/src/integrations/datadome.rs @@ -58,20 +58,20 @@ use std::sync::{Arc, LazyLock}; use async_trait::async_trait; +use edgezero_core::body::Body as EdgeBody; use error_stack::{Report, ResultExt}; -use fastly::http::{header, Method, StatusCode}; -use fastly::{Request, Response}; +use http::header; +use http::{Method, StatusCode}; use regex::Regex; use serde::Deserialize; use validator::Validate; -use crate::backend::BackendConfig; use crate::error::TrustedServerError; use crate::integrations::{ - AttributeRewriteAction, IntegrationAttributeContext, IntegrationAttributeRewriter, - IntegrationEndpoint, IntegrationProxy, IntegrationRegistration, + ensure_integration_backend, AttributeRewriteAction, IntegrationAttributeContext, + IntegrationAttributeRewriter, IntegrationEndpoint, IntegrationProxy, IntegrationRegistration, }; -use crate::platform::RuntimeServices; +use crate::platform::{PlatformHttpRequest, RuntimeServices}; use crate::settings::{IntegrationConfig, Settings}; const DATADOME_INTEGRATION_ID: &str = "datadome"; @@ -249,87 +249,122 @@ impl DataDomeIntegration { } /// Handle the /tags.js endpoint - fetch and rewrite the `DataDome` SDK. - async fn handle_tags_js(&self, req: Request) -> Result> { - let target_url = self.build_sdk_url("/tags.js", req.get_query_str()); + async fn handle_tags_js( + &self, + services: &RuntimeServices, + req: http::Request, + ) -> Result, Report> { + let target_url = self.build_sdk_url("/tags.js", req.uri().query()); log::info!("[datadome] Fetching tags.js from {}", target_url); - let backend = BackendConfig::from_url(&target_url, true) + let backend = Self::backend_name_for_url(services, &target_url) .change_context(Self::error("Invalid SDK URL"))?; let sdk_host = Self::extract_host(&self.config.sdk_origin); - let mut backend_req = Request::new(Method::GET, &target_url); - backend_req.set_header(header::HOST, sdk_host); - backend_req.set_header(header::ACCEPT, "application/javascript, */*"); + let mut backend_req = http::Request::builder() + .method(Method::GET) + .uri(&target_url) + .header(header::HOST, sdk_host) + .header(header::ACCEPT, "application/javascript, */*") + .body(EdgeBody::empty()) + .change_context(Self::error("Failed to build DataDome SDK request"))?; // Copy relevant headers from original request - if let Some(ua) = req.get_header(header::USER_AGENT) { - backend_req.set_header(header::USER_AGENT, ua); + if let Some(ua) = req.headers().get(header::USER_AGENT) { + backend_req + .headers_mut() + .insert(header::USER_AGENT, ua.clone()); } - let mut backend_resp = backend_req - .send(&backend) + let backend_resp = services + .http_client() + .send(PlatformHttpRequest::new(backend_req, backend)) + .await .change_context(Self::error("Failed to fetch tags.js from DataDome"))?; - if backend_resp.get_status() != StatusCode::OK { + if backend_resp.response.status() != StatusCode::OK { log::warn!( "[datadome] tags.js fetch returned status {}", - backend_resp.get_status() + backend_resp.response.status() ); - return Ok(backend_resp); + return Ok(backend_resp.response); } // Read and rewrite the script content - let body = backend_resp.take_body_str(); - let rewritten = self.rewrite_script_content(&body); + let cors_header = backend_resp + .response + .headers() + .get(header::ACCESS_CONTROL_ALLOW_ORIGIN) + .cloned(); + let body = backend_resp.response.into_body().into_bytes(); + let rewritten = self.rewrite_script_content(&String::from_utf8_lossy(&body)); // Build response with caching headers - let mut response = Response::new(); - response.set_status(StatusCode::OK); - response.set_header( - header::CONTENT_TYPE, - "application/javascript; charset=utf-8", - ); - response.set_header( - header::CACHE_CONTROL, - format!("public, max-age={}", self.config.cache_ttl_seconds), - ); + let mut response = http::Response::builder() + .status(StatusCode::OK) + .header( + header::CONTENT_TYPE, + "application/javascript; charset=utf-8", + ) + .header( + header::CACHE_CONTROL, + format!("public, max-age={}", self.config.cache_ttl_seconds), + ) + .body(EdgeBody::from(rewritten.into_bytes())) + .change_context(Self::error("Failed to build DataDome SDK response"))?; // Copy CORS headers if present - if let Some(cors) = backend_resp.get_header(header::ACCESS_CONTROL_ALLOW_ORIGIN) { - response.set_header(header::ACCESS_CONTROL_ALLOW_ORIGIN, cors); + if let Some(cors) = cors_header { + response + .headers_mut() + .insert(header::ACCESS_CONTROL_ALLOW_ORIGIN, cors); } - response.set_body(rewritten); Ok(response) } /// Handle the /js/* signal collection endpoint - proxy pass-through to api-js.datadome.co. - async fn handle_js_api(&self, req: Request) -> Result> { - let original_path = req.get_path(); + async fn handle_js_api( + &self, + services: &RuntimeServices, + req: http::Request, + ) -> Result, Report> { + let (parts, body) = req.into_parts(); + let original_path = parts.uri.path().to_string(); // Strip our prefix to get the DataDome path let datadome_path = original_path .strip_prefix("/integrations/datadome") - .unwrap_or(original_path); + .unwrap_or(&original_path); // Use api_origin (api-js.datadome.co) for signal collection requests - let target_url = self.build_api_url(datadome_path, req.get_query_str()); + let target_url = self.build_api_url(datadome_path, parts.uri.query()); let api_host = Self::extract_host(&self.config.api_origin); log::info!( "[datadome] Proxying signal request to {} (method: {}, host: {})", target_url, - req.get_method(), + parts.method, api_host ); - let backend = BackendConfig::from_url(&target_url, true) + let backend = Self::backend_name_for_url(services, &target_url) .change_context(Self::error("Invalid API URL"))?; - let mut backend_req = Request::new(req.get_method().clone(), &target_url); - backend_req.set_header(header::HOST, api_host); + let request_body = if parts.method == Method::POST || parts.method == Method::PUT { + body + } else { + EdgeBody::empty() + }; + + let mut backend_req = http::Request::builder() + .method(parts.method.clone()) + .uri(&target_url) + .header(header::HOST, api_host) + .body(request_body) + .change_context(Self::error("Failed to build DataDome API request"))?; // Copy relevant headers let headers_to_copy = [ @@ -344,27 +379,23 @@ impl DataDomeIntegration { ]; for h in &headers_to_copy { - if let Some(value) = req.get_header(h) { - backend_req.set_header(h, value); + if let Some(value) = parts.headers.get(h) { + backend_req.headers_mut().insert(h, value.clone()); } } - // Copy body for POST/PUT requests - if req.get_method() == Method::POST || req.get_method() == Method::PUT { - let body = req.into_body(); - backend_req.set_body(body); - } - - let backend_resp = backend_req - .send(&backend) + let backend_resp = services + .http_client() + .send(PlatformHttpRequest::new(backend_req, backend)) + .await .change_context(Self::error("Failed to proxy signal request to DataDome"))?; log::info!( "[datadome] Signal request returned status {}", - backend_resp.get_status() + backend_resp.response.status() ); - Ok(backend_resp) + Ok(backend_resp.response) } /// Extract the path portion after the `DataDome` domain from a URL. @@ -381,6 +412,13 @@ impl DataDomeIntegration { }) .unwrap_or("/tags.js") } + + fn backend_name_for_url( + services: &RuntimeServices, + target_url: &str, + ) -> Result> { + ensure_integration_backend(services, target_url, DATADOME_INTEGRATION_ID) + } } #[async_trait(?Send)] @@ -405,15 +443,15 @@ impl IntegrationProxy for DataDomeIntegration { async fn handle( &self, _settings: &Settings, - _services: &RuntimeServices, - req: Request, - ) -> Result> { - let path = req.get_path(); + services: &RuntimeServices, + req: http::Request, + ) -> Result, Report> { + let path = req.uri().path().to_string(); if path == "/integrations/datadome/tags.js" { - self.handle_tags_js(req).await + self.handle_tags_js(services, req).await } else if path.starts_with("/integrations/datadome/js/") { - self.handle_js_api(req).await + self.handle_js_api(services, req).await } else { Err(Report::new(Self::error(format!( "Unknown DataDome route: {}", @@ -503,7 +541,11 @@ pub fn register( #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; + use crate::platform::test_support::{build_services_with_http_client, StubHttpClient}; + use crate::test_support::tests::create_test_settings; fn test_config() -> DataDomeConfig { DataDomeConfig { @@ -820,4 +862,34 @@ mod tests { _ => panic!("Expected Replace action for bare domain"), } } + + #[test] + fn datadome_proxy_uses_platform_http_client() { + let stub = Arc::new(StubHttpClient::new()); + stub.push_response(200, b"ok".to_vec()); + let services = build_services_with_http_client( + Arc::clone(&stub) as Arc + ); + let settings = create_test_settings(); + let integration = DataDomeIntegration::new(test_config()); + let req = http::Request::builder() + .method(http::Method::GET) + .uri("https://publisher.example/integrations/datadome/js/check") + .body(EdgeBody::empty()) + .expect("should build request"); + + let response = futures::executor::block_on(integration.handle(&settings, &services, req)) + .expect("should proxy request"); + + assert_eq!( + response.status(), + http::StatusCode::OK, + "should return stubbed response" + ); + assert_eq!( + stub.recorded_backend_names(), + vec!["stub-backend".to_string()], + "should route outbound request through PlatformHttpClient" + ); + } } diff --git a/crates/trusted-server-core/src/integrations/didomi.rs b/crates/trusted-server-core/src/integrations/didomi.rs index 2dcb5e91..8de3b25d 100644 --- a/crates/trusted-server-core/src/integrations/didomi.rs +++ b/crates/trusted-server-core/src/integrations/didomi.rs @@ -1,17 +1,19 @@ use std::sync::Arc; use async_trait::async_trait; +use edgezero_core::body::Body as EdgeBody; use error_stack::{Report, ResultExt}; -use fastly::http::{header, Method}; -use fastly::{Request, Response}; +use http::header::{self, HeaderMap, HeaderValue}; +use http::Method; use serde::{Deserialize, Serialize}; use url::Url; use validator::Validate; -use crate::backend::BackendConfig; use crate::error::TrustedServerError; -use crate::integrations::{IntegrationEndpoint, IntegrationProxy, IntegrationRegistration}; -use crate::platform::RuntimeServices; +use crate::integrations::{ + ensure_integration_backend, IntegrationEndpoint, IntegrationProxy, IntegrationRegistration, +}; +use crate::platform::{PlatformHttpRequest, RuntimeServices}; use crate::settings::{IntegrationConfig, Settings}; const DIDOMI_INTEGRATION_ID: &str = "didomi"; @@ -102,33 +104,41 @@ impl DidomiIntegration { &self, backend: &DidomiBackend, client_ip: Option, - original_req: &Request, - proxy_req: &mut Request, + original_headers: &HeaderMap, + proxy_headers: &mut HeaderMap, ) { if let Some(ip) = client_ip { - proxy_req.set_header("X-Forwarded-For", ip.to_string()); + proxy_headers.insert( + "X-Forwarded-For", + HeaderValue::from_str(&ip.to_string()) + .expect("should format X-Forwarded-For header"), + ); } for header_name in [ header::ACCEPT, header::ACCEPT_LANGUAGE, header::ACCEPT_ENCODING, + header::CONTENT_TYPE, header::USER_AGENT, header::REFERER, header::ORIGIN, header::AUTHORIZATION, ] { - if let Some(value) = original_req.get_header(&header_name) { - proxy_req.set_header(&header_name, value); + if let Some(value) = original_headers.get(&header_name) { + proxy_headers.insert(header_name, value.clone()); } } if matches!(backend, DidomiBackend::Sdk) { - Self::copy_geo_headers(original_req, proxy_req); + Self::copy_geo_headers(original_headers, proxy_headers); } } - fn copy_geo_headers(original_req: &Request, proxy_req: &mut Request) { + fn copy_geo_headers( + original_headers: &HeaderMap, + proxy_headers: &mut HeaderMap, + ) { let geo_headers = [ ("X-Geo-Country", "FastlyGeo-CountryCode"), ("X-Geo-Region", "FastlyGeo-Region"), @@ -136,23 +146,33 @@ impl DidomiIntegration { ]; for (target, source) in geo_headers { - if let Some(value) = original_req.get_header(source) { - proxy_req.set_header(target, value); + if let Some(value) = original_headers.get(source) { + proxy_headers.insert(target, value.clone()); } } } - fn add_cors_headers(response: &mut Response) { - response.set_header(header::ACCESS_CONTROL_ALLOW_ORIGIN, "*"); - response.set_header( + fn add_cors_headers(response: &mut http::Response) { + response.headers_mut().insert( + header::ACCESS_CONTROL_ALLOW_ORIGIN, + HeaderValue::from_static("*"), + ); + response.headers_mut().insert( header::ACCESS_CONTROL_ALLOW_HEADERS, - "Content-Type, Authorization, X-Requested-With", + HeaderValue::from_static("Content-Type, Authorization, X-Requested-With"), ); - response.set_header( + response.headers_mut().insert( header::ACCESS_CONTROL_ALLOW_METHODS, - "GET, POST, PUT, DELETE, OPTIONS", + HeaderValue::from_static("GET, POST, PUT, DELETE, OPTIONS"), ); } + + fn backend_name_for_origin( + services: &RuntimeServices, + origin: &str, + ) -> Result> { + ensure_integration_backend(services, origin, DIDOMI_INTEGRATION_ID) + } } fn build( @@ -201,10 +221,11 @@ impl IntegrationProxy for DidomiIntegration { &self, _settings: &Settings, services: &RuntimeServices, - req: Request, - ) -> Result> { - let path = req.get_path(); - let consent_path = path.strip_prefix(DIDOMI_PREFIX).unwrap_or(path); + req: http::Request, + ) -> Result, Report> { + let (parts, body) = req.into_parts(); + let path = parts.uri.path().to_string(); + let consent_path = path.strip_prefix(DIDOMI_PREFIX).unwrap_or(&path); let backend = self.backend_for_path(consent_path); let base_origin = match backend { DidomiBackend::Sdk => self.config.sdk_origin.as_str(), @@ -212,44 +233,52 @@ impl IntegrationProxy for DidomiIntegration { }; let target_url = self - .build_target_url(base_origin, consent_path, req.get_query_str()) + .build_target_url(base_origin, consent_path, parts.uri.query()) .change_context(Self::error("Failed to build Didomi target URL"))?; - let backend_name = BackendConfig::from_url(base_origin, true) + let backend_name = Self::backend_name_for_origin(services, base_origin) .change_context(Self::error("Failed to configure Didomi backend"))?; - let mut proxy_req = Request::new(req.get_method().clone(), &target_url); + let request_body = if matches!(parts.method, Method::POST | Method::PUT) { + body + } else { + EdgeBody::empty() + }; + + let mut proxy_req = http::Request::builder() + .method(parts.method.clone()) + .uri(&target_url) + .body(request_body) + .change_context(Self::error("Failed to build Didomi proxy request"))?; self.copy_headers( &backend, services.client_info.client_ip, - &req, - &mut proxy_req, + &parts.headers, + proxy_req.headers_mut(), ); - if matches!(req.get_method(), &Method::POST | &Method::PUT) { - if let Some(content_type) = req.get_header(header::CONTENT_TYPE) { - proxy_req.set_header(header::CONTENT_TYPE, content_type); - } - proxy_req.set_body(req.into_body()); - } - - let mut response = proxy_req - .send(&backend_name) + let mut response = services + .http_client() + .send(PlatformHttpRequest::new(proxy_req, backend_name)) + .await .change_context(Self::error("Didomi upstream request failed"))?; if matches!(backend, DidomiBackend::Sdk) { - Self::add_cors_headers(&mut response); + Self::add_cors_headers(&mut response.response); } - Ok(response) + Ok(response.response) } } #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; use crate::integrations::IntegrationRegistry; + use crate::platform::test_support::{build_services_with_http_client, StubHttpClient}; use crate::test_support::tests::create_test_settings; - use fastly::http::Method; + use http::Method; use std::net::{IpAddr, Ipv4Addr}; fn config(enabled: bool) -> DidomiIntegrationConfig { @@ -300,15 +329,29 @@ mod tests { fn copy_headers_sets_x_forwarded_for_from_client_ip() { let integration = DidomiIntegration::new(Arc::new(config(true))); let backend = DidomiBackend::Sdk; - let original_req = Request::new(Method::GET, "https://example.com/test"); - let mut proxy_req = Request::new(Method::GET, "https://sdk.privacy-center.org/test"); + let original_req = http::Request::builder() + .method(Method::GET) + .uri("https://example.com/test") + .body(EdgeBody::empty()) + .expect("should build original request"); + let mut proxy_req = http::Request::builder() + .method(Method::GET) + .uri("https://sdk.privacy-center.org/test") + .body(EdgeBody::empty()) + .expect("should build proxy request"); let client_ip = Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))); - integration.copy_headers(&backend, client_ip, &original_req, &mut proxy_req); + integration.copy_headers( + &backend, + client_ip, + original_req.headers(), + proxy_req.headers_mut(), + ); assert_eq!( proxy_req - .get_header("X-Forwarded-For") + .headers() + .get("X-Forwarded-For") .and_then(|v| v.to_str().ok()), Some("1.2.3.4"), "should set X-Forwarded-For from client_ip" @@ -319,14 +362,57 @@ mod tests { fn copy_headers_omits_x_forwarded_for_when_no_client_ip() { let integration = DidomiIntegration::new(Arc::new(config(true))); let backend = DidomiBackend::Sdk; - let original_req = Request::new(Method::GET, "https://example.com/test"); - let mut proxy_req = Request::new(Method::GET, "https://sdk.privacy-center.org/test"); - - integration.copy_headers(&backend, None, &original_req, &mut proxy_req); + let original_req = http::Request::builder() + .method(Method::GET) + .uri("https://example.com/test") + .body(EdgeBody::empty()) + .expect("should build original request"); + let mut proxy_req = http::Request::builder() + .method(Method::GET) + .uri("https://sdk.privacy-center.org/test") + .body(EdgeBody::empty()) + .expect("should build proxy request"); + + integration.copy_headers( + &backend, + None, + original_req.headers(), + proxy_req.headers_mut(), + ); assert!( - proxy_req.get_header("X-Forwarded-For").is_none(), + proxy_req.headers().get("X-Forwarded-For").is_none(), "should omit X-Forwarded-For when client_ip is None" ); } + + #[test] + fn didomi_proxy_uses_platform_http_client() { + let stub = Arc::new(StubHttpClient::new()); + stub.push_response(200, b"ok".to_vec()); + let services = build_services_with_http_client( + Arc::clone(&stub) as Arc + ); + let settings = create_test_settings(); + let integration = DidomiIntegration::new(Arc::new(config(true))); + let req = http::Request::builder() + .method(http::Method::GET) + .uri("https://publisher.example/integrations/didomi/consent/api/events") + .body(EdgeBody::empty()) + .expect("should build request"); + + let response = futures::executor::block_on(integration.handle(&settings, &services, req)) + .expect("should proxy request"); + + assert_eq!( + response.status(), + http::StatusCode::OK, + "should return stubbed response" + ); + assert_eq!( + stub.recorded_backend_names(), + vec!["stub-backend".to_string()], + "should route outbound request through PlatformHttpClient" + ); + } } diff --git a/crates/trusted-server-core/src/integrations/google_tag_manager.rs b/crates/trusted-server-core/src/integrations/google_tag_manager.rs index a125410a..fc816474 100644 --- a/crates/trusted-server-core/src/integrations/google_tag_manager.rs +++ b/crates/trusted-server-core/src/integrations/google_tag_manager.rs @@ -15,17 +15,17 @@ use std::sync::{Arc, LazyLock}; use async_trait::async_trait; +use edgezero_core::body::Body as EdgeBody; +use futures::StreamExt as _; use error_stack::{Report, ResultExt}; -use fastly::http::{Method, StatusCode}; -use fastly::{Request, Response}; +use http::{header, Method, Request, Response, StatusCode}; use regex::Regex; use serde::{Deserialize, Serialize}; use validator::Validate; -use crate::compat; use crate::error::TrustedServerError; use crate::integrations::{ - AttributeRewriteAction, IntegrationAttributeContext, IntegrationAttributeRewriter, + collect_body, AttributeRewriteAction, IntegrationAttributeContext, IntegrationAttributeRewriter, IntegrationEndpoint, IntegrationProxy, IntegrationRegistration, IntegrationScriptContext, IntegrationScriptRewriter, ScriptRewriteAction, }; @@ -224,7 +224,7 @@ impl GoogleTagManagerIntegration { path.ends_with("/gtm.js") || path.ends_with("/gtag/js") || path.ends_with("/gtag.js") } - fn build_target_url(&self, req: &Request, path: &str) -> Option { + fn build_target_url(&self, req: &Request, path: &str) -> Option { let upstream_base = self.upstream_url(); let mut target_url = if path.ends_with("/gtm.js") { @@ -239,7 +239,7 @@ impl GoogleTagManagerIntegration { return None; }; - if let Some(query) = req.get_url().query() { + if let Some(query) = req.uri().query() { target_url = format!("{}?{}", target_url, query); } else if path.ends_with("/gtm.js") { target_url = format!("{}?id={}", target_url, self.config.container_id); @@ -248,10 +248,10 @@ impl GoogleTagManagerIntegration { Some(target_url) } - fn build_proxy_config<'a>( + async fn build_proxy_config<'a>( &self, path: &str, - req: &mut Request, + req: &mut Request, target_url: &'a str, ) -> Result, PayloadSizeError> { let mut proxy_config = ProxyRequestConfig::new(target_url); @@ -259,42 +259,11 @@ impl GoogleTagManagerIntegration { // If it's a POST request (e.g. /collect beacon), we must manually attach the body // because ProxyRequestConfig doesn't automatically copy it from the source request. - if req.get_method() == Method::POST { + if req.method() == Method::POST { // Read body with size cap to prevent unbounded memory allocation. - // Read in chunks and reject early if body exceeds max_beacon_body_size. - let mut body = req.take_body(); - let mut body_bytes = Vec::new(); - let max_size = self.config.max_beacon_body_size; - const CHUNK_SIZE: usize = 8192; // 8KB chunks - - for chunk_result in body.read_chunks(CHUNK_SIZE) { - let chunk = chunk_result.map_err(|e| { - log::error!("Error reading request body: {}", e); - // Convert I/O error to size error for uniform handling - PayloadSizeError::TooLarge { - actual: 0, - max: max_size, - } - })?; - - // Check if adding this chunk would exceed the limit - // This prevents buffering oversized bodies into memory - if body_bytes.len() + chunk.len() > max_size { - let total_size = body_bytes.len() + chunk.len(); - log::warn!( - "POST body size {} exceeds max {} (rejected during chunked read)", - total_size, - max_size - ); - return Err(PayloadSizeError::TooLarge { - actual: total_size, - max: max_size, - }); - } - - body_bytes.extend_from_slice(&chunk); - } - + let body = std::mem::replace(req.body_mut(), EdgeBody::empty()); + let body_bytes = + Self::collect_request_body_bounded(body, self.config.max_beacon_body_size).await?; proxy_config.body = Some(body_bytes); } @@ -302,18 +271,69 @@ impl GoogleTagManagerIntegration { // The empty value will override any existing header during proxy forwarding. proxy_config = proxy_config.with_header( crate::constants::HEADER_X_FORWARDED_FOR, - fastly::http::HeaderValue::from_static(""), + http::HeaderValue::from_static(""), ); if self.is_rewritable_script(path) { proxy_config = proxy_config.with_header( - fastly::http::header::ACCEPT_ENCODING, - fastly::http::HeaderValue::from_static("identity"), + header::ACCEPT_ENCODING, + http::HeaderValue::from_static("identity"), ); } Ok(proxy_config) } + + async fn collect_request_body_bounded( + body: EdgeBody, + max_size: usize, + ) -> Result, PayloadSizeError> { + match body { + EdgeBody::Once(bytes) => { + if bytes.len() > max_size { + log::warn!( + "POST body size {} exceeds max {} (rejected before proxy)", + bytes.len(), + max_size + ); + return Err(PayloadSizeError::TooLarge { + actual: bytes.len(), + max: max_size, + }); + } + Ok(bytes.to_vec()) + } + EdgeBody::Stream(mut stream) => { + let mut body_bytes = Vec::new(); + while let Some(chunk_result) = stream.next().await { + let chunk = chunk_result.map_err(|error| { + log::error!("Error reading request body: {}", error); + PayloadSizeError::TooLarge { + actual: 0, + max: max_size, + } + })?; + + if body_bytes.len() + chunk.len() > max_size { + let total_size = body_bytes.len() + chunk.len(); + log::warn!( + "POST body size {} exceeds max {} (rejected during stream read)", + total_size, + max_size + ); + return Err(PayloadSizeError::TooLarge { + actual: total_size, + max: max_size, + }); + } + + body_bytes.extend_from_slice(&chunk); + } + Ok(body_bytes) + } + } + } + } fn build( @@ -375,17 +395,20 @@ impl IntegrationProxy for GoogleTagManagerIntegration { &self, settings: &Settings, services: &RuntimeServices, - mut req: Request, - ) -> Result> { - let path = req.get_path().to_string(); - let method = req.get_method(); + req: http::Request, + ) -> Result, Report> { + let mut req = req; + let path = req.uri().path().to_string(); + let method = req.method().clone(); log::debug!("Handling GTM request: {} {}", method, path); // Validate body size for POST requests to prevent memory pressure // Check Content-Length header if present for early rejection if method == Method::POST { - if let Some(content_length_str) = - req.get_header_str(fastly::http::header::CONTENT_LENGTH) + if let Some(content_length_str) = req + .headers() + .get(header::CONTENT_LENGTH) + .and_then(|value| value.to_str().ok()) { match content_length_str.parse::() { Ok(content_length) => { @@ -396,13 +419,23 @@ impl IntegrationProxy for GoogleTagManagerIntegration { content_length, self.config.max_beacon_body_size ); - return Ok(Response::from_status(StatusCode::PAYLOAD_TOO_LARGE)); + return Response::builder() + .status(StatusCode::PAYLOAD_TOO_LARGE) + .body(EdgeBody::empty()) + .change_context(Self::error( + "Failed to build GTM payload-too-large response", + )); } } Err(_) => { // Invalid Content-Length header log::warn!("POST request with malformed Content-Length header"); - return Ok(Response::from_status(StatusCode::BAD_REQUEST)); + return Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(EdgeBody::empty()) + .change_context(Self::error( + "Failed to build GTM bad-request response", + )); } } } @@ -411,13 +444,16 @@ impl IntegrationProxy for GoogleTagManagerIntegration { } let Some(target_url) = self.build_target_url(&req, &path) else { - return Ok(Response::from_status(StatusCode::NOT_FOUND)); + return Response::builder() + .status(StatusCode::NOT_FOUND) + .body(EdgeBody::empty()) + .change_context(Self::error("Failed to build GTM not-found response")); }; log::debug!("Proxying to upstream: {}", target_url); // Handle payload size errors explicitly to return 413 instead of 502 - let proxy_config = match self.build_proxy_config(&path, &mut req, &target_url) { + let proxy_config = match self.build_proxy_config(&path, &mut req, &target_url).await { Ok(config) => config, Err(PayloadSizeError::TooLarge { actual, max }) => { // This catches cases where Content-Length was incorrect @@ -426,40 +462,43 @@ impl IntegrationProxy for GoogleTagManagerIntegration { actual, max ); - return Ok(Response::from_status(StatusCode::PAYLOAD_TOO_LARGE)); + return Response::builder() + .status(StatusCode::PAYLOAD_TOO_LARGE) + .body(EdgeBody::empty()) + .change_context(Self::error( + "Failed to build GTM payload-too-large response", + )); } }; - let mut response = compat::to_fastly_response( - proxy_request( - settings, - compat::from_fastly_request(req), - proxy_config, - services, - ) + let response = proxy_request(settings, req, proxy_config, services) .await - .change_context(Self::error("Failed to proxy GTM request"))?, - ); + .change_context(Self::error("Failed to proxy GTM request"))?; // If we are serving gtm.js or gtag.js, rewrite internal URLs to route beacons through us. if self.is_rewritable_script(&path) { - if !response.get_status().is_success() { - log::warn!("GTM upstream returned status {}", response.get_status()); + if !response.status().is_success() { + log::warn!("GTM upstream returned status {}", response.status()); return Ok(response); } log::debug!("Rewriting GTM/gtag script content"); - let body_str = response.take_body_str(); + let status = response.status(); + let body_bytes = collect_body(response.into_body(), GTM_INTEGRATION_ID).await?; + let body_str = String::from_utf8_lossy(&body_bytes); let rewritten_body = Self::rewrite_gtm_urls(&body_str); - response = Response::from_body(rewritten_body) - .with_header( - fastly::http::header::CONTENT_TYPE, + return Response::builder() + .status(status) + .header( + header::CONTENT_TYPE, "application/javascript; charset=utf-8", ) - .with_header( - fastly::http::header::CACHE_CONTROL, + .header( + header::CACHE_CONTROL, format!("public, max-age={}", self.config.cache_max_age), - ); + ) + .body(EdgeBody::from(rewritten_body.into_bytes())) + .change_context(Self::error("Failed to build rewritten GTM response")); } Ok(response) @@ -526,9 +565,17 @@ mod tests { use crate::platform::test_support::noop_services; use crate::test_support::tests::crate_test_settings_str; - use fastly::http::Method; + use http::Method; use std::io::Cursor; + fn build_http_request(method: Method, uri: &str, body: EdgeBody) -> http::Request { + http::Request::builder() + .method(method) + .uri(uri) + .body(body) + .expect("should build HTTP request") + } + #[test] fn test_rewrite_gtm_urls() { // All URL patterns should be rewritten via the shared regex @@ -929,8 +976,8 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= .any(|r| r.path == "/integrations/google_tag_manager/g/collect")); } - #[test] - fn test_post_collect_proxy_config_includes_payload() { + #[tokio::test] + async fn test_post_collect_proxy_config_includes_payload() { let config = GoogleTagManagerConfig { enabled: true, container_id: "GTM-TEST1234".to_string(), @@ -941,18 +988,19 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= let integration = GoogleTagManagerIntegration::new(config); let payload = b"v=2&tid=G-TEST&cid=123&en=page_view".to_vec(); - let mut req = Request::new( + let mut req = build_http_request( Method::POST, "https://edge.example.com/integrations/google_tag_manager/g/collect?v=2&tid=G-TEST", + EdgeBody::from(payload.clone()), ); - req.set_body(payload.clone()); - let path = req.get_path().to_string(); + let path = req.uri().path().to_string(); let target_url = integration .build_target_url(&req, &path) .expect("should resolve collect target URL"); let proxy_config = integration .build_proxy_config(&path, &mut req, &target_url) + .await .expect("should build proxy config"); assert_eq!( @@ -962,8 +1010,8 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= ); } - #[test] - fn test_oversized_post_body_rejected() { + #[tokio::test] + async fn test_oversized_post_body_rejected() { let max_size = default_max_beacon_body_size(); let config = GoogleTagManagerConfig { enabled: true, @@ -976,19 +1024,21 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= // Create a payload larger than the configured max size (64KB by default) let oversized_payload = vec![b'X'; max_size + 1]; - let mut req = Request::new( + let mut req = build_http_request( Method::POST, "https://edge.example.com/integrations/google_tag_manager/collect", + EdgeBody::from(oversized_payload.clone()), ); - req.set_body(oversized_payload.clone()); - let path = req.get_path().to_string(); + let path = req.uri().path().to_string(); let target_url = integration .build_target_url(&req, &path) .expect("should resolve collect target URL"); // Attempt to build proxy config should fail due to oversized body - let result = integration.build_proxy_config(&path, &mut req, &target_url); + let result = integration + .build_proxy_config(&path, &mut req, &target_url) + .await; assert!(result.is_err(), "Oversized POST body should be rejected"); @@ -1000,8 +1050,8 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= } } - #[test] - fn test_custom_max_beacon_body_size() { + #[tokio::test] + async fn test_custom_max_beacon_body_size() { // Test with a custom smaller limit let custom_max_size = 1024; // 1KB let config = GoogleTagManagerConfig { @@ -1015,41 +1065,45 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= // Payload just under the custom limit should succeed let acceptable_payload = vec![b'X'; custom_max_size - 1]; - let mut req1 = Request::new( + let mut req1 = build_http_request( Method::POST, "https://edge.example.com/integrations/google_tag_manager/collect", + EdgeBody::from(acceptable_payload.clone()), ); - req1.set_body(acceptable_payload.clone()); - let path = req1.get_path().to_string(); + let path = req1.uri().path().to_string(); let target_url = integration .build_target_url(&req1, &path) .expect("should resolve collect target URL"); - let result = integration.build_proxy_config(&path, &mut req1, &target_url); + let result = integration + .build_proxy_config(&path, &mut req1, &target_url) + .await; assert!(result.is_ok(), "Payload under custom limit should succeed"); // Payload over the custom limit should fail let oversized_payload = vec![b'X'; custom_max_size + 1]; - let mut req2 = Request::new( + let mut req2 = build_http_request( Method::POST, "https://edge.example.com/integrations/google_tag_manager/collect", + EdgeBody::from(oversized_payload), ); - req2.set_body(oversized_payload); let target_url2 = integration .build_target_url(&req2, &path) .expect("should resolve collect target URL"); - let result2 = integration.build_proxy_config(&path, &mut req2, &target_url2); + let result2 = integration + .build_proxy_config(&path, &mut req2, &target_url2) + .await; assert!( result2.is_err(), "Payload over custom limit should be rejected" ); } - #[test] - fn test_incorrect_content_length_returns_413() { + #[tokio::test] + async fn test_incorrect_content_length_returns_413() { // Verify that when Content-Length is incorrect (smaller than actual body), // we still catch it and return 413 (not 502) let max_size = default_max_beacon_body_size(); @@ -1064,24 +1118,27 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= // Create oversized payload but with incorrect (small) Content-Length let oversized_payload = vec![b'X'; max_size + 1]; - let mut req = Request::new( + let mut req = build_http_request( Method::POST, "https://edge.example.com/integrations/google_tag_manager/collect", + EdgeBody::from(oversized_payload.clone()), ); - req.set_body(oversized_payload.clone()); // Set Content-Length to a small value (incorrect) - req.set_header( - fastly::http::header::CONTENT_LENGTH, - (max_size / 2).to_string(), + req.headers_mut().insert( + http::header::CONTENT_LENGTH, + http::HeaderValue::from_str(&(max_size / 2).to_string()) + .expect("should build Content-Length header"), ); - let path = req.get_path().to_string(); + let path = req.uri().path().to_string(); let target_url = integration .build_target_url(&req, &path) .expect("should resolve collect target URL"); // build_proxy_config should detect the mismatch and return PayloadSizeError - let result = integration.build_proxy_config(&path, &mut req, &target_url); + let result = integration + .build_proxy_config(&path, &mut req, &target_url) + .await; assert!( result.is_err(), @@ -1112,14 +1169,15 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= // Create oversized payload with correct Content-Length let oversized_payload = vec![b'X'; max_size + 1]; - let mut req = Request::new( - Method::POST, - "https://edge.example.com/integrations/google_tag_manager/collect", - ); - req.set_body(oversized_payload.clone()); - req.set_header( - fastly::http::header::CONTENT_LENGTH, - oversized_payload.len().to_string(), + let mut req = http::Request::builder() + .method(Method::POST) + .uri("https://edge.example.com/integrations/google_tag_manager/collect") + .body(EdgeBody::from(oversized_payload.clone())) + .expect("should build oversized request"); + req.headers_mut().insert( + http::header::CONTENT_LENGTH, + http::HeaderValue::from_str(&oversized_payload.len().to_string()) + .expect("should build Content-Length header"), ); let settings = make_settings(); @@ -1130,7 +1188,7 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= // Verify we get 413 Payload Too Large, not 502 Bad Gateway assert_eq!( - response.get_status(), + response.status(), StatusCode::PAYLOAD_TOO_LARGE, "Should return 413 for oversized POST body" ); @@ -1150,12 +1208,15 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= // Create POST request with invalid Content-Length header let payload = b"v=2&tid=G-TEST&cid=123".to_vec(); - let mut req = Request::new( - Method::POST, - "https://edge.example.com/integrations/google_tag_manager/collect", + let mut req = http::Request::builder() + .method(Method::POST) + .uri("https://edge.example.com/integrations/google_tag_manager/collect") + .body(EdgeBody::from(payload)) + .expect("should build malformed request"); + req.headers_mut().insert( + http::header::CONTENT_LENGTH, + http::HeaderValue::from_static("not-a-number"), ); - req.set_body(payload); - req.set_header(fastly::http::header::CONTENT_LENGTH, "not-a-number"); let settings = make_settings(); let response = integration @@ -1165,7 +1226,7 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= // Verify we get 400 Bad Request for malformed Content-Length assert_eq!( - response.get_status(), + response.status(), StatusCode::BAD_REQUEST, "Should return 400 for malformed Content-Length" ); @@ -1187,20 +1248,22 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= // Create small POST request without Content-Length header let small_payload = b"v=2&tid=G-TEST&cid=123".to_vec(); - let mut req = Request::new( + let mut req = build_http_request( Method::POST, "https://edge.example.com/integrations/google_tag_manager/collect", + EdgeBody::from(small_payload), ); - req.set_body(small_payload); // Intentionally NOT setting Content-Length header (HTTP/2 scenario) - let path = req.get_path().to_string(); + let path = req.uri().path().to_string(); let target_url = integration .build_target_url(&req, &path) .expect("should resolve collect target URL"); // build_proxy_config should accept small payloads even without Content-Length - let result = integration.build_proxy_config(&path, &mut req, &target_url); + let result = integration + .build_proxy_config(&path, &mut req, &target_url) + .await; assert!( result.is_ok(), @@ -1208,8 +1271,8 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= ); } - #[test] - fn test_collect_proxy_config_strips_client_ip_forwarding() { + #[tokio::test] + async fn test_collect_proxy_config_strips_client_ip_forwarding() { let config = GoogleTagManagerConfig { enabled: true, container_id: "GTM-TEST1234".to_string(), @@ -1219,18 +1282,23 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= }; let integration = GoogleTagManagerIntegration::new(config); - let mut req = Request::new( + let mut req = build_http_request( Method::GET, "https://edge.example.com/integrations/google_tag_manager/collect?v=2", + EdgeBody::empty(), + ); + req.headers_mut().insert( + crate::constants::HEADER_X_FORWARDED_FOR, + http::HeaderValue::from_static("198.51.100.42"), ); - req.set_header(crate::constants::HEADER_X_FORWARDED_FOR, "198.51.100.42"); - let path = req.get_path().to_string(); + let path = req.uri().path().to_string(); let target_url = integration .build_target_url(&req, &path) .expect("should resolve collect target URL"); let proxy_config = integration .build_proxy_config(&path, &mut req, &target_url) + .await .expect("should build proxy config"); // We check if X-Forwarded-For is explicitly overridden with an empty string, @@ -1247,8 +1315,8 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= ); } - #[test] - fn test_gtag_proxy_config_requests_identity_encoding() { + #[tokio::test] + async fn test_gtag_proxy_config_requests_identity_encoding() { let config = GoogleTagManagerConfig { enabled: true, container_id: "GT-123".to_string(), @@ -1258,22 +1326,25 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= }; let integration = GoogleTagManagerIntegration::new(config); - let mut req = Request::new( + let mut req = build_http_request( Method::GET, "https://edge.example.com/integrations/google_tag_manager/gtag/js?id=G-123", + EdgeBody::empty(), ); - let path = req.get_path().to_string(); + let path = req.uri().path().to_string(); let target_url = integration .build_target_url(&req, &path) .expect("should resolve gtag target URL"); let proxy_config = integration .build_proxy_config(&path, &mut req, &target_url) + .await .expect("should build proxy config"); - let has_identity = proxy_config.headers.iter().any(|(name, value)| { - name == fastly::http::header::ACCEPT_ENCODING && value == "identity" - }); + let has_identity = proxy_config + .headers + .iter() + .any(|(name, value)| name == http::header::ACCEPT_ENCODING && value == "identity"); assert!( has_identity, diff --git a/crates/trusted-server-core/src/integrations/gpt.rs b/crates/trusted-server-core/src/integrations/gpt.rs index 190ce5bf..818eeda8 100644 --- a/crates/trusted-server-core/src/integrations/gpt.rs +++ b/crates/trusted-server-core/src/integrations/gpt.rs @@ -35,14 +35,13 @@ use std::sync::Arc; use async_trait::async_trait; +use edgezero_core::body::Body as EdgeBody; use error_stack::{Report, ResultExt}; -use fastly::http::header; -use fastly::{Request, Response}; +use http::{header, Request, Response}; use serde::{Deserialize, Serialize}; use url::Url; use validator::Validate; -use crate::compat; use crate::constants::{HEADER_ACCEPT, HEADER_ACCEPT_ENCODING, HEADER_ACCEPT_LANGUAGE}; use crate::error::TrustedServerError; use crate::integrations::{ @@ -126,7 +125,10 @@ impl GptIntegration { )) } - fn build_proxy_config<'a>(target_url: &'a str, req: &Request) -> ProxyRequestConfig<'a> { + fn build_proxy_config<'a>( + target_url: &'a str, + req: &Request, + ) -> ProxyRequestConfig<'a> { let mut config = ProxyRequestConfig::new(target_url) .with_streaming() .without_forward_headers(); @@ -138,33 +140,33 @@ impl GptIntegration { fn apply_request_header_allowlist<'a>( mut config: ProxyRequestConfig<'a>, - req: &Request, + req: &Request, ) -> ProxyRequestConfig<'a> { for header_name in [ &HEADER_ACCEPT, &HEADER_ACCEPT_LANGUAGE, &HEADER_ACCEPT_ENCODING, ] { - if let Some(value) = req.get_header(header_name).cloned() { + if let Some(value) = req.headers().get(header_name).cloned() { config = config.with_header(header_name.clone(), value); } } config.with_header( header::USER_AGENT, - fastly::http::HeaderValue::from_static("TrustedServer/1.0"), + http::HeaderValue::from_static("TrustedServer/1.0"), ) } fn ensure_successful_gpt_asset_response( - response: &Response, + response: &Response, context: &str, ) -> Result<(), Report> { - if response.get_status().is_success() { + if response.status().is_success() { return Ok(()); } - let status = response.get_status(); + let status = response.status(); log::error!( "GPT proxy upstream returned status {} for {}", status, @@ -175,45 +177,62 @@ impl GptIntegration { )))) } - fn finalize_gpt_asset_response(&self, mut response: Response) -> Response { - let status = response.get_status(); - let content_type = response.get_header(header::CONTENT_TYPE).cloned(); - let content_encoding = response.get_header(header::CONTENT_ENCODING).cloned(); - let etag = response.get_header(header::ETAG).cloned(); - let last_modified = response.get_header(header::LAST_MODIFIED).cloned(); - let upstream_vary = response - .get_header(header::VARY) + fn finalize_gpt_asset_response(&self, response: Response) -> Response { + let (parts, body) = response.into_parts(); + let status = parts.status; + let content_type = parts.headers.get(header::CONTENT_TYPE).cloned(); + let content_encoding = parts.headers.get(header::CONTENT_ENCODING).cloned(); + let etag = parts.headers.get(header::ETAG).cloned(); + let last_modified = parts.headers.get(header::LAST_MODIFIED).cloned(); + let upstream_vary = parts + .headers + .get(header::VARY) .and_then(|value| value.to_str().ok()) .map(str::to_owned); - let body = response.take_body(); - let mut finalized = Response::from_status(status).with_body(body); - finalized.set_header("X-GPT-Proxy", "true"); + let mut finalized = Response::new(body); + *finalized.status_mut() = status; + finalized + .headers_mut() + .insert("X-GPT-Proxy", http::HeaderValue::from_static("true")); if let Some(content_type) = content_type { - finalized.set_header(header::CONTENT_TYPE, content_type); + finalized + .headers_mut() + .insert(header::CONTENT_TYPE, content_type); } if let Some(etag) = etag { - finalized.set_header(header::ETAG, etag); + finalized.headers_mut().insert(header::ETAG, etag); } if let Some(last_modified) = last_modified { - finalized.set_header(header::LAST_MODIFIED, last_modified); + finalized + .headers_mut() + .insert(header::LAST_MODIFIED, last_modified); } if let Some(content_encoding) = content_encoding { - finalized.set_header(header::CONTENT_ENCODING, content_encoding); - finalized.set_header( + finalized + .headers_mut() + .insert(header::CONTENT_ENCODING, content_encoding); + finalized.headers_mut().insert( header::VARY, - Self::vary_with_accept_encoding(upstream_vary.as_deref()), + http::HeaderValue::from_str(&Self::vary_with_accept_encoding( + upstream_vary.as_deref(), + )) + .expect("should build GPT Vary header"), ); } if status.is_success() { - finalized.set_header( + finalized.headers_mut().insert( header::CACHE_CONTROL, - format!("public, max-age={}", self.config.cache_ttl_seconds), + http::HeaderValue::from_str(&format!( + "public, max-age={}", + self.config.cache_ttl_seconds + )) + .expect("should build GPT Cache-Control header"), ); } @@ -241,16 +260,14 @@ impl GptIntegration { &self, settings: &Settings, services: &RuntimeServices, - req: Request, + req: Request, target_url: &str, context: &str, - ) -> Result> { + ) -> Result, Report> { let config = Self::build_proxy_config(target_url, &req); - let response = compat::to_fastly_response( - proxy_request(settings, compat::from_fastly_request(req), config, services) - .await - .change_context(Self::error(context))?, - ); + let response = proxy_request(settings, req, config, services) + .await + .change_context(Self::error(context))?; Self::ensure_successful_gpt_asset_response(&response, context)?; Ok(self.finalize_gpt_asset_response(response)) @@ -293,8 +310,8 @@ impl GptIntegration { &self, settings: &Settings, services: &RuntimeServices, - req: Request, - ) -> Result> { + req: Request, + ) -> Result, Report> { let script_url = &self.config.script_url; log::info!("Fetching GPT script from: {}", script_url); self.proxy_gpt_asset( @@ -317,12 +334,12 @@ impl GptIntegration { &self, settings: &Settings, services: &RuntimeServices, - req: Request, - ) -> Result> { - let original_path = req.get_path(); - let query = req.get_url().query(); + req: Request, + ) -> Result, Report> { + let original_path = req.uri().path().to_string(); + let query = req.uri().query(); - let target_url = Self::build_upstream_url(original_path, query) + let target_url = Self::build_upstream_url(&original_path, query) .ok_or_else(|| Self::error(format!("Invalid GPT pagead path: {}", original_path)))?; log::info!("GPT proxy: forwarding to {}", target_url); @@ -386,9 +403,9 @@ impl IntegrationProxy for GptIntegration { &self, settings: &Settings, services: &RuntimeServices, - req: Request, - ) -> Result> { - let path = req.get_path(); + req: http::Request, + ) -> Result, Report> { + let path = req.uri().path().to_string(); if path == "/integrations/gpt/script" { self.handle_script_serving(settings, services, req).await @@ -476,7 +493,7 @@ mod tests { use crate::constants::HEADER_X_FORWARDED_FOR; use crate::integrations::IntegrationDocumentState; use crate::test_support::tests::create_test_settings; - use fastly::http::Method; + use http::Method; fn test_config() -> GptConfig { GptConfig { @@ -496,6 +513,14 @@ mod tests { } } + fn build_http_request(method: Method, uri: &str) -> http::Request { + http::Request::builder() + .method(method) + .uri(uri) + .body(EdgeBody::empty()) + .expect("should build HTTP request") + } + // -- URL detection -- #[test] @@ -636,7 +661,7 @@ mod tests { #[test] fn build_proxy_config_uses_streaming_without_synthetic_forwarding_or_redirects() { - let req = Request::new( + let req = build_http_request( Method::GET, "https://edge.example.com/integrations/gpt/script", ); @@ -661,13 +686,22 @@ mod tests { #[test] fn build_proxy_config_forwards_only_required_headers() { - let mut req = Request::new( + let mut req = build_http_request( Method::GET, "https://edge.example.com/integrations/gpt/script", ); - req.set_header(HEADER_ACCEPT, "application/javascript"); - req.set_header(HEADER_ACCEPT_LANGUAGE, "en-US,en;q=0.9"); - req.set_header(HEADER_ACCEPT_ENCODING, "gzip"); + req.headers_mut().insert( + HEADER_ACCEPT, + http::HeaderValue::from_static("application/javascript"), + ); + req.headers_mut().insert( + HEADER_ACCEPT_LANGUAGE, + http::HeaderValue::from_static("en-US,en;q=0.9"), + ); + req.headers_mut().insert( + HEADER_ACCEPT_ENCODING, + http::HeaderValue::from_static("gzip"), + ); let config = GptIntegration::build_proxy_config( "https://securepubads.g.doubleclick.net/tag/js/gpt.js", @@ -737,7 +771,7 @@ mod tests { #[test] fn build_proxy_config_does_not_advertise_accept_encoding_when_client_omits_it() { - let req = Request::new( + let req = build_http_request( Method::GET, "https://edge.example.com/integrations/gpt/script", ); @@ -761,67 +795,94 @@ mod tests { #[test] fn finalize_gpt_asset_response_rebuilds_successful_responses_with_safe_headers() { let integration = GptIntegration::new(test_config()); - let response = Response::from_status(fastly::http::StatusCode::OK) - .with_header( + let response = http::Response::builder() + .status(http::StatusCode::OK) + .header( header::CONTENT_TYPE, "application/javascript; charset=utf-8", ) - .with_header(header::ETAG, "\"gpt-etag\"") - .with_header(header::LAST_MODIFIED, "Thu, 13 Mar 2025 08:00:00 GMT") - .with_header(header::CONTENT_ENCODING, "br") - .with_header(header::VARY, "Origin") - .with_header(header::SET_COOKIE, "gpt=1; Secure"); + .header(header::ETAG, "\"gpt-etag\"") + .header(header::LAST_MODIFIED, "Thu, 13 Mar 2025 08:00:00 GMT") + .header(header::CONTENT_ENCODING, "br") + .header(header::VARY, "Origin") + .header(header::SET_COOKIE, "gpt=1; Secure") + .body(EdgeBody::empty()) + .expect("should build GPT response"); let response = integration.finalize_gpt_asset_response(response); assert_eq!( - response.get_status(), - fastly::http::StatusCode::OK, + response.status(), + http::StatusCode::OK, "should preserve successful upstream statuses" ); assert_eq!( - response.get_header_str("X-GPT-Proxy"), + response + .headers() + .get("X-GPT-Proxy") + .and_then(|value| value.to_str().ok()), Some("true"), "should tag proxied GPT responses" ); assert_eq!( - response.get_header_str(header::CONTENT_TYPE), + response + .headers() + .get(header::CONTENT_TYPE) + .and_then(|value| value.to_str().ok()), Some("application/javascript; charset=utf-8"), "should preserve upstream content type for GPT assets" ); assert_eq!( - response.get_header_str(header::ETAG), + response + .headers() + .get(header::ETAG) + .and_then(|value| value.to_str().ok()), Some("\"gpt-etag\""), "should preserve upstream ETag validators for GPT assets" ); assert_eq!( - response.get_header_str(header::LAST_MODIFIED), + response + .headers() + .get(header::LAST_MODIFIED) + .and_then(|value| value.to_str().ok()), Some("Thu, 13 Mar 2025 08:00:00 GMT"), "should preserve upstream Last-Modified validators for GPT assets" ); assert_eq!( - response.get_header_str(header::CONTENT_ENCODING), + response + .headers() + .get(header::CONTENT_ENCODING) + .and_then(|value| value.to_str().ok()), Some("br"), "should preserve upstream content encoding for GPT assets" ); assert_eq!( - response.get_header_str(header::VARY), + response + .headers() + .get(header::VARY) + .and_then(|value| value.to_str().ok()), Some("Origin, Accept-Encoding"), "should normalize Vary when returning encoded GPT assets" ); assert_eq!( - response.get_header_str(header::CACHE_CONTROL), + response + .headers() + .get(header::CACHE_CONTROL) + .and_then(|value| value.to_str().ok()), Some("public, max-age=3600"), "should add cache headers for successful GPT asset responses" ); assert!( - response.get_header(header::SET_COOKIE).is_none(), + response.headers().get(header::SET_COOKIE).is_none(), "should not project unrelated upstream headers to first-party clients" ); } #[test] fn ensure_successful_gpt_asset_response_rejects_non_success_statuses() { - let response = Response::from_status(fastly::http::StatusCode::SERVICE_UNAVAILABLE); + let response = http::Response::builder() + .status(http::StatusCode::SERVICE_UNAVAILABLE) + .body(EdgeBody::empty()) + .expect("should build service unavailable response"); let err = GptIntegration::ensure_successful_gpt_asset_response( &response, "Failed to fetch GPT script from https://securepubads.g.doubleclick.net/tag/js/gpt.js", diff --git a/crates/trusted-server-core/src/integrations/lockr.rs b/crates/trusted-server-core/src/integrations/lockr.rs index 8e63345f..9e868bba 100644 --- a/crates/trusted-server-core/src/integrations/lockr.rs +++ b/crates/trusted-server-core/src/integrations/lockr.rs @@ -10,20 +10,21 @@ use std::sync::Arc; use async_trait::async_trait; +use edgezero_core::body::Body as EdgeBody; use error_stack::{Report, ResultExt}; -use fastly::http::{header, Method, StatusCode}; -use fastly::{Request, Response}; +use http::header::{self, HeaderMap, HeaderValue}; +use http::{Method, StatusCode}; use serde::Deserialize; use validator::Validate; -use crate::backend::BackendConfig; -use crate::compat; +use crate::constants::INTERNAL_HEADERS; +use crate::cookies::{strip_cookies, CONSENT_COOKIE_NAMES}; use crate::error::TrustedServerError; use crate::integrations::{ - AttributeRewriteAction, IntegrationAttributeContext, IntegrationAttributeRewriter, - IntegrationEndpoint, IntegrationProxy, IntegrationRegistration, + ensure_integration_backend, AttributeRewriteAction, IntegrationAttributeContext, + IntegrationAttributeRewriter, IntegrationEndpoint, IntegrationProxy, IntegrationRegistration, }; -use crate::platform::RuntimeServices; +use crate::platform::{PlatformHttpRequest, RuntimeServices}; use crate::settings::{IntegrationConfig, Settings}; const LOCKR_INTEGRATION_ID: &str = "lockr"; @@ -106,67 +107,77 @@ impl LockrIntegration { async fn handle_sdk_serving( &self, _settings: &Settings, - _req: Request, - ) -> Result> { + services: &RuntimeServices, + ) -> Result, Report> { let sdk_url = &self.config.sdk_url; log::info!("Fetching Lockr SDK from {}", sdk_url); // TODO: Check KV store cache first (future enhancement) - let mut lockr_req = Request::new(Method::GET, sdk_url); - lockr_req.set_header(header::USER_AGENT, "TrustedServer/1.0"); - lockr_req.set_header(header::ACCEPT, "application/javascript, */*"); + let lockr_req = http::Request::builder() + .method(Method::GET) + .uri(sdk_url) + .header(header::USER_AGENT, "TrustedServer/1.0") + .header(header::ACCEPT, "application/javascript, */*") + .body(EdgeBody::empty()) + .change_context(Self::error("Failed to build Lockr SDK request"))?; - let backend_name = BackendConfig::from_url(sdk_url, true) + let backend_name = Self::backend_name_for_url(services, sdk_url) .change_context(Self::error("Failed to determine backend for SDK fetch"))?; - let mut lockr_response = - lockr_req - .send(backend_name) - .change_context(Self::error(format!( - "Failed to fetch Lockr SDK from {}", - sdk_url - )))?; - - if !lockr_response.get_status().is_success() { + let lockr_response = services + .http_client() + .send(PlatformHttpRequest::new(lockr_req, backend_name)) + .await + .change_context(Self::error(format!( + "Failed to fetch Lockr SDK from {}", + sdk_url + )))? + .response; + + if !lockr_response.status().is_success() { log::error!( "Lockr SDK fetch failed with status {}", - lockr_response.get_status() + lockr_response.status() ); return Err(Report::new(Self::error(format!( "Lockr SDK returned error status: {}", - lockr_response.get_status() + lockr_response.status() )))); } - let sdk_body = lockr_response.take_body_bytes(); + let sdk_body = lockr_response.into_body().into_bytes(); log::info!("Fetched Lockr SDK ({} bytes)", sdk_body.len()); // TODO: Cache in KV store (future enhancement) - Ok(Response::from_status(StatusCode::OK) - .with_header( + http::Response::builder() + .status(StatusCode::OK) + .header( header::CONTENT_TYPE, "application/javascript; charset=utf-8", ) - .with_header( + .header( header::CACHE_CONTROL, format!("public, max-age={}", self.config.cache_ttl_seconds), ) - .with_header("X-Lockr-SDK-Proxy", "true") - .with_header("X-Lockr-SDK-Mode", "trust-server") - .with_header("X-SDK-Source", sdk_url) - .with_body(sdk_body)) + .header("X-Lockr-SDK-Proxy", "true") + .header("X-Lockr-SDK-Mode", "trust-server") + .header("X-SDK-Source", sdk_url) + .body(EdgeBody::from(sdk_body.to_vec())) + .change_context(Self::error("Failed to build Lockr SDK response")) } /// Handle API proxy — forward requests to the configured Lockr API endpoint. async fn handle_api_proxy( &self, _settings: &Settings, - mut req: Request, - ) -> Result> { - let original_path = req.get_path(); - let method = req.get_method(); + services: &RuntimeServices, + req: http::Request, + ) -> Result, Report> { + let (parts, body) = req.into_parts(); + let original_path = parts.uri.path().to_string(); + let method = parts.method.clone(); log::info!("Proxying Lockr API request: {} {}", method, original_path); @@ -176,8 +187,8 @@ impl LockrIntegration { .strip_prefix("/integrations/lockr/api") .ok_or_else(|| Self::error(format!("Invalid Lockr API path: {}", original_path)))?; - let query = req - .get_url() + let query = parts + .uri .query() .map(|q| format!("?{}", q)) .unwrap_or_default(); @@ -185,30 +196,33 @@ impl LockrIntegration { log::info!("Forwarding to Lockr API: {}", target_url); - let mut target_req = Request::new(method.clone(), &target_url); - self.copy_request_headers(&req, &mut target_req); + let request_body = if matches!(method, Method::POST | Method::PUT | Method::PATCH) { + body + } else { + EdgeBody::empty() + }; - if matches!(method, &Method::POST | &Method::PUT | &Method::PATCH) { - let body = req.take_body(); - target_req.set_body(body); - } + let mut target_req = http::Request::builder() + .method(method.clone()) + .uri(&target_url) + .body(request_body) + .change_context(Self::error("Failed to build Lockr API proxy request"))?; + self.copy_request_headers(&parts.headers, target_req.headers_mut())?; - let backend_name = BackendConfig::from_url(&self.config.api_endpoint, true) + let backend_name = Self::backend_name_for_url(services, &self.config.api_endpoint) .change_context(Self::error("Failed to determine backend for API proxy"))?; - let response = match target_req.send(backend_name) { - Ok(res) => res, - Err(e) => { - return Err(Self::error(format!( - "failed to forward request to {}, {}", - target_url, - e.root_cause() - )) - .into()); - } - }; + let response = services + .http_client() + .send(PlatformHttpRequest::new(target_req, backend_name)) + .await + .change_context(Self::error(format!( + "Failed to forward request to {}", + target_url + )))? + .response; - log::info!("Lockr API responded with status {}", response.get_status()); + log::info!("Lockr API responded with status {}", response.status()); Ok(response) } @@ -218,7 +232,11 @@ impl LockrIntegration { /// Consent cookies are always stripped — consent signals are forwarded /// through the `OpenRTB` body by the Prebid integration, not through /// Lockr's cookie-based API calls. - fn copy_request_headers(&self, from: &Request, to: &mut Request) { + fn copy_request_headers( + &self, + from: &HeaderMap, + to: &mut HeaderMap, + ) -> Result<(), Report> { let headers_to_copy = [ header::CONTENT_TYPE, header::ACCEPT, @@ -229,25 +247,71 @@ impl LockrIntegration { ]; for header_name in &headers_to_copy { - if let Some(value) = from.get_header(header_name) { - to.set_header(header_name, value); + if let Some(value) = from.get(header_name) { + to.insert(header_name, value.clone()); } } // Always strip consent cookies — consent travels through the OpenRTB body - compat::forward_fastly_cookie_header(from, to, true); + self.copy_cookie_header(from, to)?; // Use origin override if configured, otherwise forward original - let origin = self - .config - .origin_override - .as_deref() - .or_else(|| from.get_header_str(header::ORIGIN)); + let origin = self.config.origin_override.as_deref().or_else(|| { + from.get(header::ORIGIN) + .and_then(|value| value.to_str().ok()) + }); if let Some(origin) = origin { - to.set_header(header::ORIGIN, origin); + to.insert( + header::ORIGIN, + HeaderValue::from_str(origin) + .expect("should build origin header from valid configuration"), + ); + } + + for (name, value) in from { + let name_str = name.as_str(); + if name_str.starts_with("x-") && !INTERNAL_HEADERS.contains(&name_str) + { + to.append(name.clone(), value.clone()); + } } - compat::copy_fastly_custom_headers(from, to); + Ok(()) + } + + fn copy_cookie_header( + &self, + from: &HeaderMap, + to: &mut HeaderMap, + ) -> Result<(), Report> { + let Some(cookie_value) = from.get(header::COOKIE) else { + return Ok(()); + }; + + match cookie_value.to_str() { + Ok(value) => { + let stripped = strip_cookies(value, CONSENT_COOKIE_NAMES); + if stripped.is_empty() { + return Ok(()); + } + + let cookie_header = HeaderValue::from_str(&stripped) + .change_context(Self::error("Failed to rebuild stripped cookie header"))?; + to.insert(header::COOKIE, cookie_header); + } + Err(_) => { + to.insert(header::COOKIE, cookie_value.clone()); + } + } + + Ok(()) + } + + fn backend_name_for_url( + services: &RuntimeServices, + target_url: &str, + ) -> Result> { + ensure_integration_backend(services, target_url, LOCKR_INTEGRATION_ID) } } @@ -304,15 +368,15 @@ impl IntegrationProxy for LockrIntegration { async fn handle( &self, settings: &Settings, - _services: &RuntimeServices, - req: Request, - ) -> Result> { - let path = req.get_path(); + services: &RuntimeServices, + req: http::Request, + ) -> Result, Report> { + let path = req.uri().path().to_string(); if path == "/integrations/lockr/sdk" { - self.handle_sdk_serving(settings, req).await + self.handle_sdk_serving(settings, services).await } else if path.starts_with("/integrations/lockr/api/") { - self.handle_api_proxy(settings, req).await + self.handle_api_proxy(settings, services, req).await } else { Err(Report::new(Self::error(format!( "Unknown Lockr route: {}", @@ -376,9 +440,13 @@ fn default_rewrite_sdk() -> bool { #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; + use edgezero_core::http::Method as HttpMethod; use serde_json::json; + use crate::platform::test_support::{build_services_with_http_client, StubHttpClient}; use crate::test_support::tests::create_test_settings; fn test_config() -> LockrConfig { @@ -490,6 +558,36 @@ mod tests { ); } + #[test] + fn lockr_proxy_uses_platform_http_client() { + let stub = Arc::new(StubHttpClient::new()); + stub.push_response(200, b"ok".to_vec()); + let services = build_services_with_http_client( + Arc::clone(&stub) as Arc + ); + let settings = create_test_settings(); + let integration = LockrIntegration::new(test_config()); + let req = http::Request::builder() + .method(HttpMethod::GET) + .uri("https://publisher.example/integrations/lockr/api/publisher/app/v1/identityLockr/settings") + .body(EdgeBody::empty()) + .expect("should build request"); + + let response = futures::executor::block_on(integration.handle(&settings, &services, req)) + .expect("should proxy request"); + + assert_eq!( + response.status(), + http::StatusCode::OK, + "should return stubbed response" + ); + assert_eq!( + stub.recorded_backend_names().len(), + 1, + "should route one outbound request through PlatformHttpClient" + ); + } + #[test] fn test_api_path_extraction_preserves_casing() { let test_cases = [ diff --git a/crates/trusted-server-core/src/integrations/mod.rs b/crates/trusted-server-core/src/integrations/mod.rs index 92f30219..11665f16 100644 --- a/crates/trusted-server-core/src/integrations/mod.rs +++ b/crates/trusted-server-core/src/integrations/mod.rs @@ -1,8 +1,12 @@ //! Integration module registry and sample implementations. -use error_stack::Report; +use edgezero_core::body::Body as EdgeBody; +use error_stack::{Report, ResultExt}; +use futures::StreamExt as _; +use url::Url; use crate::error::TrustedServerError; +use crate::platform::{PlatformBackendSpec, RuntimeServices}; use crate::settings::Settings; pub mod adserver_mock; @@ -26,6 +30,76 @@ pub use registry::{ IntegrationRegistry, IntegrationScriptContext, IntegrationScriptRewriter, ScriptRewriteAction, }; +/// Registers or retrieves a platform backend for the given URL. +/// +/// Parses `url`, builds a [`PlatformBackendSpec`] with TLS enabled and a +/// 15-second first-byte timeout, and delegates to +/// [`crate::platform::PlatformBackend::ensure`]. +/// +/// # Errors +/// +/// Returns an error when `url` cannot be parsed, is missing a host, or the +/// backend registration fails. +pub(crate) fn ensure_integration_backend( + services: &RuntimeServices, + url: &str, + integration: &'static str, +) -> Result> { + let parsed = Url::parse(url).change_context(TrustedServerError::Integration { + integration: integration.to_string(), + message: "Invalid upstream URL".to_string(), + })?; + + services + .backend() + .ensure(&PlatformBackendSpec { + scheme: parsed.scheme().to_string(), + host: parsed + .host_str() + .ok_or_else(|| { + Report::new(TrustedServerError::Integration { + integration: integration.to_string(), + message: "Upstream URL missing host".to_string(), + }) + })? + .to_string(), + port: parsed.port(), + certificate_check: true, + first_byte_timeout: std::time::Duration::from_secs(15), + }) + .change_context(TrustedServerError::Integration { + integration: integration.to_string(), + message: "Failed to register backend".to_string(), + }) +} + +/// Drains an [`EdgeBody`] into a byte vector. +/// +/// # Errors +/// +/// Returns an error when a streaming body chunk cannot be read. +pub(crate) async fn collect_body( + body: EdgeBody, + integration: &'static str, +) -> Result, Report> { + match body { + EdgeBody::Once(bytes) => Ok(bytes.to_vec()), + EdgeBody::Stream(mut stream) => { + let mut body_bytes = Vec::new(); + while let Some(chunk_result) = stream.next().await { + let chunk = chunk_result.map_err(|error| { + Report::new(TrustedServerError::Integration { + integration: integration.to_string(), + message: format!("Failed to read response body: {error}"), + }) + })?; + body_bytes.extend_from_slice(&chunk); + } + Ok(body_bytes) + } + } +} + type IntegrationBuilder = fn(&Settings) -> Result, Report>; diff --git a/crates/trusted-server-core/src/integrations/permutive.rs b/crates/trusted-server-core/src/integrations/permutive.rs index 41d7e3bf..d6f7e255 100644 --- a/crates/trusted-server-core/src/integrations/permutive.rs +++ b/crates/trusted-server-core/src/integrations/permutive.rs @@ -6,20 +6,20 @@ use std::sync::Arc; use async_trait::async_trait; +use edgezero_core::body::Body as EdgeBody; use error_stack::{Report, ResultExt}; -use fastly::http::{header, Method, StatusCode}; -use fastly::{Request, Response}; +use http::header::{self, HeaderMap, HeaderValue}; +use http::{Method, StatusCode}; use serde::Deserialize; use validator::Validate; -use crate::backend::BackendConfig; -use crate::compat; +use crate::constants::INTERNAL_HEADERS; use crate::error::TrustedServerError; use crate::integrations::{ - AttributeRewriteAction, IntegrationAttributeContext, IntegrationAttributeRewriter, - IntegrationEndpoint, IntegrationProxy, IntegrationRegistration, + ensure_integration_backend, AttributeRewriteAction, IntegrationAttributeContext, + IntegrationAttributeRewriter, IntegrationEndpoint, IntegrationProxy, IntegrationRegistration, }; -use crate::platform::RuntimeServices; +use crate::platform::{PlatformHttpRequest, RuntimeServices}; use crate::settings::{IntegrationConfig, Settings}; const PERMUTIVE_INTEGRATION_ID: &str = "permutive"; @@ -106,8 +106,8 @@ impl PermutiveIntegration { async fn handle_sdk_serving( &self, _settings: &Settings, - _req: Request, - ) -> Result> { + services: &RuntimeServices, + ) -> Result, Report> { log::info!("Handling Permutive SDK request"); let sdk_url = self.sdk_url(); @@ -116,33 +116,39 @@ impl PermutiveIntegration { // TODO: Check KV store cache first (future enhancement) // Fetch SDK from Permutive CDN - let mut permutive_req = Request::new(Method::GET, &sdk_url); - permutive_req.set_header(header::USER_AGENT, "TrustedServer/1.0"); - permutive_req.set_header(header::ACCEPT, "application/javascript, */*"); - - let backend_name = BackendConfig::from_url(&sdk_url, true) + let permutive_req = http::Request::builder() + .method(Method::GET) + .uri(&sdk_url) + .header(header::USER_AGENT, "TrustedServer/1.0") + .header(header::ACCEPT, "application/javascript, */*") + .body(EdgeBody::empty()) + .change_context(Self::error("Failed to build Permutive SDK request"))?; + + let backend_name = Self::backend_name_for_url(services, &sdk_url) .change_context(Self::error("Failed to determine backend for SDK fetch"))?; - let mut permutive_response = - permutive_req - .send(backend_name) - .change_context(Self::error(format!( - "Failed to fetch Permutive SDK from {}", - sdk_url - )))?; + let permutive_response = services + .http_client() + .send(PlatformHttpRequest::new(permutive_req, backend_name)) + .await + .change_context(Self::error(format!( + "Failed to fetch Permutive SDK from {}", + sdk_url + )))? + .response; - if !permutive_response.get_status().is_success() { + if !permutive_response.status().is_success() { log::error!( "Permutive SDK fetch failed with status: {}", - permutive_response.get_status() + permutive_response.status() ); return Err(Report::new(Self::error(format!( "Permutive SDK returned error status: {}", - permutive_response.get_status() + permutive_response.status() )))); } - let sdk_body = permutive_response.take_body_bytes(); + let sdk_body = permutive_response.into_body().into_bytes(); log::info!( "Successfully fetched Permutive SDK: {} bytes", sdk_body.len() @@ -150,335 +156,96 @@ impl PermutiveIntegration { // TODO: Cache in KV store (future enhancement) - Ok(Response::from_status(StatusCode::OK) - .with_header( + http::Response::builder() + .status(StatusCode::OK) + .header( header::CONTENT_TYPE, "application/javascript; charset=utf-8", ) - .with_header( + .header( header::CACHE_CONTROL, format!("public, max-age={}", self.config.cache_ttl_seconds), ) - .with_header("X-Permutive-SDK-Proxy", "true") - .with_header("X-SDK-Source", &sdk_url) - .with_body(sdk_body)) + .header("X-Permutive-SDK-Proxy", "true") + .header("X-SDK-Source", &sdk_url) + .body(EdgeBody::from(sdk_body.to_vec())) + .change_context(Self::error("Failed to build Permutive SDK response")) } - /// Handle API proxy - forward requests to api.permutive.com. - async fn handle_api_proxy( + async fn forward_proxy_request( &self, - _settings: &Settings, - mut req: Request, - ) -> Result> { - let original_path = req.get_path(); - let method = req.get_method(); + services: &RuntimeServices, + req: http::Request, + route_prefix: &str, + upstream_base: &str, + route_name: &str, + ) -> Result, Report> { + let (parts, body) = req.into_parts(); + let original_path = parts.uri.path().to_string(); + let method = parts.method.clone(); log::info!( - "Proxying Permutive API request: {} {}", + "Proxying {} request: {} {}", + route_name, method, original_path ); - // Extract path after /integrations/permutive/api - let api_path = original_path - .strip_prefix("/integrations/permutive/api") - .ok_or_else(|| Self::error(format!("Invalid Permutive API path: {}", original_path)))?; + let upstream_path = original_path.strip_prefix(route_prefix).ok_or_else(|| { + Self::error(format!("Invalid {} path: {}", route_name, original_path)) + })?; - // Build full target URL with query parameters - let query = req - .get_url() + let query = parts + .uri .query() .map(|q| format!("?{}", q)) .unwrap_or_default(); - let target_url = format!("{}{}{}", self.config.api_endpoint, api_path, query); - - log::info!("Forwarding to Permutive API: {}", target_url); - - // Create new request - let mut target_req = Request::new(method.clone(), &target_url); + let target_url = format!("{}{}{}", upstream_base, upstream_path, query); - // Copy headers - self.copy_request_headers(&req, &mut target_req); + log::info!("Forwarding {} to {}", route_name, target_url); - // Copy body for POST/PUT/PATCH - if matches!(method, &Method::POST | &Method::PUT | &Method::PATCH) { - let body = req.take_body(); - target_req.set_body(body); - } - - // Get backend and forward - let backend_name = BackendConfig::from_url(&self.config.api_endpoint, true) - .change_context(Self::error("Failed to determine backend for API proxy"))?; + let request_body = if matches!(method, Method::POST | Method::PUT | Method::PATCH) { + body + } else { + EdgeBody::empty() + }; - let response = target_req - .send(backend_name) + let mut target_req = http::Request::builder() + .method(method) + .uri(&target_url) + .body(request_body) .change_context(Self::error(format!( - "Failed to forward request to {}", - target_url + "Failed to build {} proxy request", + route_name )))?; + self.copy_request_headers(&parts.headers, target_req.headers_mut()); - log::info!( - "Permutive API responded with status: {}", - response.get_status() - ); - - Ok(response) - } - - /// Handle Secure Signals proxy - forward requests to secure-signals.permutive.app. - async fn handle_secure_signals_proxy( - &self, - _settings: &Settings, - mut req: Request, - ) -> Result> { - let original_path = req.get_path(); - let method = req.get_method(); - - log::info!( - "Proxying Permutive Secure Signals request: {} {}", - method, - original_path - ); - - // Extract path after /integrations/permutive/secure-signal - let signal_path = original_path - .strip_prefix("/integrations/permutive/secure-signal") - .ok_or_else(|| { - Self::error(format!( - "Invalid Permutive Secure Signals path: {}", - original_path - )) - })?; - - // Build full target URL with query parameters - let query = req - .get_url() - .query() - .map(|q| format!("?{}", q)) - .unwrap_or_default(); - let target_url = format!( - "{}{}{}", - self.config.secure_signals_endpoint, signal_path, query - ); - - log::info!("Forwarding to Permutive Secure Signals: {}", target_url); - - // Create new request - let mut target_req = Request::new(method.clone(), &target_url); - - // Copy headers - self.copy_request_headers(&req, &mut target_req); - - // Copy body for POST/PUT/PATCH - if matches!(method, &Method::POST | &Method::PUT | &Method::PATCH) { - let body = req.take_body(); - target_req.set_body(body); - } - - // Get backend and forward - let backend_name = BackendConfig::from_url(&self.config.secure_signals_endpoint, true) - .change_context(Self::error( - "Failed to determine backend for Secure Signals proxy", + let backend_name = + Self::backend_name_for_url(services, upstream_base).change_context(Self::error( + format!("Failed to determine backend for {} proxy", route_name), ))?; - let response = target_req - .send(backend_name) + let response = services + .http_client() + .send(PlatformHttpRequest::new(target_req, backend_name)) + .await .change_context(Self::error(format!( "Failed to forward request to {}", target_url - )))?; + )))? + .response; log::info!( - "Permutive Secure Signals responded with status: {}", - response.get_status() - ); - - Ok(response) - } - - /// Handle Events proxy - forward requests to events.permutive.app. - async fn handle_events_proxy( - &self, - _settings: &Settings, - mut req: Request, - ) -> Result> { - let original_path = req.get_path(); - let method = req.get_method(); - - log::info!( - "Proxying Permutive Events request: {} {}", - method, - original_path - ); - - // Extract path after /integrations/permutive/events - let events_path = original_path - .strip_prefix("/integrations/permutive/events") - .ok_or_else(|| { - Self::error(format!("Invalid Permutive Events path: {}", original_path)) - })?; - - // Build full target URL with query parameters - let query = req - .get_url() - .query() - .map(|q| format!("?{}", q)) - .unwrap_or_default(); - let target_url = format!("https://events.permutive.app{}{}", events_path, query); - - log::info!("Forwarding to Permutive Events: {}", target_url); - - // Create new request - let mut target_req = Request::new(method.clone(), &target_url); - - // Copy headers - self.copy_request_headers(&req, &mut target_req); - - // Copy body for POST/PUT/PATCH - if matches!(method, &Method::POST | &Method::PUT | &Method::PATCH) { - let body = req.take_body(); - target_req.set_body(body); - } - - // Get backend and forward - let backend_name = BackendConfig::from_url("https://events.permutive.app", true) - .change_context(Self::error("Failed to determine backend for Events proxy"))?; - - let response = target_req - .send(backend_name) - .change_context(Self::error(format!( - "Failed to forward request to {}", - target_url - )))?; - - log::info!( - "Permutive Events responded with status: {}", - response.get_status() - ); - - Ok(response) - } - - /// Handle Sync proxy - forward requests to sync.permutive.com. - async fn handle_sync_proxy( - &self, - _settings: &Settings, - mut req: Request, - ) -> Result> { - let original_path = req.get_path(); - let method = req.get_method(); - - log::info!( - "Proxying Permutive Sync request: {} {}", - method, - original_path - ); - - // Extract path after /integrations/permutive/sync - let sync_path = original_path - .strip_prefix("/integrations/permutive/sync") - .ok_or_else(|| { - Self::error(format!("Invalid Permutive Sync path: {}", original_path)) - })?; - - // Build full target URL with query parameters - let query = req - .get_url() - .query() - .map(|q| format!("?{}", q)) - .unwrap_or_default(); - let target_url = format!("https://sync.permutive.com{}{}", sync_path, query); - - log::info!("Forwarding to Permutive Sync: {}", target_url); - - // Create new request - let mut target_req = Request::new(method.clone(), &target_url); - - // Copy headers - self.copy_request_headers(&req, &mut target_req); - - // Copy body for POST/PUT/PATCH - if matches!(method, &Method::POST | &Method::PUT | &Method::PATCH) { - let body = req.take_body(); - target_req.set_body(body); - } - - // Get backend and forward - let backend_name = BackendConfig::from_url("https://sync.permutive.com", true) - .change_context(Self::error("Failed to determine backend for Sync proxy"))?; - - let response = target_req - .send(backend_name) - .change_context(Self::error(format!( - "Failed to forward request to {}", - target_url - )))?; - - log::info!( - "Permutive Sync responded with status: {}", - response.get_status() - ); - - Ok(response) - } - - /// Handle CDN proxy - forward requests to cdn.permutive.com. - async fn handle_cdn_proxy( - &self, - _settings: &Settings, - req: Request, - ) -> Result> { - let original_path = req.get_path(); - let method = req.get_method(); - - log::info!( - "Proxying Permutive CDN request: {} {}", - method, - original_path - ); - - // Extract path after /integrations/permutive/cdn - let cdn_path = original_path - .strip_prefix("/integrations/permutive/cdn") - .ok_or_else(|| Self::error(format!("Invalid Permutive CDN path: {}", original_path)))?; - - // Build full target URL with query parameters - let query = req - .get_url() - .query() - .map(|q| format!("?{}", q)) - .unwrap_or_default(); - let target_url = format!("https://cdn.permutive.com{}{}", cdn_path, query); - - log::info!("Forwarding to Permutive CDN: {}", target_url); - - // Create new request - let mut target_req = Request::new(method.clone(), &target_url); - - // Copy headers - self.copy_request_headers(&req, &mut target_req); - - // Get backend and forward - let backend_name = BackendConfig::from_url("https://cdn.permutive.com", true) - .change_context(Self::error("Failed to determine backend for CDN proxy"))?; - - let response = target_req - .send(backend_name) - .change_context(Self::error(format!( - "Failed to forward request to {}", - target_url - )))?; - - log::info!( - "Permutive CDN responded with status: {}", - response.get_status() + "{} responded with status: {}", + route_name, + response.status() ); Ok(response) } /// Copy relevant request headers for proxying. - fn copy_request_headers(&self, from: &Request, to: &mut Request) { + fn copy_request_headers(&self, from: &HeaderMap, to: &mut HeaderMap) { let headers_to_copy = [ header::CONTENT_TYPE, header::ACCEPT, @@ -489,13 +256,26 @@ impl PermutiveIntegration { ]; for header_name in &headers_to_copy { - if let Some(value) = from.get_header(header_name) { - to.set_header(header_name, value); + if let Some(value) = from.get(header_name) { + to.insert(header_name, value.clone()); } } // Copy any X-* custom headers, skipping TS-internal headers - compat::copy_fastly_custom_headers(from, to); + for (name, value) in from { + let name_str = name.as_str(); + if name_str.starts_with("x-") && !INTERNAL_HEADERS.contains(&name_str) + { + to.append(name.clone(), value.clone()); + } + } + } + + fn backend_name_for_url( + services: &RuntimeServices, + target_url: &str, + ) -> Result> { + ensure_integration_backend(services, target_url, PERMUTIVE_INTEGRATION_ID) } } @@ -561,23 +341,58 @@ impl IntegrationProxy for PermutiveIntegration { async fn handle( &self, settings: &Settings, - _services: &RuntimeServices, - req: Request, - ) -> Result> { - let path = req.get_path(); + services: &RuntimeServices, + req: http::Request, + ) -> Result, Report> { + let path = req.uri().path().to_string(); if path.starts_with("/integrations/permutive/api/") { - self.handle_api_proxy(settings, req).await + self.forward_proxy_request( + services, + req, + "/integrations/permutive/api", + &self.config.api_endpoint, + "Permutive API", + ) + .await } else if path.starts_with("/integrations/permutive/secure-signal/") { - self.handle_secure_signals_proxy(settings, req).await + self.forward_proxy_request( + services, + req, + "/integrations/permutive/secure-signal", + &self.config.secure_signals_endpoint, + "Permutive Secure Signals", + ) + .await } else if path.starts_with("/integrations/permutive/events/") { - self.handle_events_proxy(settings, req).await + self.forward_proxy_request( + services, + req, + "/integrations/permutive/events", + "https://events.permutive.app", + "Permutive Events", + ) + .await } else if path.starts_with("/integrations/permutive/sync/") { - self.handle_sync_proxy(settings, req).await + self.forward_proxy_request( + services, + req, + "/integrations/permutive/sync", + "https://sync.permutive.com", + "Permutive Sync", + ) + .await } else if path.starts_with("/integrations/permutive/cdn/") { - self.handle_cdn_proxy(settings, req).await + self.forward_proxy_request( + services, + req, + "/integrations/permutive/cdn", + "https://cdn.permutive.com", + "Permutive CDN", + ) + .await } else if path == "/integrations/permutive/sdk" { - self.handle_sdk_serving(settings, req).await + self.handle_sdk_serving(settings, services).await } else { Err(Report::new(Self::error(format!( "Unknown Permutive route: {}", @@ -641,7 +456,10 @@ fn default_rewrite_sdk() -> bool { #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; + use crate::platform::test_support::{build_services_with_http_client, StubHttpClient}; use crate::test_support::tests::create_test_settings; #[test] @@ -791,4 +609,43 @@ mod tests { "Should register SDK endpoint" ); } + + #[test] + fn permutive_proxy_uses_platform_http_client() { + let stub = Arc::new(StubHttpClient::new()); + stub.push_response(200, b"ok".to_vec()); + let services = build_services_with_http_client( + Arc::clone(&stub) as Arc + ); + let settings = create_test_settings(); + let integration = PermutiveIntegration::new(PermutiveConfig { + enabled: true, + organization_id: "myorg".to_string(), + workspace_id: "workspace-123".to_string(), + project_id: String::new(), + api_endpoint: default_api_endpoint(), + secure_signals_endpoint: default_secure_signals_endpoint(), + cache_ttl_seconds: 3600, + rewrite_sdk: true, + }); + let req = http::Request::builder() + .method(http::Method::GET) + .uri("https://publisher.example/integrations/permutive/api/v2.0/events") + .body(EdgeBody::empty()) + .expect("should build request"); + + let response = futures::executor::block_on(integration.handle(&settings, &services, req)) + .expect("should proxy request"); + + assert_eq!( + response.status(), + http::StatusCode::OK, + "should return stubbed response" + ); + assert_eq!( + stub.recorded_backend_names(), + vec!["stub-backend".to_string()], + "should route outbound request through PlatformHttpClient" + ); + } } diff --git a/crates/trusted-server-core/src/integrations/prebid.rs b/crates/trusted-server-core/src/integrations/prebid.rs index 5d39e775..d3131d11 100644 --- a/crates/trusted-server-core/src/integrations/prebid.rs +++ b/crates/trusted-server-core/src/integrations/prebid.rs @@ -3,9 +3,11 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use edgezero_core::body::Body as EdgeBody; use error_stack::{Report, ResultExt}; -use fastly::http::{header, Method, StatusCode, Url}; -use fastly::{Request, Response}; +use http::{header, Method, StatusCode}; +use http::header::HeaderValue; +use url::Url; use serde::{Deserialize, Serialize}; use serde_json::Value as Json; use validator::Validate; @@ -15,8 +17,8 @@ use crate::auction::types::{ AuctionContext, AuctionRequest, AuctionResponse, Bid as AuctionBid, MediaType, }; use crate::backend::BackendConfig; -use crate::compat; use crate::consent_config::ConsentForwardingMode; +use crate::cookies::{strip_cookies, CONSENT_COOKIE_NAMES}; use crate::error::TrustedServerError; use crate::http_util::RequestInfo; use crate::integrations::{ @@ -29,7 +31,9 @@ use crate::openrtb::{ OpenRtbRequest, PrebidExt, PrebidImpExt, Publisher, Regs, RegsExt, RequestExt, Site, ToExt, TrustedServerExt, User, UserExt, }; -use crate::platform::RuntimeServices; +use crate::platform::{ + PlatformHttpRequest, PlatformPendingRequest, PlatformResponse, RuntimeServices, +}; use crate::request_signing::{RequestSigner, SigningParams, SIGNING_VERSION}; use crate::settings::{IntegrationConfig, Settings}; @@ -229,16 +233,22 @@ impl PrebidIntegration { false } - fn handle_script_handler(&self) -> Result> { + fn handle_script_handler( + &self, + ) -> Result, Report> { let body = "// Script overridden by Trusted Server\n"; - Ok(Response::from_status(StatusCode::OK) - .with_header( + http::Response::builder() + .status(StatusCode::OK) + .header( header::CONTENT_TYPE, "application/javascript; charset=utf-8", ) - .with_header(header::CACHE_CONTROL, "public, max-age=31536000") - .with_body(body)) + .header(header::CACHE_CONTROL, "public, max-age=31536000") + .body(EdgeBody::from(body)) + .change_context(TrustedServerError::Prebid { + message: "Failed to build Prebid script handler response".to_string(), + }) } } @@ -316,15 +326,20 @@ impl IntegrationProxy for PrebidIntegration { &self, _settings: &Settings, _services: &RuntimeServices, - req: Request, - ) -> Result> { - let path = req.get_path().to_string(); - let method = req.get_method().clone(); + req: http::Request, + ) -> Result, Report> { + let path = req.uri().path().to_string(); + let method = req.method().clone(); match method { // Serve empty JS for matching script patterns Method::GET if self.matches_script_pattern(&path) => self.handle_script_handler(), - _ => Ok(Response::from_status(StatusCode::NOT_FOUND).with_body("Not Found")), + _ => http::Response::builder() + .status(StatusCode::NOT_FOUND) + .body(EdgeBody::from("Not Found")) + .change_context(TrustedServerError::Prebid { + message: "Failed to build Prebid not found response".to_string(), + }), } } } @@ -426,8 +441,8 @@ fn expand_trusted_server_bidders( /// stripped from the `Cookie` header since consent travels exclusively /// through the `OpenRTB` body. fn copy_request_headers( - from: &Request, - to: &mut Request, + from: &http::Request, + to: &mut http::Request, consent_forwarding: ConsentForwardingMode, ) { let headers_to_copy = [ @@ -438,12 +453,37 @@ fn copy_request_headers( ]; for header_name in &headers_to_copy { - if let Some(value) = from.get_header(header_name) { - to.set_header(header_name, value); + if let Some(value) = from.headers().get(header_name) { + to.headers_mut().insert(header_name, value.clone()); } } - compat::forward_fastly_cookie_header(from, to, consent_forwarding.strips_consent_cookies()); + let Some(cookie_value) = from.headers().get(header::COOKIE) else { + return; + }; + + if !consent_forwarding.strips_consent_cookies() { + to.headers_mut() + .insert(header::COOKIE, cookie_value.clone()); + return; + } + + match cookie_value.to_str() { + Ok(value) => { + let stripped = strip_cookies(value, CONSENT_COOKIE_NAMES); + if stripped.is_empty() { + return; + } + + if let Ok(cookie_header) = HeaderValue::from_str(&stripped) { + to.headers_mut().insert(header::COOKIE, cookie_header); + } + } + Err(_) => { + to.headers_mut() + .insert(header::COOKIE, cookie_value.clone()); + } + } } /// Appends query parameters to a URL, handling both URLs with and without existing query strings. @@ -627,17 +667,24 @@ impl PrebidAuctionProvider { }); // Extract DNT header and Accept-Language from the original request - let dnt = context.request.get_header_str("DNT").and_then(|v| { - if v.trim() == "1" { - Some(true) - } else { - None - } - }); + let dnt = context + .request + .headers() + .get("DNT") + .and_then(|value| value.to_str().ok()) + .and_then(|value| { + if value.trim() == "1" { + Some(true) + } else { + None + } + }); let language = context .request - .get_header_str(header::ACCEPT_LANGUAGE) + .headers() + .get(header::ACCEPT_LANGUAGE) + .and_then(|value| value.to_str().ok()) .and_then(|v| { // Extract the primary ISO-639 language tag (e.g., "en" from // "en-US,en;q=0.9"). Strip the region subtag so bidders get a @@ -710,8 +757,7 @@ impl PrebidAuctionProvider { let regs = Self::build_regs(consent_ctx); // Build ext object - let http_req = compat::from_fastly_request_ref(context.request); - let request_info = RequestInfo::from_request(&http_req, context.client_info); + let request_info = RequestInfo::from_request(context.request, context.client_info); let (version, signature, kid, ts) = signer .map(|(s, sig, params)| { ( @@ -744,7 +790,9 @@ impl PrebidAuctionProvider { // Extract Referer header for site.ref let referer = context .request - .get_header_str(header::REFERER) + .headers() + .get(header::REFERER) + .and_then(|value| value.to_str().ok()) .map(std::string::ToString::to_string); let tmax = to_openrtb_i32(self.config.timeout_ms, "tmax", "request"); @@ -992,16 +1040,17 @@ impl PrebidAuctionProvider { } } +#[async_trait(?Send)] impl AuctionProvider for PrebidAuctionProvider { fn provider_name(&self) -> &'static str { "prebid" } - fn request_bids( + async fn request_bids( &self, request: &AuctionRequest, context: &AuctionContext<'_>, - ) -> Result> { + ) -> Result> { log::info!("Prebid: requesting bids for {} slots", request.slots.len()); // Create signer and compute signature if request signing is enabled @@ -1009,8 +1058,7 @@ impl AuctionProvider for PrebidAuctionProvider { &context.settings.request_signing { if request_signing_config.enabled { - let http_req = compat::from_fastly_request_ref(context.request); - let request_info = RequestInfo::from_request(&http_req, context.client_info); + let request_info = RequestInfo::from_request(context.request, context.client_info); let signer = RequestSigner::from_services(context.services)?; let params = SigningParams::new(request.id.clone(), request_info.host, request_info.scheme); @@ -1057,21 +1105,27 @@ impl AuctionProvider for PrebidAuctionProvider { } // Create HTTP request - let mut pbs_req = Request::new( - Method::POST, - format!("{}/openrtb2/auction", self.config.server_url), - ); + let mut pbs_req = http::Request::builder() + .method(http::Method::POST) + .uri(format!("{}/openrtb2/auction", self.config.server_url)) + .body(EdgeBody::empty()) + .change_context(TrustedServerError::Prebid { + message: "Failed to build Prebid request".to_string(), + })?; copy_request_headers( context.request, &mut pbs_req, self.config.consent_forwarding, ); - pbs_req - .set_body_json(&openrtb) - .change_context(TrustedServerError::Prebid { - message: "Failed to set request body".to_string(), - })?; + let pbs_body = serde_json::to_vec(&openrtb).change_context(TrustedServerError::Prebid { + message: "Failed to serialize Prebid request body".to_string(), + })?; + pbs_req.headers_mut().insert( + header::CONTENT_TYPE, + HeaderValue::from_static("application/json"), + ); + *pbs_req.body_mut() = EdgeBody::from(pbs_body); // Send request asynchronously with auction-scoped timeout let backend_name = BackendConfig::from_url_with_first_byte_timeout( @@ -1079,29 +1133,31 @@ impl AuctionProvider for PrebidAuctionProvider { true, Duration::from_millis(u64::from(context.timeout_ms)), )?; - let pending = - pbs_req - .send_async(backend_name) - .change_context(TrustedServerError::Prebid { - message: "Failed to send async request to Prebid Server".to_string(), - })?; + let pending = context + .services + .http_client() + .send_async(PlatformHttpRequest::new(pbs_req, backend_name)) + .await + .change_context(TrustedServerError::Prebid { + message: "Failed to send async request to Prebid Server".to_string(), + })?; Ok(pending) } fn parse_response( &self, - mut response: fastly::Response, + response: PlatformResponse, response_time_ms: u64, ) -> Result> { + let response = response.response; + let status = response.status(); + // Parse response - let body_bytes = response.take_body_bytes(); + let body_bytes = response.into_body().into_bytes(); - if !response.get_status().is_success() { - log::warn!( - "Prebid returned non-success status: {}", - response.get_status(), - ); + if !status.is_success() { + log::warn!("Prebid returned non-success status: {}", status,); if log::log_enabled!(log::Level::Trace) { let body_preview = String::from_utf8_lossy(&body_bytes); log::trace!( @@ -1213,6 +1269,8 @@ pub fn register_auction_provider( #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; use crate::auction::types::{ AdFormat, AdSlot, AuctionContext, AuctionRequest, DeviceInfo, PublisherInfo, UserInfo, @@ -1223,11 +1281,11 @@ mod tests { use crate::integrations::{ AttributeRewriteAction, IntegrationDocumentState, IntegrationRegistry, }; + use crate::platform::test_support::{build_services_with_http_client, StubHttpClient}; use crate::settings::Settings; use crate::streaming_processor::{Compression, PipelineConfig, StreamingPipeline}; use crate::test_support::tests::crate_test_settings_str; - use fastly::http::Method; - use fastly::Request; + use http::Method; use serde_json::json; use std::collections::HashMap; use std::io::Cursor; @@ -1282,9 +1340,56 @@ mod tests { } } + fn build_test_request() -> http::Request { + http::Request::builder() + .method(http::Method::GET) + .uri("https://pub.example/auction") + .body(EdgeBody::empty()) + .expect("should build request") + } + + #[test] + fn prebid_provider_uses_platform_http_client_for_bid_request() { + let stub = Arc::new(StubHttpClient::new()); + stub.push_response(200, br#"{"seatbid":[]}"#.to_vec()); + let services = build_services_with_http_client( + Arc::clone(&stub) as Arc + ); + let settings = make_settings(); + let provider = PrebidAuctionProvider::new(base_config()); + let auction_request = create_test_auction_request(); + let http_req = http::Request::builder() + .method(http::Method::POST) + .uri("https://publisher.example/auction") + .body(EdgeBody::empty()) + .expect("should build request"); + let context = AuctionContext { + settings: &settings, + request: &http_req, + client_info: services.client_info(), + timeout_ms: 500, + provider_responses: None, + services: &services, + }; + + let pending = + futures::executor::block_on(provider.request_bids(&auction_request, &context)) + .expect("should start request"); + + assert!( + pending.backend_name().is_some(), + "should preserve backend correlation" + ); + assert_eq!( + stub.recorded_backend_names().len(), + 1, + "should launch one upstream request through PlatformHttpClient" + ); + } + fn create_test_auction_context<'a>( settings: &'a Settings, - request: &'a Request, + request: &'a http::Request, client_info: &'a crate::platform::ClientInfo, ) -> AuctionContext<'a> { use crate::platform::test_support::noop_services; @@ -1535,19 +1640,24 @@ server_url = "https://prebid.example" .handle_script_handler() .expect("should return response"); - assert_eq!(response.get_status(), StatusCode::OK); + assert_eq!(response.status(), StatusCode::OK); let content_type = response - .get_header_str(header::CONTENT_TYPE) + .headers() + .get(header::CONTENT_TYPE) + .and_then(|value| value.to_str().ok()) .expect("should have content-type"); assert_eq!(content_type, "application/javascript; charset=utf-8"); let cache_control = response - .get_header_str(header::CACHE_CONTROL) + .headers() + .get(header::CACHE_CONTROL) + .and_then(|value| value.to_str().ok()) .expect("should have cache-control"); assert!(cache_control.contains("max-age=31536000")); - let body = response.into_body_str(); + let body = String::from_utf8(response.into_body().into_bytes().to_vec()) + .expect("should parse script body as utf-8"); assert!(body.contains("// Script overridden by Trusted Server")); } @@ -1709,7 +1819,7 @@ server_url = "https://prebid.example" let provider = PrebidAuctionProvider::new(config); let auction_request = create_test_auction_request(); let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -1764,7 +1874,7 @@ server_url = "https://prebid.example" let provider = PrebidAuctionProvider::new(config); let auction_request = create_test_auction_request(); let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -1801,7 +1911,7 @@ server_url = "https://prebid.example" geo: None, }); let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -1836,7 +1946,7 @@ server_url = "https://prebid.example" let provider = PrebidAuctionProvider::new(base_config()); let auction_request = create_test_auction_request(); let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -1894,7 +2004,7 @@ server_url = "https://prebid.example" auction_request.slots[0].floor_price = Some(1.5); let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -1922,7 +2032,7 @@ server_url = "https://prebid.example" let auction_request = create_test_auction_request(); // floor_price is None let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -1949,7 +2059,7 @@ server_url = "https://prebid.example" let auction_request = create_test_auction_request(); let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -1996,7 +2106,7 @@ server_url = "https://prebid.example" }); let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -2049,7 +2159,7 @@ server_url = "https://prebid.example" }); let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -2090,7 +2200,7 @@ server_url = "https://prebid.example" }); let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -2121,7 +2231,7 @@ server_url = "https://prebid.example" // No device/geo let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -2147,7 +2257,7 @@ server_url = "https://prebid.example" let auction_request = create_test_auction_request(); // consent=None, no geo let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -2176,7 +2286,7 @@ server_url = "https://prebid.example" }); let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -2399,8 +2509,10 @@ server_url = "https://prebid.example" }); let settings = make_settings(); - let mut request = Request::get("https://pub.example/auction"); - request.set_header("DNT", "1"); + let mut request = build_test_request(); + request + .headers_mut() + .insert("DNT", http::header::HeaderValue::from_static("1")); let context = create_test_auction_context( &settings, &request, @@ -2428,8 +2540,11 @@ server_url = "https://prebid.example" }); let settings = make_settings(); - let mut request = Request::get("https://pub.example/auction"); - request.set_header("Accept-Language", "en-US,en;q=0.9,fr;q=0.8"); + let mut request = build_test_request(); + request.headers_mut().insert( + "Accept-Language", + http::header::HeaderValue::from_static("en-US,en;q=0.9,fr;q=0.8"), + ); let context = create_test_auction_context( &settings, &request, @@ -2461,8 +2576,11 @@ server_url = "https://prebid.example" }); let settings = make_settings(); - let mut request = Request::get("https://pub.example/auction"); - request.set_header("Accept-Language", ""); + let mut request = build_test_request(); + request.headers_mut().insert( + "Accept-Language", + http::header::HeaderValue::from_static(""), + ); let context = create_test_auction_context( &settings, &request, @@ -2501,7 +2619,7 @@ server_url = "https://prebid.example" }]; let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -2539,7 +2657,7 @@ server_url = "https://prebid.example" }); let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -2575,7 +2693,7 @@ server_url = "https://prebid.example" let auction_request = create_test_auction_request(); let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -2608,7 +2726,7 @@ server_url = "https://prebid.example" let auction_request = create_test_auction_request(); let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -2638,7 +2756,7 @@ server_url = "https://prebid.example" }); let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -2671,8 +2789,11 @@ server_url = "https://prebid.example" let auction_request = create_test_auction_request(); let settings = make_settings(); - let mut request = Request::get("https://pub.example/auction"); - request.set_header("Referer", "https://google.com/search?q=test"); + let mut request = build_test_request(); + request.headers_mut().insert( + "Referer", + http::header::HeaderValue::from_static("https://google.com/search?q=test"), + ); let context = create_test_auction_context( &settings, &request, @@ -2699,7 +2820,7 @@ server_url = "https://prebid.example" let auction_request = create_test_auction_request(); let settings = make_settings(); - let request = Request::get("https://pub.example/auction"); + let request = build_test_request(); let context = create_test_auction_context( &settings, &request, @@ -2860,7 +2981,11 @@ server_url = "https://prebid.example" use crate::platform::test_support::noop_services; let provider = PrebidAuctionProvider::new(config); let settings = make_settings(); - let fastly_req = Request::new(Method::POST, "https://example.com/auction"); + let http_req = http::Request::builder() + .method(http::Method::POST) + .uri("https://example.com/auction") + .body(EdgeBody::empty()) + .expect("should build request"); let client_info = crate::platform::ClientInfo { client_ip: None, tls_protocol: None, @@ -2869,7 +2994,7 @@ server_url = "https://prebid.example" let services = noop_services(); let context = AuctionContext { settings: &settings, - request: &fastly_req, + request: &http_req, client_info: &client_info, timeout_ms: 1000, provider_responses: None, diff --git a/crates/trusted-server-core/src/integrations/registry.rs b/crates/trusted-server-core/src/integrations/registry.rs index 8d6da6b6..0b648ae7 100644 --- a/crates/trusted-server-core/src/integrations/registry.rs +++ b/crates/trusted-server-core/src/integrations/registry.rs @@ -3,13 +3,13 @@ use std::collections::BTreeMap; use std::sync::{Arc, Mutex}; use async_trait::async_trait; +use edgezero_core::body::Body as EdgeBody; use error_stack::Report; -use fastly::http::Method; -use fastly::{Request, Response}; +use http::{HeaderValue, Method, Request, Response}; use matchit::Router; -use crate::compat; use crate::constants::HEADER_X_SYNTHETIC_ID; +use crate::cookies::set_synthetic_cookie; use crate::error::TrustedServerError; use crate::platform::RuntimeServices; use crate::settings::Settings; @@ -260,8 +260,8 @@ pub trait IntegrationProxy: Send + Sync { &self, settings: &Settings, services: &RuntimeServices, - req: Request, - ) -> Result>; + req: Request, + ) -> Result, Report>; /// Helper to create a namespaced GET endpoint. /// Automatically prefixes the path with `/integrations/{integration_name()}`. @@ -655,38 +655,45 @@ impl IntegrationRegistry { path: &str, settings: &Settings, services: &RuntimeServices, - mut req: Request, - ) -> Option>> { + mut req: Request, + ) -> Option, Report>> { if let Some((proxy, _)) = self.find_route(method, path) { - // Generate synthetic ID before consuming request - let http_req = compat::from_fastly_request_ref(&req); - let synthetic_id_result = get_or_generate_synthetic_id(settings, services, &http_req); + let synthetic_id_result = get_or_generate_synthetic_id(settings, services, &req); - // Set synthetic ID header on the request so integrations can read it. - // Header injection: Fastly's HeaderValue API rejects values containing \r, \n, or \0, - // so a crafted synthetic_id cannot inject additional request headers. if let Ok(ref synthetic_id) = synthetic_id_result { - req.set_header(HEADER_X_SYNTHETIC_ID, synthetic_id.as_str()); + match HeaderValue::from_str(synthetic_id) { + Ok(header_value) => { + req.headers_mut() + .insert(HEADER_X_SYNTHETIC_ID.clone(), header_value); + } + Err(error) => { + log::warn!( + "Failed to build x-synthetic-id request header value: {}", + error + ); + } + } } let mut result = proxy.handle(settings, services, req).await; - // Set synthetic ID header on successful responses if let Ok(ref mut response) = result { match synthetic_id_result { Ok(ref synthetic_id) => { - // Response-header injection: Fastly's HeaderValue API rejects values - // containing \r, \n, or \0, so a crafted synthetic_id cannot inject - // additional response headers. - response.set_header(HEADER_X_SYNTHETIC_ID, synthetic_id.as_str()); - // Cookie is intentionally not set when synthetic_id contains RFC 6265-illegal - // characters (e.g. a crafted x-synthetic-id header value). The response header - // is still emitted; only cookie persistence is skipped. - compat::set_fastly_synthetic_cookie( - settings, - response, - synthetic_id.as_str(), - ); + match HeaderValue::from_str(synthetic_id) { + Ok(header_value) => { + response + .headers_mut() + .insert(HEADER_X_SYNTHETIC_ID.clone(), header_value); + } + Err(error) => { + log::warn!( + "Failed to build x-synthetic-id response header value: {}", + error + ); + } + } + set_synthetic_cookie(settings, response, synthetic_id); } Err(ref err) => { log::warn!( @@ -952,6 +959,7 @@ impl IntegrationRegistry { mod tests { use super::*; use crate::platform::test_support::noop_services; + use http::{header, StatusCode}; // Mock integration proxy for testing struct MockProxy; @@ -970,9 +978,9 @@ mod tests { &self, _settings: &Settings, _services: &RuntimeServices, - _req: Request, - ) -> Result> { - Ok(Response::new()) + _req: Request, + ) -> Result, Report> { + Ok(Response::new(EdgeBody::empty())) } } @@ -988,6 +996,33 @@ mod tests { } } + struct EchoProxy; + + #[async_trait(?Send)] + impl IntegrationProxy for EchoProxy { + fn integration_name(&self) -> &'static str { + "echo" + } + + fn routes(&self) -> Vec { + vec![] + } + + async fn handle( + &self, + _settings: &Settings, + _services: &RuntimeServices, + req: http::Request, + ) -> Result, Report> { + let response = http::Response::builder() + .status(http::StatusCode::OK) + .header("x-echo-path", req.uri().path()) + .body(EdgeBody::empty()) + .expect("should build echo response"); + Ok(response) + } + } + #[test] fn default_html_post_processor_should_process_is_false() { let processor = NoopHtmlPostProcessor; @@ -1005,6 +1040,42 @@ mod tests { ); } + #[test] + fn handle_proxy_passes_http_request_without_fastly_round_trip() { + let settings = create_test_settings(); + let registry = IntegrationRegistry::from_routes(vec![( + http::Method::GET, + "/integrations/test/echo", + (Arc::new(EchoProxy) as Arc, "echo"), + )]); + let req = http::Request::builder() + .method(http::Method::GET) + .uri("https://test.example.com/integrations/test/echo?x=1") + .body(EdgeBody::empty()) + .expect("should build request"); + + let response = futures::executor::block_on(registry.handle_proxy( + &http::Method::GET, + "/integrations/test/echo", + &settings, + &noop_services(), + req, + )) + .expect("should match route") + .expect("proxy should succeed"); + + assert_eq!( + response.status(), + http::StatusCode::OK, + "should preserve HTTP status" + ); + assert_eq!( + response.headers()["x-echo-path"], + "/integrations/test/echo", + "should expose the HTTP request path to the proxy" + ); + } + #[test] fn test_exact_route_matching() { let routes = vec![( @@ -1226,7 +1297,6 @@ mod tests { // Tests for synthetic ID header on proxy responses use crate::constants::COOKIE_SYNTHETIC_ID; use crate::test_support::tests::create_test_settings; - use fastly::http::header; /// Mock proxy that returns a simple 200 OK response struct SyntheticIdTestProxy; @@ -1254,11 +1324,12 @@ mod tests { &self, _settings: &Settings, _services: &RuntimeServices, - _req: Request, - ) -> Result> { - // Return a simple response without the synthetic ID header. - // The registry's handle_proxy should add it. - Ok(Response::from_status(fastly::http::StatusCode::OK).with_body("test response")) + _req: Request, + ) -> Result, Report> { + Ok(Response::builder() + .status(StatusCode::OK) + .body(EdgeBody::from("test response")) + .expect("should build test response")) } } @@ -1276,7 +1347,11 @@ mod tests { let registry = IntegrationRegistry::from_routes(routes); // Create a request without a synthetic ID cookie - let req = Request::get("https://test-publisher.com/integrations/test/synthetic"); + let req = Request::builder() + .method(Method::GET) + .uri("https://test-publisher.com/integrations/test/synthetic") + .body(EdgeBody::empty()) + .expect("should build request"); // Call handle_proxy (uses futures executor in test environment) let result = futures::executor::block_on(registry.handle_proxy( @@ -1294,14 +1369,12 @@ mod tests { let response = response.unwrap(); - // Verify x-synthetic-id header is present assert!( - response.get_header(HEADER_X_SYNTHETIC_ID).is_some(), + response.headers().get(&HEADER_X_SYNTHETIC_ID).is_some(), "Response should have x-synthetic-id header" ); - // Verify Set-Cookie header is present (since no cookie was in request) - let set_cookie = response.get_header(header::SET_COOKIE); + let set_cookie = response.headers().get(header::SET_COOKIE); assert!( set_cookie.is_some(), "Response should have Set-Cookie header for synthetic_id" @@ -1328,8 +1401,15 @@ mod tests { )]; let registry = IntegrationRegistry::from_routes(routes); - let mut req = Request::get("https://test-publisher.com/integrations/test/synthetic"); - req.set_header(HEADER_X_SYNTHETIC_ID, "evil;injected"); + let mut req = Request::builder() + .method(Method::GET) + .uri("https://test-publisher.com/integrations/test/synthetic") + .body(EdgeBody::empty()) + .expect("should build request"); + req.headers_mut().insert( + HEADER_X_SYNTHETIC_ID.clone(), + HeaderValue::from_static("evil;injected"), + ); let result = futures::executor::block_on(registry.handle_proxy( &Method::GET, @@ -1342,13 +1422,15 @@ mod tests { let response = result.expect("handler should succeed"); let response_header = response - .get_header(HEADER_X_SYNTHETIC_ID) + .headers() + .get(&HEADER_X_SYNTHETIC_ID) .expect("response should have x-synthetic-id header") .to_str() .expect("header should be valid UTF-8") .to_string(); let cookie_header = response - .get_header(header::SET_COOKIE) + .headers() + .get(header::SET_COOKIE) .expect("response should have Set-Cookie header") .to_str() .expect("header should be valid UTF-8"); @@ -1381,15 +1463,19 @@ mod tests { let registry = IntegrationRegistry::from_routes(routes); - let mut req = Request::get("https://test.example.com/integrations/test/synthetic"); - // Pre-existing cookie with a valid-format synthetic ID - req.set_header( + let mut req = Request::builder() + .method(Method::GET) + .uri("https://test.example.com/integrations/test/synthetic") + .body(EdgeBody::empty()) + .expect("should build request"); + req.headers_mut().insert( header::COOKIE, - format!( + HeaderValue::from_str(&format!( "{}={}", crate::constants::COOKIE_SYNTHETIC_ID, crate::test_support::tests::VALID_SYNTHETIC_ID - ), + )) + .expect("should build Cookie header"), ); let result = futures::executor::block_on(registry.handle_proxy( @@ -1403,14 +1489,12 @@ mod tests { let response = result.expect("proxy handle should succeed"); - // Should still have x-synthetic-id header assert!( - response.get_header(HEADER_X_SYNTHETIC_ID).is_some(), + response.headers().get(&HEADER_X_SYNTHETIC_ID).is_some(), "Response should still have x-synthetic-id header" ); - // Should ALWAYS set the cookie again (per new requirements) - let set_cookie = response.get_header(header::SET_COOKIE); + let set_cookie = response.headers().get(header::SET_COOKIE); assert!( set_cookie.is_some(), @@ -1440,8 +1524,11 @@ mod tests { )]; let registry = IntegrationRegistry::from_routes(routes); - let req = Request::post("https://test-publisher.com/integrations/test/synthetic") - .with_body("test body"); + let req = Request::builder() + .method(Method::POST) + .uri("https://test-publisher.com/integrations/test/synthetic") + .body(EdgeBody::from("test body")) + .expect("should build POST request"); let result = futures::executor::block_on(registry.handle_proxy( &Method::POST, @@ -1457,7 +1544,7 @@ mod tests { let response = response.unwrap(); assert!( - response.get_header(HEADER_X_SYNTHETIC_ID).is_some(), + response.headers().get(&HEADER_X_SYNTHETIC_ID).is_some(), "POST response should have x-synthetic-id header" ); } diff --git a/crates/trusted-server-core/src/integrations/testlight.rs b/crates/trusted-server-core/src/integrations/testlight.rs index 40de4366..aa14ac72 100644 --- a/crates/trusted-server-core/src/integrations/testlight.rs +++ b/crates/trusted-server-core/src/integrations/testlight.rs @@ -1,17 +1,17 @@ use std::sync::Arc; use async_trait::async_trait; +use edgezero_core::body::Body as EdgeBody; use error_stack::{Report, ResultExt}; -use fastly::http::{header, HeaderValue}; -use fastly::{Request, Response}; +use http::header::{self, HeaderValue}; +use http::Response; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use validator::Validate; -use crate::compat; use crate::error::TrustedServerError; use crate::integrations::{ - AttributeRewriteAction, IntegrationAttributeContext, IntegrationAttributeRewriter, + collect_body, AttributeRewriteAction, IntegrationAttributeContext, IntegrationAttributeRewriter, IntegrationEndpoint, IntegrationProxy, IntegrationRegistration, }; use crate::platform::RuntimeServices; @@ -95,6 +95,38 @@ impl TestlightIntegration { message: message.into(), } } + + fn rewrite_request_body( + payload_bytes: &[u8], + synthetic_id: &str, + ) -> Result, Report> { + let mut payload = serde_json::from_slice::(payload_bytes) + .change_context(Self::error("Failed to parse request body"))?; + payload + .validate() + .map_err(|err| Report::new(Self::error(format!("Invalid request payload: {err}"))))?; + + payload.user.id = Some(synthetic_id.to_string()); + + serde_json::to_vec(&payload).change_context(Self::error("Failed to serialize request body")) + } + + fn rebuild_response( + mut parts: http::response::Parts, + body_bytes: Vec, + json_content_type: bool, + ) -> Result, Report> { + parts.headers.remove(header::CONTENT_LENGTH); + + if json_content_type { + parts.headers.insert( + header::CONTENT_TYPE, + HeaderValue::from_static("application/json"), + ); + } + + Ok(Response::from_parts(parts, EdgeBody::from(body_bytes))) + } } fn build( @@ -143,17 +175,14 @@ impl IntegrationProxy for TestlightIntegration { &self, settings: &Settings, services: &RuntimeServices, - mut req: Request, - ) -> Result> { - let mut payload = serde_json::from_slice::(&req.take_body_bytes()) - .change_context(Self::error("Failed to parse request body"))?; - payload - .validate() - .map_err(|err| Report::new(Self::error(format!("Invalid request payload: {err}"))))?; + req: http::Request, + ) -> Result, Report> { + let (parts, body) = req.into_parts(); + let payload_bytes = collect_body(body, TESTLIGHT_INTEGRATION_ID).await?; + let req = http::Request::from_parts(parts, EdgeBody::empty()); // Read synthetic ID from header (set by registry) or cookie - let http_req = compat::from_fastly_request_ref(&req); - let synthetic_id = get_synthetic_id(&http_req) + let synthetic_id = get_synthetic_id(&req) .change_context(Self::error("Failed to read synthetic ID"))? .ok_or_else(|| { Report::new(Self::error( @@ -162,10 +191,7 @@ impl IntegrationProxy for TestlightIntegration { )) })?; - payload.user.id = Some(synthetic_id); - - let payload_bytes = serde_json::to_vec(&payload) - .change_context(Self::error("Failed to serialize request body"))?; + let payload_bytes = Self::rewrite_request_body(&payload_bytes, &synthetic_id)?; let mut proxy_config = ProxyRequestConfig::new(&self.config.endpoint); proxy_config.forward_synthetic_id = false; @@ -176,32 +202,24 @@ impl IntegrationProxy for TestlightIntegration { HeaderValue::from_static("application/json"), )); - let mut response = compat::to_fastly_response( - proxy_request( - settings, - compat::from_fastly_request(req), - proxy_config, - services, - ) + let response = proxy_request(settings, req, proxy_config, services) .await - .change_context(Self::error("Failed to contact upstream integration"))?, - ); + .change_context(Self::error("Failed to contact upstream integration"))?; + let (parts, body) = response.into_parts(); // Attempt to parse response into structured form for logging/future transforms. - let response_body = response.take_body_bytes(); + let response_body = collect_body(body, TESTLIGHT_INTEGRATION_ID).await?; match serde_json::from_slice::(&response_body) { Ok(body) => { - response - .set_body_json(&body) + let response_body = serde_json::to_vec(&body) .change_context(Self::error("Failed to serialize integration response body"))?; + Self::rebuild_response(parts, response_body, true) } Err(_) => { // Preserve original body if the integration responded with non-JSON content. - response.set_body(response_body); + Self::rebuild_response(parts, response_body, false) } } - - Ok(response) } } @@ -266,9 +284,12 @@ impl Default for TestlightResponseBody { #[cfg(test)] mod tests { use super::*; - use crate::{test_support::tests::create_test_settings, tsjs}; - use fastly::http::Method; + use crate::platform::test_support::{build_services_with_http_client, StubHttpClient}; + use crate::test_support::tests::{create_test_settings, VALID_SYNTHETIC_ID}; + use crate::tsjs; + use http::Method; use serde_json::json; + use std::sync::Arc; #[test] fn build_requires_config() { @@ -360,4 +381,95 @@ mod tests { "Integration should register POST /integrations/testlight/auction" ); } + + #[test] + fn rewrite_request_body_injects_synthetic_id_without_fastly_types() { + let payload = br#"{"imp":[{"id":"slot-1"}]}"#; + + let rewritten = TestlightIntegration::rewrite_request_body(payload, "abc123.XyZ789") + .expect("should rewrite Testlight payload"); + let rewritten_json: serde_json::Value = + serde_json::from_slice(&rewritten).expect("should parse rewritten payload"); + + assert_eq!( + rewritten_json["user"]["id"], "abc123.XyZ789", + "should inject the synthetic ID into the Testlight user payload" + ); + } + + #[test] + fn rebuild_response_drops_stale_content_length_when_body_changes() { + let response = http::Response::builder() + .status(http::StatusCode::OK) + .header(header::CONTENT_LENGTH, "99") + .body(EdgeBody::from(br#"{ "ok" : true }"#.to_vec())) + .expect("should build Testlight response"); + let (parts, _) = response.into_parts(); + + let rebuilt = + TestlightIntegration::rebuild_response(parts, br#"{"ok":true}"#.to_vec(), true) + .expect("should rebuild Testlight response"); + + assert!( + rebuilt.headers().get(header::CONTENT_LENGTH).is_none(), + "should drop stale Content-Length when rebuilding the response body" + ); + assert_eq!( + rebuilt + .headers() + .get(header::CONTENT_TYPE) + .and_then(|value| value.to_str().ok()), + Some("application/json"), + "should normalize JSON responses to application/json" + ); + } + + #[tokio::test] + async fn handle_uses_platform_http_client_with_http_request() { + let stub = Arc::new(StubHttpClient::new()); + stub.push_response(200, br#"{"ok":true}"#.to_vec()); + let services = build_services_with_http_client( + Arc::clone(&stub) as Arc + ); + let settings = create_test_settings(); + let integration = TestlightIntegration::new(TestlightConfig { + enabled: true, + endpoint: "https://example.com/openrtb".to_string(), + timeout_ms: 1000, + shim_src: tsjs::tsjs_unified_script_src(), + rewrite_scripts: true, + }); + let mut req = http::Request::builder() + .method(Method::POST) + .uri("https://edge.example.com/integrations/testlight/auction") + .body(EdgeBody::from(br#"{"imp":[{"id":"slot-1"}]}"#.to_vec())) + .expect("should build request"); + req.headers_mut().insert( + crate::constants::HEADER_X_SYNTHETIC_ID.clone(), + http::HeaderValue::from_static(VALID_SYNTHETIC_ID), + ); + + let response = integration + .handle(&settings, &services, req) + .await + .expect("should proxy Testlight request"); + + assert_eq!( + response.status(), + http::StatusCode::OK, + "should return stubbed upstream status" + ); + assert_eq!( + stub.recorded_backend_names(), + vec!["stub-backend".to_string()], + "should route outbound request through PlatformHttpClient" + ); + let response_json: serde_json::Value = + serde_json::from_slice(&response.into_body().into_bytes()) + .expect("should parse JSON response"); + assert_eq!( + response_json["ok"], true, + "should preserve the upstream JSON response body" + ); + } }