diff --git a/crates/blockchain/src/aggregation.rs b/crates/blockchain/src/aggregation.rs index 07829164..e100b035 100644 --- a/crates/blockchain/src/aggregation.rs +++ b/crates/blockchain/src/aggregation.rs @@ -290,10 +290,12 @@ pub fn aggregate_job(job: AggregationJob) -> Option { participants.dedup(); let aggregation_bits = aggregation_bits_from_validator_indices(&participants); + let proof = AggregatedSignatureProof::new(aggregation_bits, proof_data); + metrics::observe_aggregated_proof_size(proof.proof_data.len()); Some(AggregatedGroupOutput { hashed: job.hashed, - proof: AggregatedSignatureProof::new(aggregation_bits, proof_data), + proof, participants, keys_to_delete: job.keys_to_delete, }) diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index a14dd06a..2307dff4 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -266,6 +266,25 @@ static LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS: std::sync::L .unwrap() }); +static LEAN_AGGREGATED_PROOF_SIZE_BYTES: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_histogram!( + "lean_aggregated_proof_size_bytes", + "Bytes size of an aggregated signature proof's proof_data field", + vec![ + 1024.0, + 4096.0, + 16384.0, + 65536.0, + 131_072.0, + 262_144.0, + 524_288.0, + 1_048_576.0 + ] + ) + .unwrap() + }); + static LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS: std::sync::LazyLock = std::sync::LazyLock::new(|| { register_histogram!( @@ -396,6 +415,7 @@ pub fn init() { std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_BUILDING_TIME_SECONDS); std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS); std::sync::LazyLock::force(&LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS); + std::sync::LazyLock::force(&LEAN_AGGREGATED_PROOF_SIZE_BYTES); std::sync::LazyLock::force(&LEAN_FORK_CHOICE_REORG_DEPTH); // Block production std::sync::LazyLock::force(&LEAN_BLOCK_AGGREGATED_PAYLOADS); @@ -530,6 +550,11 @@ pub fn time_pq_sig_aggregated_signatures_verification() -> TimingGuard { TimingGuard::new(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS) } +/// Observe the size of an aggregated signature proof's `proof_data` bytes. +pub fn observe_aggregated_proof_size(bytes: usize) { + LEAN_AGGREGATED_PROOF_SIZE_BYTES.observe(bytes as f64); +} + /// Observe committee-signature aggregation duration. Measured in the /// off-thread worker and reported back via an `AggregationDone` message, so a /// drop-guard that crosses the thread boundary is not appropriate here. diff --git a/crates/common/metrics/src/lib.rs b/crates/common/metrics/src/lib.rs index 83539a2e..a1a19e37 100644 --- a/crates/common/metrics/src/lib.rs +++ b/crates/common/metrics/src/lib.rs @@ -5,9 +5,10 @@ pub mod timing; // Re-export prometheus types and macros we use pub use prometheus::{ - Encoder, Error as PrometheusError, Histogram, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, - TextEncoder, core::Collector, gather, register_histogram, register_int_counter, - register_int_counter_vec, register_int_gauge, register_int_gauge_vec, + Encoder, Error as PrometheusError, Histogram, HistogramVec, IntCounter, IntCounterVec, + IntGauge, IntGaugeVec, TextEncoder, core::Collector, gather, register_histogram, + register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge, + register_int_gauge_vec, }; // Re-export commonly used items diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs index 4513b3e2..c257006b 100644 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -31,12 +31,13 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { match topic_kind { Some(BLOCK_TOPIC_KIND) => { info!(kind = "block", peer_count, "P2P message received"); + let compressed_len = message.data.len(); let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped block")) else { return; }; - metrics::observe_gossip_block_size(uncompressed_data.len()); + metrics::observe_gossip_block_size(uncompressed_data.len(), compressed_len); let Ok(signed_block) = SignedBlock::from_ssz_bytes(&uncompressed_data) .inspect_err(|err| error!(?err, "Failed to decode gossipped block")) @@ -64,12 +65,13 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { } Some(AGGREGATION_TOPIC_KIND) => { info!(kind = "aggregation", peer_count, "P2P message received"); + let compressed_len = message.data.len(); let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped aggregation")) else { return; }; - metrics::observe_gossip_aggregation_size(uncompressed_data.len()); + metrics::observe_gossip_aggregation_size(uncompressed_data.len(), compressed_len); let Ok(aggregation) = SignedAggregatedAttestation::from_ssz_bytes(&uncompressed_data) .inspect_err(|err| error!(?err, "Failed to decode gossipped aggregation")) @@ -95,12 +97,13 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { } Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => { info!(kind = "attestation", peer_count, "P2P message received"); + let compressed_len = message.data.len(); let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped attestation")) else { return; }; - metrics::observe_gossip_attestation_size(uncompressed_data.len()); + metrics::observe_gossip_attestation_size(uncompressed_data.len(), compressed_len); let Ok(signed_attestation) = SignedAttestation::from_ssz_bytes(&uncompressed_data) .inspect_err(|err| error!(?err, "Failed to decode gossipped attestation")) @@ -142,6 +145,8 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte // Compress with raw snappy let compressed = compress_message(&ssz_bytes); + metrics::observe_gossip_attestation_size(ssz_bytes.len(), compressed.len()); + // Look up subscribed topic or construct on-the-fly for gossipsub fanout let topic = server .attestation_topics @@ -175,6 +180,8 @@ pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlock) { // Compress with raw snappy let compressed = compress_message(&ssz_bytes); + metrics::observe_gossip_block_size(ssz_bytes.len(), compressed.len()); + // Publish to gossipsub server .swarm_handle @@ -201,6 +208,8 @@ pub async fn publish_aggregated_attestation( // Compress with raw snappy let compressed = compress_message(&ssz_bytes); + metrics::observe_gossip_aggregation_size(ssz_bytes.len(), compressed.len()); + // Publish to the aggregation topic server .swarm_handle diff --git a/crates/net/p2p/src/metrics.rs b/crates/net/p2p/src/metrics.rs index 19845258..39f197e1 100644 --- a/crates/net/p2p/src/metrics.rs +++ b/crates/net/p2p/src/metrics.rs @@ -78,11 +78,16 @@ static LEAN_PEER_DISCONNECTION_EVENTS_TOTAL: LazyLock = LazyLock: }); // --- Gossip Message Size Histograms --- +// +// `compression` label values: +// - `"raw"`: size of SSZ-encoded payload before snappy compression +// - `"snappy"`: size of the on-wire snappy-compressed payload -static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock = LazyLock::new(|| { - register_histogram!( +static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram_vec!( "lean_gossip_block_size_bytes", "Bytes size of a gossip block message", + &["compression"], vec![ 10_000.0, 50_000.0, @@ -97,19 +102,21 @@ static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock = LazyLock::new(|| { .unwrap() }); -static LEAN_GOSSIP_ATTESTATION_SIZE_BYTES: LazyLock = LazyLock::new(|| { - register_histogram!( +static LEAN_GOSSIP_ATTESTATION_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram_vec!( "lean_gossip_attestation_size_bytes", "Bytes size of a gossip attestation message", + &["compression"], vec![512.0, 1024.0, 2048.0, 4096.0, 8192.0, 16384.0] ) .unwrap() }); -static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock = LazyLock::new(|| { - register_histogram!( +static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram_vec!( "lean_gossip_aggregation_size_bytes", "Bytes size of a gossip aggregated attestation message", + &["compression"], vec![ 1024.0, 4096.0, @@ -124,19 +131,94 @@ static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock = LazyLock::new(| .unwrap() }); -/// Observe the size of a gossip block message. -pub fn observe_gossip_block_size(bytes: usize) { - LEAN_GOSSIP_BLOCK_SIZE_BYTES.observe(bytes as f64); +/// Observe the size of a gossip block message, recording both the raw SSZ +/// size and the snappy-compressed on-wire size. +pub fn observe_gossip_block_size(raw: usize, snappy: usize) { + LEAN_GOSSIP_BLOCK_SIZE_BYTES + .with_label_values(&["raw"]) + .observe(raw as f64); + LEAN_GOSSIP_BLOCK_SIZE_BYTES + .with_label_values(&["snappy"]) + .observe(snappy as f64); } -/// Observe the size of a gossip attestation message. -pub fn observe_gossip_attestation_size(bytes: usize) { - LEAN_GOSSIP_ATTESTATION_SIZE_BYTES.observe(bytes as f64); +/// Observe the size of a gossip attestation message, recording both the raw +/// SSZ size and the snappy-compressed on-wire size. +pub fn observe_gossip_attestation_size(raw: usize, snappy: usize) { + LEAN_GOSSIP_ATTESTATION_SIZE_BYTES + .with_label_values(&["raw"]) + .observe(raw as f64); + LEAN_GOSSIP_ATTESTATION_SIZE_BYTES + .with_label_values(&["snappy"]) + .observe(snappy as f64); } -/// Observe the size of a gossip aggregated attestation message. -pub fn observe_gossip_aggregation_size(bytes: usize) { - LEAN_GOSSIP_AGGREGATION_SIZE_BYTES.observe(bytes as f64); +/// Observe the size of a gossip aggregated attestation message, recording both +/// the raw SSZ size and the snappy-compressed on-wire size. +pub fn observe_gossip_aggregation_size(raw: usize, snappy: usize) { + LEAN_GOSSIP_AGGREGATION_SIZE_BYTES + .with_label_values(&["raw"]) + .observe(raw as f64); + LEAN_GOSSIP_AGGREGATION_SIZE_BYTES + .with_label_values(&["snappy"]) + .observe(snappy as f64); +} + +// --- Req/Resp Message Size Histograms --- +// +// `protocol` label: `"status"` or `"blocks_by_root"`. +// `compression` label: `"raw"` (SSZ) or `"snappy"` (on-wire, varint-prefixed +// snappy frame bytes only — the response-code byte is not included). + +static LEAN_REQRESP_REQUEST_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram_vec!( + "lean_reqresp_request_size_bytes", + "Bytes size of a req/resp request", + &["protocol", "compression"], + vec![64.0, 128.0, 256.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0] + ) + .unwrap() +}); + +static LEAN_REQRESP_RESPONSE_CHUNK_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram_vec!( + "lean_reqresp_response_chunk_size_bytes", + "Bytes size of a single req/resp response chunk", + &["protocol", "compression"], + vec![ + 128.0, + 1024.0, + 10_000.0, + 100_000.0, + 500_000.0, + 1_000_000.0, + 5_000_000.0, + 10_000_000.0 + ] + ) + .unwrap() +}); + +/// Observe the size of a req/resp request, recording both the raw SSZ size +/// and the snappy-compressed on-wire size. +pub fn observe_reqresp_request_size(protocol: &str, raw: usize, snappy: usize) { + LEAN_REQRESP_REQUEST_SIZE_BYTES + .with_label_values(&[protocol, "raw"]) + .observe(raw as f64); + LEAN_REQRESP_REQUEST_SIZE_BYTES + .with_label_values(&[protocol, "snappy"]) + .observe(snappy as f64); +} + +/// Observe the size of a single req/resp response chunk, recording both the +/// raw SSZ size and the snappy-compressed on-wire size. +pub fn observe_reqresp_response_chunk_size(protocol: &str, raw: usize, snappy: usize) { + LEAN_REQRESP_RESPONSE_CHUNK_SIZE_BYTES + .with_label_values(&[protocol, "raw"]) + .observe(raw as f64); + LEAN_REQRESP_RESPONSE_CHUNK_SIZE_BYTES + .with_label_values(&[protocol, "snappy"]) + .observe(snappy as f64); } /// Set the attestation committee subnet gauge. diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs index e85f440a..cbd30a70 100644 --- a/crates/net/p2p/src/req_resp/codec.rs +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -12,8 +12,19 @@ use super::{ }, }; +use crate::metrics; use ethlambda_types::block::SignedBlock; +/// Short label extracted from a libp2p protocol id, used as the `protocol` +/// label on req/resp size metrics. +fn protocol_label(protocol: &str) -> &'static str { + match protocol { + STATUS_PROTOCOL_V1 => "status", + BLOCKS_BY_ROOT_PROTOCOL_V1 => "blocks_by_root", + _ => "unknown", + } +} + #[derive(Debug, Clone, Default)] pub struct Codec; @@ -30,7 +41,10 @@ impl libp2p::request_response::Codec for Codec { where T: AsyncRead + Unpin + Send, { - let payload = decode_payload(io).await?; + let decoded = decode_payload(io).await?; + let payload = decoded.uncompressed; + let label = protocol_label(protocol.as_ref()); + metrics::observe_reqresp_request_size(label, payload.len(), decoded.compressed_size); match protocol.as_ref() { STATUS_PROTOCOL_V1 => { @@ -60,9 +74,10 @@ impl libp2p::request_response::Codec for Codec { where T: AsyncRead + Unpin + Send, { + let label = protocol_label(protocol.as_ref()); match protocol.as_ref() { - STATUS_PROTOCOL_V1 => decode_status_response(io).await, - BLOCKS_BY_ROOT_PROTOCOL_V1 => decode_blocks_by_root_response(io).await, + STATUS_PROTOCOL_V1 => decode_status_response(io, label).await, + BLOCKS_BY_ROOT_PROTOCOL_V1 => decode_blocks_by_root_response(io, label).await, _ => Err(io::Error::new( io::ErrorKind::InvalidData, format!("unknown protocol: {}", protocol.as_ref()), @@ -72,7 +87,7 @@ impl libp2p::request_response::Codec for Codec { async fn write_request( &mut self, - _: &Self::Protocol, + protocol: &Self::Protocol, io: &mut T, req: Self::Request, ) -> io::Result<()> @@ -86,18 +101,22 @@ impl libp2p::request_response::Codec for Codec { Request::BlocksByRoot(request) => request.to_ssz(), }; - write_payload(io, &encoded).await + let compressed_size = write_payload(io, &encoded).await?; + let label = protocol_label(protocol.as_ref()); + metrics::observe_reqresp_request_size(label, encoded.len(), compressed_size); + Ok(()) } async fn write_response( &mut self, - _: &Self::Protocol, + protocol: &Self::Protocol, io: &mut T, resp: Self::Response, ) -> io::Result<()> where T: AsyncWrite + Unpin + Send, { + let label = protocol_label(protocol.as_ref()); match resp { Response::Success { payload } => { match &payload { @@ -105,7 +124,13 @@ impl libp2p::request_response::Codec for Codec { // Send success code (0) io.write_all(&[ResponseCode::SUCCESS.into()]).await?; let encoded = status.to_ssz(); - write_payload(io, &encoded).await + let compressed_size = write_payload(io, &encoded).await?; + metrics::observe_reqresp_response_chunk_size( + label, + encoded.len(), + compressed_size, + ); + Ok(()) } ResponsePayload::BlocksByRoot(blocks) => { // Write each block as a separate chunk. @@ -123,7 +148,12 @@ impl libp2p::request_response::Codec for Codec { continue; } io.write_all(&[ResponseCode::SUCCESS.into()]).await?; - write_payload(io, &encoded).await?; + let compressed_size = write_payload(io, &encoded).await?; + metrics::observe_reqresp_response_chunk_size( + label, + encoded.len(), + compressed_size, + ); } // Empty response if no blocks found (stream just ends) Ok(()) @@ -137,7 +167,9 @@ impl libp2p::request_response::Codec for Codec { // Error messages are SSZ-encoded as List[byte, 256] let encoded = message.to_ssz(); - write_payload(io, &encoded).await + let compressed_size = write_payload(io, &encoded).await?; + metrics::observe_reqresp_response_chunk_size(label, encoded.len(), compressed_size); + Ok(()) } } } @@ -164,7 +196,7 @@ impl libp2p::request_response::Codec for Codec { /// - I/O error occurs while reading the response code or payload /// - Peer's error message cannot be SSZ-decoded (InvalidData) /// - Peer's Status payload cannot be SSZ-decoded (InvalidData) -async fn decode_status_response(io: &mut T) -> io::Result +async fn decode_status_response(io: &mut T, protocol_label: &str) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -173,7 +205,13 @@ where .await?; let code = ResponseCode::from(result_byte); - let payload = decode_payload(io).await?; + let decoded = decode_payload(io).await?; + let payload = decoded.uncompressed; + metrics::observe_reqresp_response_chunk_size( + protocol_label, + payload.len(), + decoded.compressed_size, + ); if code != ResponseCode::SUCCESS { let message = ErrorMessage::from_ssz_bytes(&payload).map_err(|err| { @@ -215,7 +253,7 @@ where /// /// Note: Error chunks from the peer (non-SUCCESS response codes) do not cause this /// function to return `Err` - they are logged and skipped. -async fn decode_blocks_by_root_response(io: &mut T) -> io::Result +async fn decode_blocks_by_root_response(io: &mut T, protocol_label: &str) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -232,7 +270,13 @@ where } let code = ResponseCode::from(result_byte); - let payload = decode_payload(io).await?; + let decoded = decode_payload(io).await?; + let payload = decoded.uncompressed; + metrics::observe_reqresp_response_chunk_size( + protocol_label, + payload.len(), + decoded.compressed_size, + ); if code != ResponseCode::SUCCESS { let error_message = ErrorMessage::from_ssz_bytes(&payload) diff --git a/crates/net/p2p/src/req_resp/encoding.rs b/crates/net/p2p/src/req_resp/encoding.rs index 7a4116c4..3cb26b0a 100644 --- a/crates/net/p2p/src/req_resp/encoding.rs +++ b/crates/net/p2p/src/req_resp/encoding.rs @@ -8,8 +8,25 @@ pub const MAX_PAYLOAD_SIZE: usize = 10 * 1024 * 1024; // 10 MB // https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#max_message_size pub const MAX_COMPRESSED_PAYLOAD_SIZE: usize = 32 + MAX_PAYLOAD_SIZE + MAX_PAYLOAD_SIZE / 6 + 1024; // ~12 MB +/// Decoded payload together with the size of its on-wire snappy-compressed +/// bytes (excluding the varint length prefix). +/// +/// `compressed_size` is accurate for single-chunk streams (Status request / +/// response, BlocksByRoot request). For multi-chunk streams (BlocksByRoot +/// response) the value is over-reported on the first chunk because +/// `decode_payload` slurps the whole stream via `read_to_end` before parsing +/// the first varint, so `compressed_size` measures everything left after that +/// varint rather than just this chunk's snappy frame. The metric is still +/// useful for single-chunk traffic and as an order-of-magnitude signal on +/// multi-chunk responses; precise per-chunk accounting would require refactoring +/// `decode_payload` to read one varint + one snappy frame at a time. +pub struct DecodedPayload { + pub uncompressed: Vec, + pub compressed_size: usize, +} + /// Decode a varint-prefixed, snappy-compressed SSZ payload from an async reader. -pub async fn decode_payload(io: &mut T) -> io::Result> +pub async fn decode_payload(io: &mut T) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -26,6 +43,7 @@ where )); } let (size, rest) = decode_varint(&buf)?; + let compressed_size = rest.len(); if size as usize > MAX_PAYLOAD_SIZE { return Err(io::Error::new( @@ -44,10 +62,15 @@ where )); } - Ok(uncompressed) + Ok(DecodedPayload { + uncompressed, + compressed_size, + }) } -pub async fn write_payload(io: &mut T, encoded: &[u8]) -> io::Result<()> +/// Write a varint-prefixed, snappy-compressed SSZ payload. Returns the size +/// of the snappy-compressed bytes (excluding the varint length prefix). +pub async fn write_payload(io: &mut T, encoded: &[u8]) -> io::Result where T: AsyncWrite + Unpin, { @@ -70,7 +93,7 @@ where io.write_all(varint_buf).await?; io.write_all(&buf).await?; - Ok(()) + Ok(buf.len()) } /// Encodes a u32 as a varint into the provided buffer, returning a slice of the buffer diff --git a/docs/metrics.md b/docs/metrics.md index 3c956fd8..e2611f16 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -80,6 +80,26 @@ The exposed metrics follow [the leanMetrics specification](https://github.com/le |`lean_peer_connection_events_total`| Counter | Total number of peer connection events | On peer connection | direction=inbound,outbound
result=success,timeout,error | ✅ | |`lean_peer_disconnection_events_total`| Counter | Total number of peer disconnection events | On peer disconnection | direction=inbound,outbound
reason=timeout,remote_close,local_close,error | ✅ | +## Custom Metrics (non-leanMetrics) + +The metrics below are not part of the [leanMetrics specification](https://github.com/leanEthereum/leanMetrics/blob/2719baad8351c9ad5eaf3c8621f33fcec20a1dc7/metrics.md). They are ethlambda-specific observability around on-wire message sizes and post-quantum aggregated proof sizes. + +### PQ Signature Sizes + +| Name | Type | Usage | Sample collection event | Labels | Buckets | +|------|------|-------|-------------------------|--------|---------| +| `lean_aggregated_proof_size_bytes` | Histogram | Bytes size of an aggregated signature proof's `proof_data` field | On aggregated signature production | | 1024, 4096, 16384, 65536, 131072, 262144, 524288, 1048576 | + +### Network Sizes + +| Name | Type | Usage | Sample collection event | Labels | Buckets | +|------|------|-------|-------------------------|--------|---------| +| `lean_gossip_block_size_bytes` | Histogram | Bytes size of a gossip block message (raw SSZ or snappy on-wire) | On gossip block send/receive | compression=raw,snappy | 10000, 50000, 100000, 250000, 500000, 1000000, 2000000, 5000000 | +| `lean_gossip_attestation_size_bytes` | Histogram | Bytes size of a gossip attestation message (raw SSZ or snappy on-wire) | On gossip attestation send/receive | compression=raw,snappy | 512, 1024, 2048, 4096, 8192, 16384 | +| `lean_gossip_aggregation_size_bytes` | Histogram | Bytes size of a gossip aggregated attestation message (raw SSZ or snappy on-wire) | On gossip aggregation send/receive | compression=raw,snappy | 1024, 4096, 16384, 65536, 131072, 262144, 524288, 1048576 | +| `lean_reqresp_request_size_bytes` | Histogram | Bytes size of a req/resp request (raw SSZ or snappy on-wire) | On req/resp request send/receive | protocol=status,blocks_by_root
compression=raw,snappy | 64, 128, 256, 512, 1024, 4096, 16384, 65536 | +| `lean_reqresp_response_chunk_size_bytes` | Histogram | Bytes size of a single req/resp response chunk (raw SSZ or snappy on-wire) | On req/resp response chunk send/receive | protocol=status,blocks_by_root
compression=raw,snappy | 128, 1024, 10000, 100000, 500000, 1000000, 5000000, 10000000 | + --- ✅(*) **Partial support**: These metrics are implemented but not collected "on scrape" as the spec requires. They are updated on specific events (e.g., on tick, on block processing) rather than being computed fresh on each Prometheus scrape.