From d4683656e5ceea38895cca3889618f2caa95d3fd Mon Sep 17 00:00:00 2001 From: Jason Vranek Date: Tue, 19 May 2026 16:05:35 -0700 Subject: [PATCH] Rewrite submit_block relay communication to support SSZ and JSON content negotiation. Includes: - SSZ-first request encoding with JSON fallback on 406/415 - Content-Type and Eth-Consensus-Version header handling - Fork-aware SSZ decoding for relay responses - MIME parameter tolerance on relay response Content-Type - v2 to v1 fallback forwards payload to BN (prevents silent block loss) - V2 fallback metric counter - Comprehensive submit_block integration tests for both encodings --- crates/pbs/src/metrics.rs | 10 + crates/pbs/src/mev_boost/submit_block.rs | 463 ++++++++++++++++------ crates/pbs/src/routes/submit_block.rs | 81 ++-- tests/src/mock_validator.rs | 65 ++- tests/tests/pbs_mux.rs | 28 +- tests/tests/pbs_post_blinded_blocks.rs | 477 +++++++++++++++++++++-- 6 files changed, 939 insertions(+), 185 deletions(-) diff --git a/crates/pbs/src/metrics.rs b/crates/pbs/src/metrics.rs index 1f91e47f..2bf9b912 100644 --- a/crates/pbs/src/metrics.rs +++ b/crates/pbs/src/metrics.rs @@ -60,4 +60,14 @@ lazy_static! { &["http_status_code", "endpoint"], PBS_METRICS_REGISTRY ).unwrap(); + + /// Count of v2 submit_block requests that fell back to the v1 endpoint + /// because the relay returned 404 on v2. A high value indicates the relay + /// fleet has not been upgraded to support submitBlindedBlockV2. + pub static ref V2_FALLBACK_TO_V1: IntCounterVec = register_int_counter_vec_with_registry!( + "pbs_submit_block_v2_fallback_to_v1_total", + "Count of v2 submit_block requests that fell back to v1 because the relay did not support v2", + &["relay_id"], + PBS_METRICS_REGISTRY + ).unwrap(); } diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs index b416dba2..dfabcf60 100644 --- a/crates/pbs/src/mev_boost/submit_block.rs +++ b/crates/pbs/src/mev_boost/submit_block.rs @@ -1,5 +1,4 @@ use std::{ - str::FromStr, sync::Arc, time::{Duration, Instant}, }; @@ -8,26 +7,63 @@ use alloy::{eips::eip7594::CELLS_PER_EXT_BLOB, primitives::B256}; use axum::http::{HeaderMap, HeaderValue}; use cb_common::{ pbs::{ - BlindedBeaconBlock, BlobsBundle, BuilderApiVersion, ForkName, HEADER_CONSENSUS_VERSION, - HEADER_START_TIME_UNIX_MS, KzgCommitments, RelayClient, SignedBlindedBeaconBlock, - SubmitBlindedBlockResponse, + BlindedBeaconBlock, BlobsBundle, BuilderApiVersion, ForkName, ForkVersionDecode, + HEADER_START_TIME_UNIX_MS, KzgCommitments, PayloadAndBlobs, RelayClient, + SignedBlindedBeaconBlock, SubmitBlindedBlockResponse, error::{PbsError, ValidationError}, }, - utils::{get_user_agent_with_version, read_chunked_body_with_max, utcnow_ms}, + utils::{ + CONSENSUS_VERSION_HEADER, EncodingType, OUTBOUND_ACCEPT, get_user_agent_with_version, + parse_response_encoding_and_fork, read_chunked_body_with_max, utcnow_ms, + }, }; use futures::{FutureExt, future::select_ok}; -use reqwest::header::USER_AGENT; +use reqwest::{ + StatusCode, + header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, +}; +use ssz::Encode; use tracing::{debug, warn}; use url::Url; use crate::{ - constants::{ - MAX_SIZE_SUBMIT_BLOCK_RESPONSE, SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR, - }, - metrics::{RELAY_LATENCY, RELAY_STATUS_CODE}, + TIMEOUT_ERROR_CODE_STR, + constants::{MAX_SIZE_SUBMIT_BLOCK_RESPONSE, SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG}, + metrics::{RELAY_LATENCY, RELAY_STATUS_CODE, V2_FALLBACK_TO_V1}, state::{BuilderApiState, PbsState}, }; +#[derive(Clone)] +struct ProposalInfo { + /// The signed blinded block to submit + signed_blinded_block: Arc, + + /// Common baseline of headers to send with each request + headers: HeaderMap, + + /// The version of the submit_block route being used + api_version: BuilderApiVersion, +} + +struct SubmitBlockResponseInfo { + /// The raw body of the response + response_bytes: Vec, + + /// The content type the response is encoded with. `None` on v2 + /// ACCEPTED/OK paths where no body is returned. + content_type: Option, + + /// Which fork the response bid is for (if provided as a header, rather than + /// part of the body) + fork: Option, + + /// The status code of the response, for logging + code: StatusCode, + + /// The round-trip latency of the request + request_latency: Duration, +} + /// Implements https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlock and /// https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlockV2. Use `api_version` to /// distinguish between the two. @@ -39,36 +75,27 @@ pub async fn submit_block( ) -> eyre::Result> { debug!(?req_headers, "received headers"); - let fork_name = req_headers - .get(HEADER_CONSENSUS_VERSION) - .and_then(|h| { - let str = h.to_str().ok()?; - ForkName::from_str(str).ok() - }) - .unwrap_or_else(|| { - let slot = signed_blinded_block.slot().as_u64(); - state.config.chain.fork_by_slot(slot) - }); - - // safe because ForkName is visible ASCII chars - let consensus_version = HeaderValue::from_str(&fork_name.to_string()).unwrap(); - // prepare headers let mut send_headers = HeaderMap::new(); send_headers.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from(utcnow_ms())); send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?); - send_headers.insert(HEADER_CONSENSUS_VERSION, consensus_version); + // Create the Accept headers for requests + // Use the documented, deterministic preference: + // SSZ first (wire-efficient), JSON fallback. + let accept_types = OUTBOUND_ACCEPT.to_string(); + send_headers.insert(ACCEPT, HeaderValue::from_str(&accept_types).unwrap()); + + // Send requests to all relays concurrently + let proposal_info = + Arc::new(ProposalInfo { signed_blinded_block, headers: send_headers, api_version }); let mut handles = Vec::with_capacity(state.all_relays().len()); - for relay in state.all_relays().iter().cloned() { + for relay in state.all_relays().iter() { handles.push( tokio::spawn(submit_block_with_timeout( - signed_blinded_block.clone(), - relay, - send_headers.clone(), + proposal_info.clone(), + relay.clone(), state.pbs_config().timeout_get_payload_ms, - api_version, - fork_name, )) .map(|join_result| match join_result { Ok(res) => res, @@ -87,40 +114,42 @@ pub async fn submit_block( /// Submit blinded block to relay, retry connection errors until the /// given timeout has passed async fn submit_block_with_timeout( - signed_blinded_block: Arc, + proposal_info: Arc, relay: RelayClient, - headers: HeaderMap, timeout_ms: u64, - api_version: BuilderApiVersion, - fork_name: ForkName, ) -> Result, PbsError> { - let mut url = relay.submit_block_url(api_version)?; + let mut url = Arc::new(relay.submit_block_url(proposal_info.api_version)?); let mut remaining_timeout_ms = timeout_ms; let mut retry = 0; let mut backoff = Duration::from_millis(250); - let mut request_api_version = api_version; + let mut request_api_version = proposal_info.api_version; loop { let start_request = Instant::now(); match send_submit_block( + proposal_info.clone(), url.clone(), - &signed_blinded_block, &relay, - headers.clone(), remaining_timeout_ms, retry, - &request_api_version, - fork_name, + request_api_version, ) .await { Ok(response) => { - // If the original request was for v2 but we had to fall back to v1, return a v2 - // response + // If the original request was for v2 but we had to fall back to v1, the + // V1 response body (execution payload + blobs bundle) MUST be forwarded + // back to the beacon node so the proposer can broadcast. Returning an + // empty 202 here would cause silent block loss because the BN never + // receives the unblinded payload. if request_api_version == BuilderApiVersion::V1 && - api_version != request_api_version + proposal_info.api_version != request_api_version { - return Ok(None); + warn!( + relay_id = relay.id.as_ref(), + "v2 submit_block fell back to v1; forwarding v1 payload to beacon node" + ); + V2_FALLBACK_TO_V1.with_label_values(&[relay.id.as_ref()]).inc(); } return Ok(response); } @@ -144,7 +173,7 @@ async fn submit_block_with_timeout( relay_id = relay.id.as_ref(), "relay does not support v2 endpoint, retrying with v1" ); - url = relay.submit_block_url(BuilderApiVersion::V1)?; + url = Arc::new(relay.submit_block_url(BuilderApiVersion::V1)?); request_api_version = BuilderApiVersion::V1; } @@ -159,22 +188,155 @@ async fn submit_block_with_timeout( // back #[allow(clippy::too_many_arguments)] async fn send_submit_block( - url: Url, - signed_blinded_block: &SignedBlindedBeaconBlock, + proposal_info: Arc, + url: Arc, relay: &RelayClient, - headers: HeaderMap, timeout_ms: u64, retry: u32, - api_version: &BuilderApiVersion, - fork_name: ForkName, + api_version: BuilderApiVersion, ) -> Result, PbsError> { + // Full processing: decode full response and validate + let response = + send_submit_block_full(proposal_info.clone(), url, relay, timeout_ms, retry, api_version) + .await?; + let response = match response { + None => { + // v2 request with no body + return Ok(None); + } + Some(res) => res, + }; + // Extract the info needed for validation + let got_block_hash = response.data.execution_payload.block_hash().0; + + // request has different type so cant be deserialized in the wrong version, + // response has a "version" field + match &proposal_info.signed_blinded_block.message() { + BlindedBeaconBlock::Electra(blinded_block) => { + let expected_block_hash = + blinded_block.body.execution_payload.execution_payload_header.block_hash.0; + let expected_commitments = &blinded_block.body.blob_kzg_commitments; + + validate_unblinded_block( + expected_block_hash, + got_block_hash, + expected_commitments, + &response.data.blobs_bundle, + response.version, + ) + } + + BlindedBeaconBlock::Fulu(blinded_block) => { + let expected_block_hash = + blinded_block.body.execution_payload.execution_payload_header.block_hash.0; + let expected_commitments = &blinded_block.body.blob_kzg_commitments; + + validate_unblinded_block( + expected_block_hash, + got_block_hash, + expected_commitments, + &response.data.blobs_bundle, + response.version, + ) + } + + _ => return Err(PbsError::Validation(ValidationError::UnsupportedFork)), + }?; + Ok(Some(response)) +} + +/// Send and fully process a submit_block request, returning a complete decoded +/// response +async fn send_submit_block_full( + proposal_info: Arc, + url: Arc, + relay: &RelayClient, + timeout_ms: u64, + retry: u32, + api_version: BuilderApiVersion, +) -> Result, PbsError> { + // Send the request + let block_response = send_submit_block_impl( + relay, + url, + timeout_ms, + proposal_info.headers.clone(), + &proposal_info.signed_blinded_block, + retry, + api_version, + ) + .await?; + + // If this is not v1, there's no body to decode + if api_version != BuilderApiVersion::V1 { + return Ok(None); + } + + // Decode the payload based on content type. The v1 guard above ensures + // `content_type` is Some. + let decoded_response = + decode_by_encoding(&block_response, decode_json_payload, decode_ssz_payload)?; + + // Log and return + debug!( + relay_id = relay.id.as_ref(), + retry, + latency = ?block_response.request_latency, + version =% decoded_response.version, + "received unblinded block" + ); + + Ok(Some(decoded_response)) +} + +/// Dispatch a v1 submit_block response to the appropriate decoder based on the +/// negotiated content-type. Caller guarantees `content_type` is Some (v2 +/// paths early-exit before reaching decode); an absent Content-Type on v1 is +/// treated as a protocol violation. SSZ additionally requires a fork header. +fn decode_by_encoding( + info: &SubmitBlockResponseInfo, + on_json: impl FnOnce(&[u8]) -> Result, + on_ssz: impl FnOnce(&[u8], ForkName) -> Result, +) -> Result { + let content_type = info.content_type.ok_or_else(|| PbsError::RelayResponse { + error_msg: "v1 submit_block response missing Content-Type".to_string(), + code: info.code.as_u16(), + })?; + match content_type { + EncodingType::Json => on_json(&info.response_bytes), + EncodingType::Ssz => { + let fork = info.fork.ok_or_else(|| PbsError::RelayResponse { + error_msg: "missing fork version header in SSZ submit_block response".to_string(), + code: info.code.as_u16(), + })?; + on_ssz(&info.response_bytes, fork) + } + } +} + +/// Sends the actual HTTP request to the relay's submit_block endpoint, +/// returning the response (if applicable), the round-trip time, and the +/// encoding type used for the body (if any). Used by send_submit_block. +async fn send_submit_block_impl( + relay: &RelayClient, + url: Arc, + timeout_ms: u64, + headers: HeaderMap, + signed_blinded_block: &SignedBlindedBeaconBlock, + retry: u32, + api_version: BuilderApiVersion, +) -> Result { let start_request = Instant::now(); - let res = match relay + + // Try SSZ first + let mut res = match relay .client - .post(url) + .post(url.as_ref().clone()) .timeout(Duration::from_millis(timeout_ms)) - .headers(headers) - .json(&signed_blinded_block) + .headers(headers.clone()) + .body(signed_blinded_block.as_ssz_bytes()) + .header(CONTENT_TYPE, EncodingType::Ssz.to_string()) + .header(CONSENSUS_VERSION_HEADER, signed_blinded_block.fork_name_unchecked().to_string()) .send() .await { @@ -190,96 +352,151 @@ async fn send_submit_block( return Err(err.into()); } }; + + // Retry as JSON only on the two status codes the builder-spec defines as + // "media type is the problem": 406 Not Acceptable and 415 Unsupported + // Media Type (RFC 7231 §6.5.13). Any other 4xx (400 malformed, 401/403 + // auth, 409 conflict, 429 rate limit, etc.) is orthogonal to encoding + // and MUST surface unchanged — retrying pollutes observability, doubles + // load on the relay, and can mask real errors behind a JSON-path reply. + if matches!(res.status(), StatusCode::NOT_ACCEPTABLE | StatusCode::UNSUPPORTED_MEDIA_TYPE,) { + warn!( + relay_id = relay.id.as_ref(), + status = %res.status(), + "relay rejected SSZ content-type, resubmitting block with JSON content-type" + ); + res = match relay + .client + .post(url.as_ref().clone()) + .timeout(Duration::from_millis(timeout_ms)) + .headers(headers) + .body(serde_json::to_vec(&signed_blinded_block).unwrap()) + .header(CONTENT_TYPE, EncodingType::Json.to_string()) + .send() + .await + { + Ok(res) => res, + Err(err) => { + RELAY_STATUS_CODE + .with_label_values(&[ + TIMEOUT_ERROR_CODE_STR, + SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, + &relay.id, + ]) + .inc(); + return Err(err.into()); + } + }; + } + + // Log the response code and latency + let code = res.status(); let request_latency = start_request.elapsed(); RELAY_LATENCY .with_label_values(&[SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id]) .observe(request_latency.as_secs_f64()); - - let code = res.status(); RELAY_STATUS_CODE .with_label_values(&[code.as_str(), SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id]) .inc(); - let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_SUBMIT_BLOCK_RESPONSE).await?; - if !code.is_success() { - let err = PbsError::RelayResponse { - error_msg: String::from_utf8_lossy(&response_bytes).into_owned(), - code: code.as_u16(), - }; - - // we requested the payload from all relays, but some may have not received it - warn!(relay_id = relay.id.as_ref(), retry, %err, "failed to get payload (this might be ok if other relays have it)"); - return Err(err); - }; - - if api_version != &BuilderApiVersion::V1 { - // v2 response is going to be empty, so just break here + // If this was API v2 and succeeded then we can just return here + if api_version != BuilderApiVersion::V1 { debug!( relay_id = relay.id.as_ref(), retry, latency = ?request_latency, - "successful request" + status = %code, + "received response for v2 submit_block" ); - return Ok(None); - } - - let block_response = match serde_json::from_slice::(&response_bytes) - { - Ok(parsed) => parsed, - Err(err) => { - return Err(PbsError::JsonDecode { - err, - raw: String::from_utf8_lossy(&response_bytes).into_owned(), - }); + match code { + StatusCode::ACCEPTED => { + return Ok(SubmitBlockResponseInfo { + response_bytes: Vec::new(), + content_type: None, + fork: None, + code, + request_latency, + }); + } + StatusCode::OK => { + warn!( + relay_id = relay.id.as_ref(), + "relay sent OK response for v2 submit_block, expected 202 Accepted" + ); + return Ok(SubmitBlockResponseInfo { + response_bytes: Vec::new(), + content_type: None, + fork: None, + code, + request_latency, + }); + } + _ => { + return Err(PbsError::RelayResponse { + error_msg: format!( + "relay sent unexpected code for builder route v2 {}: {code}", + relay.id.as_ref() + ), + code: code.as_u16(), + }); + } } - }; + } - debug!( - relay_id = relay.id.as_ref(), - retry, - latency = ?request_latency, - version =% block_response.version, - "received unblinded block" - ); + // If the code is not OK, return early + if code != StatusCode::OK { + let response_bytes = + read_chunked_body_with_max(res, MAX_SIZE_SUBMIT_BLOCK_RESPONSE).await?; + let err = PbsError::RelayResponse { + error_msg: String::from_utf8_lossy(&response_bytes).into_owned(), + code: code.as_u16(), + }; - let got_block_hash = block_response.data.execution_payload.block_hash().0; + // we requested the payload from all relays, but some may have not received it + warn!(relay_id = relay.id.as_ref(), %err, "failed to get payload (this might be ok if other relays have it)"); + return Err(err); + } - // request has different type so cant be deserialized in the wrong version, - // response has a "version" field - match &signed_blinded_block.message() { - BlindedBeaconBlock::Electra(blinded_block) => { - let expected_block_hash = - blinded_block.body.execution_payload.execution_payload_header.block_hash.0; - let expected_commitments = &blinded_block.body.blob_kzg_commitments; + // We're on v1 so decode the payload normally. Parse Content-Type + // (tolerating MIME parameters per RFC 7231 §3.1.1.1) and + // Eth-Consensus-Version headers + let (content_type, fork) = parse_response_encoding_and_fork(res.headers(), code.as_u16())?; - validate_unblinded_block( - expected_block_hash, - got_block_hash, - expected_commitments, - &block_response.data.blobs_bundle, - fork_name, - ) - } + // Decode the body + let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_SUBMIT_BLOCK_RESPONSE).await?; + Ok(SubmitBlockResponseInfo { + response_bytes, + content_type: Some(content_type), + fork, + code, + request_latency, + }) +} - BlindedBeaconBlock::Fulu(blinded_block) => { - let expected_block_hash = - blinded_block.body.execution_payload.execution_payload_header.block_hash.0; - let expected_commitments = &blinded_block.body.blob_kzg_commitments; +/// Decode a JSON-encoded submit_block response +fn decode_json_payload(response_bytes: &[u8]) -> Result { + match serde_json::from_slice::(response_bytes) { + Ok(parsed) => Ok(parsed), + Err(err) => Err(PbsError::JsonDecode { + err, + raw: String::from_utf8_lossy(response_bytes).into_owned(), + }), + } +} - validate_unblinded_block( - expected_block_hash, - got_block_hash, - expected_commitments, - &block_response.data.blobs_bundle, - fork_name, - ) +/// Decode an SSZ-encoded submit_block response +fn decode_ssz_payload( + response_bytes: &[u8], + fork: ForkName, +) -> Result { + let data = PayloadAndBlobs::from_ssz_bytes_by_fork(response_bytes, fork).map_err(|e| { + PbsError::RelayResponse { + error_msg: (format!("error decoding relay payload: {e:?}")).to_string(), + code: 200, } - - _ => return Err(PbsError::Validation(ValidationError::UnsupportedFork)), - }?; - - Ok(Some(block_response)) + })?; + Ok(SubmitBlindedBlockResponse { version: fork, data, metadata: Default::default() }) } fn validate_unblinded_block( diff --git a/crates/pbs/src/routes/submit_block.rs b/crates/pbs/src/routes/submit_block.rs index 004b601e..8f70c79a 100644 --- a/crates/pbs/src/routes/submit_block.rs +++ b/crates/pbs/src/routes/submit_block.rs @@ -1,11 +1,20 @@ use std::sync::Arc; -use axum::{Json, extract::State, http::HeaderMap, response::IntoResponse}; +use axum::{ + body::Bytes, + extract::State, + http::{HeaderMap, HeaderValue}, + response::IntoResponse, +}; use cb_common::{ - pbs::{BuilderApiVersion, GetPayloadInfo, SignedBlindedBeaconBlock}, - utils::{get_user_agent, timestamp_of_slot_start_millis, utcnow_ms}, + pbs::{BuilderApiVersion, GetPayloadInfo}, + utils::{ + CONSENSUS_VERSION_HEADER, EncodingType, deserialize_body, get_accept_types, get_user_agent, + timestamp_of_slot_start_millis, utcnow_ms, + }, }; -use reqwest::StatusCode; +use reqwest::{StatusCode, header::CONTENT_TYPE}; +use ssz::Encode; use tracing::{error, info, trace}; use crate::{ @@ -19,37 +28,26 @@ use crate::{ pub async fn handle_submit_block_v1>( state: State>, req_headers: HeaderMap, - Json(signed_blinded_block): Json>, + body_bytes: Bytes, ) -> Result { - handle_submit_block_impl::( - state, - req_headers, - signed_blinded_block, - BuilderApiVersion::V1, - ) - .await + handle_submit_block_impl::(state, req_headers, body_bytes, BuilderApiVersion::V1).await } pub async fn handle_submit_block_v2>( state: State>, req_headers: HeaderMap, - Json(signed_blinded_block): Json>, + body_bytes: Bytes, ) -> Result { - handle_submit_block_impl::( - state, - req_headers, - signed_blinded_block, - BuilderApiVersion::V2, - ) - .await + handle_submit_block_impl::(state, req_headers, body_bytes, BuilderApiVersion::V2).await } async fn handle_submit_block_impl>( State(state): State>, req_headers: HeaderMap, - signed_blinded_block: Arc, + body_bytes: Bytes, api_version: BuilderApiVersion, ) -> Result { + let signed_blinded_block = Arc::new(deserialize_body(&req_headers, body_bytes).await?); tracing::Span::current().record("slot", signed_blinded_block.slot().as_u64() as i64); tracing::Span::current() .record("block_hash", tracing::field::debug(signed_blinded_block.block_hash())); @@ -64,27 +62,60 @@ async fn handle_submit_block_impl>( let block_hash = signed_blinded_block.block_hash(); let slot_start_ms = timestamp_of_slot_start_millis(slot.into(), state.config.chain); let ua = get_user_agent(&req_headers); + let accept_types = get_accept_types(&req_headers).map_err(|e| { + error!(%e, "error parsing accept header"); + PbsClientError::DecodeError(format!("error parsing accept header: {e}")) + })?; + // Honor caller q-value preference: pick the highest-priority encoding that + // we can actually produce. Server preference for tiebreaks is SSZ first. + let response_encoding = accept_types.preferred(&[EncodingType::Ssz, EncodingType::Json]); info!(ua, ms_into_slot = now.saturating_sub(slot_start_ms), "new request"); match A::submit_block(signed_blinded_block, req_headers, state, api_version).await { Ok(res) => match res { - Some(block_response) => { - trace!(?block_response); + Some(payload_and_blobs) => { + trace!(?payload_and_blobs); info!("received unblinded block (v1)"); BEACON_NODE_STATUS .with_label_values(&["200", SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG]) .inc(); - Ok((StatusCode::OK, Json(block_response).into_response())) + + // Three arms: no viable encoding (unreachable in practice — + // `get_accept_types` errors earlier if the caller offers + // nothing we support), SSZ, or JSON. + match response_encoding { + None => Err(PbsClientError::DecodeError( + "no viable accept types in request".to_string(), + )), + Some(EncodingType::Ssz) => { + let mut response = payload_and_blobs.data.as_ssz_bytes().into_response(); + + let content_type_header = EncodingType::Ssz.content_type_header().clone(); + response.headers_mut().insert(CONTENT_TYPE, content_type_header); + response.headers_mut().insert( + CONSENSUS_VERSION_HEADER, + HeaderValue::from_str(&payload_and_blobs.version.to_string()).unwrap(), + ); + info!("sending response as SSZ"); + Ok(response) + } + Some(EncodingType::Json) => { + info!("sending response as JSON"); + Ok((StatusCode::OK, axum::Json(payload_and_blobs)).into_response()) + } + } } None => { info!("received unblinded block (v2)"); + // Note: this doesn't provide consensus_version_header because it doesn't pass + // the body through, and there's no content-type header since the body is empty. BEACON_NODE_STATUS .with_label_values(&["202", SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG]) .inc(); - Ok((StatusCode::ACCEPTED, "".into_response())) + Ok((StatusCode::ACCEPTED, "").into_response()) } }, diff --git a/tests/src/mock_validator.rs b/tests/src/mock_validator.rs index 07fa8f06..35adae87 100644 --- a/tests/src/mock_validator.rs +++ b/tests/src/mock_validator.rs @@ -4,7 +4,11 @@ use cb_common::{ types::BlsPublicKey, utils::{CONSENSUS_VERSION_HEADER, EncodingType, ForkName, bls_pubkey_from_hex}, }; -use reqwest::{Response, header::ACCEPT}; +use reqwest::{ + Response, + header::{ACCEPT, CONTENT_TYPE}, +}; +use ssz::Encode; use crate::utils::generate_mock_relay; @@ -73,28 +77,77 @@ impl MockValidator { pub async fn do_submit_block_v1( &self, signed_blinded_block_opt: Option, + accept: Vec, + content_type: EncodingType, + fork_name: ForkName, ) -> eyre::Result { - self.do_submit_block_impl(signed_blinded_block_opt, BuilderApiVersion::V1).await + self.do_submit_block_impl( + signed_blinded_block_opt, + accept, + content_type, + fork_name, + BuilderApiVersion::V1, + ) + .await } pub async fn do_submit_block_v2( &self, signed_blinded_block_opt: Option, + accept: Vec, + content_type: EncodingType, + fork_name: ForkName, ) -> eyre::Result { - self.do_submit_block_impl(signed_blinded_block_opt, BuilderApiVersion::V2).await + self.do_submit_block_impl( + signed_blinded_block_opt, + accept, + content_type, + fork_name, + BuilderApiVersion::V2, + ) + .await } async fn do_submit_block_impl( &self, - signed_blinded_block: Option, + signed_blinded_block_opt: Option, + accept: Vec, + content_type: EncodingType, + fork_name: ForkName, api_version: BuilderApiVersion, ) -> eyre::Result { let url = self.comm_boost.submit_block_url(api_version).unwrap(); let signed_blinded_block = - signed_blinded_block.unwrap_or_else(load_test_signed_blinded_block); + signed_blinded_block_opt.unwrap_or_else(load_test_signed_blinded_block); + let body = match content_type { + EncodingType::Json => serde_json::to_vec(&signed_blinded_block).unwrap(), + EncodingType::Ssz => signed_blinded_block.as_ssz_bytes(), + }; - Ok(self.comm_boost.client.post(url).json(&signed_blinded_block).send().await?) + let accept = match accept.len() { + 0 => None, + 1 => Some(accept.into_iter().next().unwrap().to_string()), + _ => { + // Ordered: first-listed is highest preference. Server honors + // RFC 9110 §12.5.1 (first-listed wins at equal q). + let accept_strings: Vec = + accept.into_iter().map(|e| e.to_string()).collect(); + Some(accept_strings.join(", ")) + } + }; + let mut res = self + .comm_boost + .client + .post(url) + .body(body) + .header(CONSENSUS_VERSION_HEADER, &fork_name.to_string()) + .header(CONTENT_TYPE, &content_type.to_string()); + if let Some(accept_header) = accept { + res = res.header(ACCEPT, accept_header); + } + let res = res.send().await?; + Ok(res) } } diff --git a/tests/tests/pbs_mux.rs b/tests/tests/pbs_mux.rs index 6d093bef..d8ce1356 100644 --- a/tests/tests/pbs_mux.rs +++ b/tests/tests/pbs_mux.rs @@ -12,7 +12,7 @@ use cb_common::{ }, signer::random_secret, types::Chain, - utils::{ForkName, ResponseReadError, set_ignore_content_length}, + utils::{EncodingType, ForkName, ResponseReadError, set_ignore_content_length}, }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ @@ -285,12 +285,34 @@ async fn test_mux() -> Result<()> { // v1 Submit block requests should go to all relays info!("Sending submit block v1"); - assert_eq!(mock_validator.do_submit_block_v1(None,).await?.status(), StatusCode::OK); + assert_eq!( + mock_validator + .do_submit_block_v1( + None, + vec![EncodingType::Json], + EncodingType::Json, + ForkName::Electra + ) + .await? + .status(), + StatusCode::OK + ); assert_eq!(mock_state.received_submit_block(), 3); // default + 2 mux relays were used // v2 Submit block requests should go to all relays info!("Sending submit block v2"); - assert_eq!(mock_validator.do_submit_block_v2(None,).await?.status(), StatusCode::ACCEPTED); + assert_eq!( + mock_validator + .do_submit_block_v2( + None, + vec![EncodingType::Json], + EncodingType::Json, + ForkName::Electra + ) + .await? + .status(), + StatusCode::ACCEPTED + ); assert_eq!(mock_state.received_submit_block(), 6); // default + 2 mux relays were used Ok(()) diff --git a/tests/tests/pbs_post_blinded_blocks.rs b/tests/tests/pbs_post_blinded_blocks.rs index bf4703c2..12cb58e0 100644 --- a/tests/tests/pbs_post_blinded_blocks.rs +++ b/tests/tests/pbs_post_blinded_blocks.rs @@ -1,25 +1,37 @@ -use std::{path::PathBuf, sync::Arc, time::Duration}; +use std::{collections::HashSet, path::PathBuf, sync::Arc, time::Duration}; use cb_common::{ - pbs::{BuilderApiVersion, GetPayloadInfo, SubmitBlindedBlockResponse}, + pbs::{BuilderApiVersion, GetPayloadInfo, PayloadAndBlobs, SubmitBlindedBlockResponse}, signer::random_secret, types::Chain, + utils::{EncodingType, ForkName}, }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ - mock_relay::{MockRelayState, start_mock_relay_service}, + mock_relay::{MockRelayState, start_mock_relay_service_with_listener}, mock_validator::{MockValidator, load_test_signed_blinded_block}, - utils::{generate_mock_relay, get_pbs_config, setup_test_env, to_pbs_config}, + utils::{ + generate_mock_relay, get_free_listener, get_pbs_config, setup_test_env, to_pbs_config, + }, }; use eyre::Result; +use lh_types::ForkVersionDecode; use reqwest::{Response, StatusCode}; use tracing::info; #[tokio::test] async fn test_submit_block_v1() -> Result<()> { - let res = submit_block_impl(3800, &BuilderApiVersion::V1, false, false).await?; - assert_eq!(res.status(), StatusCode::OK); - + let res = submit_block_impl( + BuilderApiVersion::V1, + vec![EncodingType::Json], + HashSet::from([EncodingType::Ssz, EncodingType::Json]), + EncodingType::Json, + 1, + StatusCode::OK, + false, + false, + ) + .await?; let signed_blinded_block = load_test_signed_blinded_block(); let response_body = serde_json::from_slice::(&res.bytes().await?)?; @@ -32,19 +44,46 @@ async fn test_submit_block_v1() -> Result<()> { #[tokio::test] async fn test_submit_block_v2() -> Result<()> { - let res = submit_block_impl(3802, &BuilderApiVersion::V2, false, false).await?; - assert_eq!(res.status(), StatusCode::ACCEPTED); + let res = submit_block_impl( + BuilderApiVersion::V2, + vec![EncodingType::Json], + HashSet::from([EncodingType::Ssz, EncodingType::Json]), + EncodingType::Json, + 1, + StatusCode::ACCEPTED, + false, + false, + ) + .await?; assert_eq!(res.bytes().await?.len(), 0); Ok(()) } // Test that when submitting a block using v2 to a relay that does not support -// v2, PBS falls back to v1 and successfully submits the block. +// v2, PBS falls back to v1 and forwards the v1 response body to the beacon +// node (a 200 with the execution payload), rather than swallowing the payload +// and replying 202 with an empty body — which would cause silent block loss. #[tokio::test] async fn test_submit_block_v2_without_relay_support() -> Result<()> { - let res = submit_block_impl(3804, &BuilderApiVersion::V2, true, false).await?; - assert_eq!(res.status(), StatusCode::ACCEPTED); - assert_eq!(res.bytes().await?.len(), 0); + let res = submit_block_impl( + BuilderApiVersion::V2, + vec![EncodingType::Json], + HashSet::from([EncodingType::Ssz, EncodingType::Json]), + EncodingType::Json, + 1, + StatusCode::OK, + true, + false, + ) + .await?; + // Payload must be forwarded so the BN can broadcast. + let signed_blinded_block = load_test_signed_blinded_block(); + let response_body = serde_json::from_slice::(&res.bytes().await?)?; + assert_eq!( + response_body.data.execution_payload.block_hash(), + signed_blinded_block.block_hash().into(), + "v2->v1 fallback must forward the execution payload to the BN" + ); Ok(()) } @@ -52,8 +91,155 @@ async fn test_submit_block_v2_without_relay_support() -> Result<()> { // for both v1 and v2, PBS doesn't loop forever. #[tokio::test] async fn test_submit_block_on_broken_relay() -> Result<()> { - let res = submit_block_impl(3806, &BuilderApiVersion::V2, true, true).await?; - assert_eq!(res.status(), StatusCode::BAD_GATEWAY); + let _res = submit_block_impl( + BuilderApiVersion::V2, + vec![EncodingType::Json], + HashSet::from([EncodingType::Ssz, EncodingType::Json]), + EncodingType::Json, + 1, + StatusCode::BAD_GATEWAY, + true, + true, + ) + .await?; + Ok(()) +} + +#[tokio::test] +async fn test_submit_block_v1_ssz() -> Result<()> { + let res = submit_block_impl( + BuilderApiVersion::V1, + vec![EncodingType::Ssz], + HashSet::from([EncodingType::Ssz, EncodingType::Json]), + EncodingType::Ssz, + 1, + StatusCode::OK, + false, + false, + ) + .await?; + let signed_blinded_block = load_test_signed_blinded_block(); + + let response_body = + PayloadAndBlobs::from_ssz_bytes_by_fork(&res.bytes().await?, ForkName::Electra).unwrap(); + assert_eq!( + response_body.execution_payload.block_hash(), + signed_blinded_block.block_hash().into() + ); + Ok(()) +} + +#[tokio::test] +async fn test_submit_block_v2_ssz() -> Result<()> { + let res = submit_block_impl( + BuilderApiVersion::V2, + vec![EncodingType::Ssz], + HashSet::from([EncodingType::Ssz, EncodingType::Json]), + EncodingType::Ssz, + 1, + StatusCode::ACCEPTED, + false, + false, + ) + .await?; + assert_eq!(res.bytes().await?.len(), 0); + Ok(()) +} + +/// Test that a v1 submit block request in SSZ is converted to JSON if the relay +/// only supports JSON +#[tokio::test] +async fn test_submit_block_v1_ssz_into_json() -> Result<()> { + let res = submit_block_impl( + BuilderApiVersion::V1, + vec![EncodingType::Ssz], + HashSet::from([EncodingType::Json]), + EncodingType::Ssz, + 2, + StatusCode::OK, + false, + false, + ) + .await?; + let signed_blinded_block = load_test_signed_blinded_block(); + + let response_body = + PayloadAndBlobs::from_ssz_bytes_by_fork(&res.bytes().await?, ForkName::Electra).unwrap(); + assert_eq!( + response_body.execution_payload.block_hash(), + signed_blinded_block.block_hash().into() + ); + Ok(()) +} + +/// Test that a v2 submit block request in SSZ is converted to JSON if the relay +/// only supports JSON +#[tokio::test] +async fn test_submit_block_v2_ssz_into_json() -> Result<()> { + let res = submit_block_impl( + BuilderApiVersion::V2, + vec![EncodingType::Ssz], + HashSet::from([EncodingType::Json]), + EncodingType::Ssz, + 2, + StatusCode::ACCEPTED, + false, + false, + ) + .await?; + assert_eq!(res.bytes().await?.len(), 0); + Ok(()) +} + +/// Test v1 requesting multiple types when the relay supports SSZ, which should +/// return SSZ +#[tokio::test] +async fn test_submit_block_v1_multitype_ssz() -> Result<()> { + let res = submit_block_impl( + BuilderApiVersion::V1, + vec![EncodingType::Ssz, EncodingType::Json], + HashSet::from([EncodingType::Ssz]), + EncodingType::Ssz, + 1, + StatusCode::OK, + false, + false, + ) + .await?; + let signed_blinded_block = load_test_signed_blinded_block(); + + let response_body = + PayloadAndBlobs::from_ssz_bytes_by_fork(&res.bytes().await?, ForkName::Electra).unwrap(); + assert_eq!( + response_body.execution_payload.block_hash(), + signed_blinded_block.block_hash().into() + ); + Ok(()) +} + +/// Test v1 requesting multiple types when the relay supports JSON, which should +/// still return SSZ +#[tokio::test] +async fn test_submit_block_v1_multitype_json() -> Result<()> { + let res = submit_block_impl( + BuilderApiVersion::V1, + vec![EncodingType::Ssz, EncodingType::Json], + HashSet::from([EncodingType::Json]), + EncodingType::Ssz, + 2, + StatusCode::OK, + false, + false, + ) + .await?; + let signed_blinded_block = load_test_signed_blinded_block(); + + let response_body = + PayloadAndBlobs::from_ssz_bytes_by_fork(&res.bytes().await?, ForkName::Electra).unwrap(); + assert_eq!( + response_body.execution_payload.block_hash(), + signed_blinded_block.block_hash().into() + ); Ok(()) } @@ -64,14 +250,18 @@ async fn test_submit_block_too_large() -> Result<()> { let pubkey = signer.public_key(); let chain = Chain::Holesky; - let pbs_port = 3900; + let pbs_listener = get_free_listener().await; + let relay_listener = get_free_listener().await; + let pbs_port = pbs_listener.local_addr().unwrap().port(); + let relay_port = relay_listener.local_addr().unwrap().port(); - let relays = vec![generate_mock_relay(pbs_port + 1, pubkey)?]; + let relays = vec![generate_mock_relay(relay_port, pubkey)?]; let mock_state = Arc::new(MockRelayState::new(chain, signer).with_large_body()); - tokio::spawn(start_mock_relay_service(mock_state.clone(), pbs_port + 1)); + tokio::spawn(start_mock_relay_service_with_listener(mock_state.clone(), relay_listener)); let config = to_pbs_config(chain, get_pbs_config(pbs_port), relays); let state = PbsState::new(config, PathBuf::new()); + drop(pbs_listener); tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); // leave some time to start servers @@ -79,7 +269,9 @@ async fn test_submit_block_too_large() -> Result<()> { let mock_validator = MockValidator::new(pbs_port)?; info!("Sending submit block"); - let res = mock_validator.do_submit_block_v1(None).await; + let res = mock_validator + .do_submit_block_v1(None, vec![EncodingType::Json], EncodingType::Json, ForkName::Electra) + .await; // response size exceeds max size: max: 20971520 assert_eq!(res.unwrap().status(), StatusCode::BAD_GATEWAY); @@ -87,21 +279,30 @@ async fn test_submit_block_too_large() -> Result<()> { Ok(()) } +#[allow(clippy::too_many_arguments)] async fn submit_block_impl( - pbs_port: u16, - api_version: &BuilderApiVersion, + api_version: BuilderApiVersion, + accept_types: Vec, + relay_types: HashSet, + serialization_mode: EncodingType, + expected_try_count: u64, + expected_code: StatusCode, remove_v2_support: bool, force_404s: bool, ) -> Result { setup_test_env(); let signer = random_secret(); let pubkey = signer.public_key(); - let chain = Chain::Holesky; + let pbs_listener = get_free_listener().await; + let relay_listener = get_free_listener().await; + let pbs_port = pbs_listener.local_addr().unwrap().port(); + let relay_port = relay_listener.local_addr().unwrap().port(); // Run a mock relay - let relays = vec![generate_mock_relay(pbs_port + 1, pubkey)?]; + let mock_relay = generate_mock_relay(relay_port, pubkey)?; let mut mock_relay_state = MockRelayState::new(chain, signer); + mock_relay_state.supported_content_types = Arc::new(relay_types); if remove_v2_support { mock_relay_state = mock_relay_state.with_no_submit_block_v2(); } @@ -109,28 +310,248 @@ async fn submit_block_impl( mock_relay_state = mock_relay_state.with_not_found_for_submit_block(); } let mock_state = Arc::new(mock_relay_state); - tokio::spawn(start_mock_relay_service(mock_state.clone(), pbs_port + 1)); + tokio::spawn(start_mock_relay_service_with_listener(mock_state.clone(), relay_listener)); // Run the PBS service - let config = to_pbs_config(chain, get_pbs_config(pbs_port), relays); + let pbs_config = get_pbs_config(pbs_port); + let config = to_pbs_config(chain, pbs_config, vec![mock_relay]); let state = PbsState::new(config, PathBuf::new()); + drop(pbs_listener); tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); // leave some time to start servers tokio::time::sleep(Duration::from_millis(100)).await; + // Send the submit block request let signed_blinded_block = load_test_signed_blinded_block(); let mock_validator = MockValidator::new(pbs_port)?; info!("Sending submit block"); let res = match api_version { BuilderApiVersion::V1 => { - mock_validator.do_submit_block_v1(Some(signed_blinded_block)).await? + mock_validator + .do_submit_block_v1( + Some(signed_blinded_block), + accept_types, + serialization_mode, + ForkName::Electra, + ) + .await? } BuilderApiVersion::V2 => { - mock_validator.do_submit_block_v2(Some(signed_blinded_block)).await? + mock_validator + .do_submit_block_v2( + Some(signed_blinded_block), + accept_types, + serialization_mode, + ForkName::Electra, + ) + .await? } }; - let expected_count = if force_404s { 0 } else { 1 }; + let expected_count = if force_404s { 0 } else { expected_try_count }; assert_eq!(mock_state.received_submit_block(), expected_count); + assert_eq!(res.status(), expected_code); Ok(res) } + +// Retry-as-JSON trigger must be restricted +// to 406 Not Acceptable and 415 Unsupported Media Type. Any other 4xx is +// orthogonal to encoding and MUST surface unchanged. + +/// Shared fixture: relay returns `ssz_status` when the PBS sends SSZ, +/// everything else takes the happy path. Returns `(Response, attempt_count)`. +/// `api_version` picks v1 or v2 endpoint; `relay_types` controls what the +/// relay advertises as supported so the happy JSON path works when retried. +async fn submit_block_ssz_override( + api_version: BuilderApiVersion, + ssz_status: StatusCode, +) -> Result<(Response, u64)> { + setup_test_env(); + let signer = random_secret(); + let pubkey = signer.public_key(); + let chain = Chain::Holesky; + let pbs_listener = get_free_listener().await; + let relay_listener = get_free_listener().await; + let pbs_port = pbs_listener.local_addr().unwrap().port(); + let relay_port = relay_listener.local_addr().unwrap().port(); + + let mock_relay = generate_mock_relay(relay_port, pubkey)?; + let mut mock_relay_state = MockRelayState::new(chain, signer); + // Relay only advertises JSON so the retry (which goes out as JSON) lands + // on a clean success path. The SSZ-status override below intercepts + // before the supported-types check, so the first SSZ attempt still hits + // our injected status regardless of what's advertised here. + mock_relay_state.supported_content_types = Arc::new(HashSet::from([EncodingType::Json])); + mock_relay_state = mock_relay_state.with_submit_block_ssz_status(ssz_status); + let mock_state = Arc::new(mock_relay_state); + tokio::spawn(start_mock_relay_service_with_listener(mock_state.clone(), relay_listener)); + + let pbs_config = get_pbs_config(pbs_port); + let config = to_pbs_config(chain, pbs_config, vec![mock_relay]); + let state = PbsState::new(config, PathBuf::new()); + drop(pbs_listener); + tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let signed_blinded_block = load_test_signed_blinded_block(); + let mock_validator = MockValidator::new(pbs_port)?; + // The BN sends SSZ; PBS forwards SSZ first, that's what our override hits. + let accept_types = vec![EncodingType::Ssz, EncodingType::Json]; + let res = match api_version { + BuilderApiVersion::V1 => { + mock_validator + .do_submit_block_v1( + Some(signed_blinded_block), + accept_types, + EncodingType::Ssz, + ForkName::Electra, + ) + .await? + } + BuilderApiVersion::V2 => { + mock_validator + .do_submit_block_v2( + Some(signed_blinded_block), + accept_types, + EncodingType::Ssz, + ForkName::Electra, + ) + .await? + } + }; + Ok((res, mock_state.received_submit_block())) +} + +/// 406 is the spec-defined "retry with a different media type" signal, so we +/// MUST retry as JSON and succeed. +#[tokio::test] +async fn test_submit_block_ssz_retries_as_json_on_406() -> Result<()> { + let (res, attempts) = + submit_block_ssz_override(BuilderApiVersion::V1, StatusCode::NOT_ACCEPTABLE).await?; + assert_eq!(res.status(), StatusCode::OK, "retry-as-JSON must succeed on 406"); + assert_eq!(attempts, 2, "expected SSZ attempt + JSON retry"); + Ok(()) +} + +/// 415 is the other spec-defined media-type rejection status; same retry. +#[tokio::test] +async fn test_submit_block_ssz_retries_as_json_on_415() -> Result<()> { + let (res, attempts) = + submit_block_ssz_override(BuilderApiVersion::V1, StatusCode::UNSUPPORTED_MEDIA_TYPE) + .await?; + assert_eq!(res.status(), StatusCode::OK, "retry-as-JSON must succeed on 415"); + assert_eq!(attempts, 2); + Ok(()) +} + +/// 400 Bad Request is a validation failure — encoding is not the problem. +/// Retrying doubles relay load and hides the real error. MUST NOT retry. +#[tokio::test] +async fn test_submit_block_ssz_does_not_retry_on_400() -> Result<()> { + let (_res, attempts) = + submit_block_ssz_override(BuilderApiVersion::V1, StatusCode::BAD_REQUEST).await?; + assert_eq!(attempts, 1, "400 is not a media-type error; must not retry"); + Ok(()) +} + +/// 401 Unauthorized — auth problem, not encoding. No retry. +#[tokio::test] +async fn test_submit_block_ssz_does_not_retry_on_401() -> Result<()> { + let (_res, attempts) = + submit_block_ssz_override(BuilderApiVersion::V1, StatusCode::UNAUTHORIZED).await?; + assert_eq!(attempts, 1); + Ok(()) +} + +/// 409 Conflict — state mismatch. No retry. +#[tokio::test] +async fn test_submit_block_ssz_does_not_retry_on_409() -> Result<()> { + let (_res, attempts) = + submit_block_ssz_override(BuilderApiVersion::V1, StatusCode::CONFLICT).await?; + assert_eq!(attempts, 1); + Ok(()) +} + +/// 429 Too Many Requests — `PbsError::should_retry` already excludes this; +/// retrying as JSON would add insult to injury. No retry. +#[tokio::test] +async fn test_submit_block_ssz_does_not_retry_on_429() -> Result<()> { + let (_res, attempts) = + submit_block_ssz_override(BuilderApiVersion::V1, StatusCode::TOO_MANY_REQUESTS).await?; + assert_eq!(attempts, 1); + Ok(()) +} + +/// Same policy applies to the v2 endpoint. +#[tokio::test] +async fn test_submit_block_v2_ssz_retries_as_json_on_415() -> Result<()> { + let (res, attempts) = + submit_block_ssz_override(BuilderApiVersion::V2, StatusCode::UNSUPPORTED_MEDIA_TYPE) + .await?; + assert_eq!(res.status(), StatusCode::ACCEPTED, "v2 success is 202 Accepted"); + assert_eq!(attempts, 2); + Ok(()) +} + +/// v2 + 400: same no-retry rule as v1. +#[tokio::test] +async fn test_submit_block_v2_ssz_does_not_retry_on_400() -> Result<()> { + let (_res, attempts) = + submit_block_ssz_override(BuilderApiVersion::V2, StatusCode::BAD_REQUEST).await?; + assert_eq!(attempts, 1); + Ok(()) +} + +/// PBS must accept relay `Content-Type: application/octet-stream; +/// charset=binary` on `submit_block` responses. The audit fix for C2 switched +/// `EncodingType::from_str` to parse via the `mediatype` crate; this test +/// exercises the full relay→PBS→BN path to guard against regressions on the +/// v1 submit path. +#[tokio::test] +async fn test_submit_block_tolerates_mime_params_in_content_type() -> Result<()> { + setup_test_env(); + let signer = random_secret(); + let pubkey = signer.public_key(); + let chain = Chain::Holesky; + let pbs_listener = get_free_listener().await; + let relay_listener = get_free_listener().await; + let pbs_port = pbs_listener.local_addr().unwrap().port(); + let relay_port = relay_listener.local_addr().unwrap().port(); + + let mock_relay = generate_mock_relay(relay_port, pubkey)?; + let mut mock_relay_state = MockRelayState::new(chain, signer) + .with_response_content_type("application/octet-stream; charset=binary"); + mock_relay_state.supported_content_types = Arc::new(HashSet::from([EncodingType::Ssz])); + let mock_state = Arc::new(mock_relay_state); + tokio::spawn(start_mock_relay_service_with_listener(mock_state.clone(), relay_listener)); + + let pbs_config = get_pbs_config(pbs_port); + let config = to_pbs_config(chain, pbs_config, vec![mock_relay]); + let state = PbsState::new(config, PathBuf::new()); + drop(pbs_listener); + tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let signed_blinded_block = load_test_signed_blinded_block(); + let mock_validator = MockValidator::new(pbs_port)?; + let res = mock_validator + .do_submit_block_v1( + Some(signed_blinded_block.clone()), + vec![EncodingType::Ssz], + EncodingType::Ssz, + ForkName::Electra, + ) + .await?; + assert_eq!(res.status(), StatusCode::OK, "PBS should tolerate `; charset=binary` MIME param"); + assert_eq!(mock_state.received_submit_block(), 1); + + let bytes = res.bytes().await?; + let response_body = PayloadAndBlobs::from_ssz_bytes_by_fork(&bytes, ForkName::Electra).unwrap(); + assert_eq!( + response_body.execution_payload.block_hash(), + signed_blinded_block.block_hash().into() + ); + Ok(()) +}