From 300d4566fd3359a703a00e2f62a1a3a2ab7c40fa Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Wed, 27 May 2026 17:48:01 +0200 Subject: [PATCH 1/2] add http observability to aws connector --- src/aws/mod.rs | 113 +++++++++++++++--- src/internal_events/http_client.rs | 185 ++++++++++++++++++++++------- 2 files changed, 241 insertions(+), 57 deletions(-) diff --git a/src/aws/mod.rs b/src/aws/mod.rs index 35cb39cfdf9c7..41d5c8d1120fe 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -11,7 +11,7 @@ use std::{ atomic::{AtomicUsize, Ordering}, }, task::{Context, Poll}, - time::{Duration, SystemTime}, + time::{Duration, Instant, SystemTime}, }; pub use auth::{AwsAuthentication, ImdsAuthentication}; @@ -37,8 +37,7 @@ use aws_smithy_runtime_api::client::{ use aws_smithy_types::body::SdkBody; use aws_types::sdk_config::SharedHttpClient; use bytes::Bytes; -use futures_util::FutureExt; -use http::HeaderMap; +use http::{HeaderMap, header::HeaderValue}; use http_body::{Body, combinators::BoxBody}; use pin_project::pin_project; use regex::RegexSet; @@ -49,7 +48,13 @@ pub use timeout::AwsTimeout; use crate::{ config::ProxyConfig, http::{build_proxy_connector, build_tls_connector, status}, - internal_events::AwsBytesSent, + internal_events::{ + AwsBytesSent, + http_client::{ + AboutToSendHttpRequest, GotHttpResponse, GotHttpWarning, HttpRequestTelemetry, + HttpResponseTelemetry, + }, + }, tls::{MaybeTlsSettings, TlsConfig}, }; @@ -350,12 +355,77 @@ struct AwsConnector { region: Region, } +// ── Telemetry trait implementations for the AWS SDK HTTP types ──────────────── +// +// `HttpRequest` and `HttpResponse` are the SDK's own structs (not `http::Request` +// / `http::Response`), so they cannot implement the hyper-specific body/version +// accessors. The required method impls (method/uri and status) are enough for +// metric labels; the optional rich-logging fields fall back to `None`. + +impl HttpRequestTelemetry for HttpRequest { + fn method(&self) -> &str { + self.method() + } + + fn uri(&self) -> String { + self.uri().to_string() + } + + fn headers(&self) -> HeaderMap { + smithy_headers_to_map(self.headers()) + } + + fn body_size_hint(&self) -> (u64, Option) { + let hint = Body::size_hint(self.body()); + (hint.lower(), hint.upper()) + } +} + +impl HttpResponseTelemetry for HttpResponse { + fn status_u16(&self) -> u16 { + self.status().as_u16() + } + + fn headers(&self) -> HeaderMap { + smithy_headers_to_map(self.headers()) + } + + fn body_size_hint(&self) -> (u64, Option) { + let hint = Body::size_hint(self.body()); + (hint.lower(), hint.upper()) + } +} + +/// Converts the AWS SDK's string-pair header iterator into an `http::HeaderMap`. +/// Sanitization (marking sensitive headers) is handled by the trait's default +/// `sanitized_headers` method. +fn smithy_headers_to_map( + headers: &aws_smithy_runtime_api::http::Headers, +) -> HeaderMap { + let mut map = HeaderMap::with_capacity(headers.len()); + for (name, value) in headers { + let Ok(header_name) = http::HeaderName::from_bytes(name.as_bytes()) else { + continue; + }; + let Ok(header_value) = HeaderValue::from_str(value) else { + continue; + }; + map.insert(header_name, header_value); + } + map +} + +// ───────────────────────────────────────────────────────────────────────────── + impl HttpConnector for AwsConnector where T: HttpConnector, { fn call(&self, req: HttpRequest) -> HttpConnectorFuture { - let bytes_sent = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let bytes_sent = Arc::new(AtomicUsize::new(0)); + + emit!(AboutToSendHttpRequest { request: &req }); + let req = req.map(|body| { let bytes_sent = Arc::clone(&bytes_sent); body.map_preserve_contents(move |body| { @@ -367,17 +437,32 @@ where let fut = self.http.call(req); let region = self.region.clone(); - HttpConnectorFuture::new(fut.inspect(move |result| { + HttpConnectorFuture::new(async move { + let before = Instant::now(); + let result = fut.await; + let roundtrip = before.elapsed(); let byte_size = bytes_sent.load(Ordering::Relaxed); - if let Ok(result) = result - && result.status().is_success() - { - emit!(AwsBytesSent { - byte_size, - region: Some(region), - }); + + match &result { + Ok(response) => { + if response.status().is_success() { + emit!(AwsBytesSent { + byte_size, + region: Some(region), + }); + } + emit!(GotHttpResponse { + response, + roundtrip + }); + } + Err(error) => { + emit!(GotHttpWarning { error, roundtrip }); + } } - })) + + result + }) } } diff --git a/src/internal_events/http_client.rs b/src/internal_events/http_client.rs index 346192dfca141..9b0a0f25ddf62 100644 --- a/src/internal_events/http_client.rs +++ b/src/internal_events/http_client.rs @@ -1,44 +1,125 @@ use std::time::Duration; use http::{ - Request, Response, - header::{self, HeaderMap, HeaderValue}, + HeaderMap, Request, Response, Version, + header::{self, HeaderValue}, }; -use hyper::{Error, body::HttpBody}; +use hyper::body::HttpBody; use vector_lib::{ NamedInternalEvent, counter, histogram, internal_event::{CounterName, HistogramName, InternalEvent, error_stage, error_type}, }; -#[derive(Debug, NamedInternalEvent)] -pub struct AboutToSendHttpRequest<'a, T> { - pub request: &'a Request, +// ── Telemetry traits ────────────────────────────────────────────────────────── + +/// Provides the data required to emit HTTP request telemetry. +/// +/// `method`, `uri`, `sanitized_headers`, and `body_debug` are required; every +/// transport must implement them. +/// +/// `version` is optional (default: `None`) because some transports — notably +/// the AWS SDK connector layer — do not expose HTTP version in their request +/// type. +pub trait HttpRequestTelemetry { + fn method(&self) -> &str; + fn uri(&self) -> String; + fn headers(&self) -> HeaderMap; + /// Returns the body size bounds as `(lower, upper)`. + fn body_size_hint(&self) -> (u64, Option); + /// Returns the HTTP version when the transport exposes it. + fn version(&self) -> Option { + None + } + + /// Returns the headers with sensitive values redacted. + /// + /// Provided by default; implementors only need to implement [`headers`]. + fn sanitized_headers(&self) -> HeaderMap { + remove_sensitive(self.headers()) + } } -fn remove_sensitive(headers: &HeaderMap) -> HeaderMap { - let mut headers = headers.clone(); - for name in &[ - header::AUTHORIZATION, - header::PROXY_AUTHORIZATION, - header::COOKIE, - header::SET_COOKIE, - ] { - if let Some(value) = headers.get_mut(name) { - value.set_sensitive(true); - } +/// Provides the data required to emit HTTP response telemetry. +/// +/// Same rationale as [`HttpRequestTelemetry`]: `version` is optional. +pub trait HttpResponseTelemetry { + fn status_u16(&self) -> u16; + fn headers(&self) -> HeaderMap; + /// Returns the body size bounds as `(lower, upper)`. + fn body_size_hint(&self) -> (u64, Option); + /// Returns the HTTP version when the transport exposes it. + fn version(&self) -> Option { + None } - headers + + /// Returns the headers with sensitive values redacted. + /// + /// Provided by default; implementors only need to implement [`headers`]. + fn sanitized_headers(&self) -> HeaderMap { + remove_sensitive(self.headers()) + } +} + +// ── Implementations for the hyper HTTP types (full data) ────────────────────── + +impl HttpRequestTelemetry for Request { + fn method(&self) -> &str { + self.method().as_str() + } + + fn uri(&self) -> String { + self.uri().to_string() + } + + fn headers(&self) -> HeaderMap { + self.headers().clone() + } + + fn body_size_hint(&self) -> (u64, Option) { + let hint = self.body().size_hint(); + (hint.lower(), hint.upper()) + } + + fn version(&self) -> Option { + Some(self.version()) + } +} + +impl HttpResponseTelemetry for Response { + fn status_u16(&self) -> u16 { + self.status().as_u16() + } + + fn headers(&self) -> HeaderMap { + self.headers().clone() + } + + fn body_size_hint(&self) -> (u64, Option) { + let hint = self.body().size_hint(); + (hint.lower(), hint.upper()) + } + + fn version(&self) -> Option { + Some(self.version()) + } +} + +// ── Events ──────────────────────────────────────────────────────────────────── + +#[derive(Debug, NamedInternalEvent)] +pub struct AboutToSendHttpRequest<'a, T: HttpRequestTelemetry> { + pub request: &'a T, } -impl InternalEvent for AboutToSendHttpRequest<'_, T> { +impl InternalEvent for AboutToSendHttpRequest<'_, T> { fn emit(self) { debug!( message = "Sending HTTP request.", uri = %self.request.uri(), method = %self.request.method(), version = ?self.request.version(), - headers = ?remove_sensitive(self.request.headers()), - body = %FormatBody(self.request.body()), + headers = ?self.request.sanitized_headers(), + body = %FormatBodySizeHint::from(self.request.body_size_hint()), ); counter!(CounterName::HttpClientRequestsSentTotal, "method" => self.request.method().to_string()) .increment(1); @@ -46,37 +127,34 @@ impl InternalEvent for AboutToSendHttpRequest<'_, T> { } #[derive(Debug, NamedInternalEvent)] -pub struct GotHttpResponse<'a, T> { - pub response: &'a Response, +pub struct GotHttpResponse<'a, T: HttpResponseTelemetry> { + pub response: &'a T, pub roundtrip: Duration, } -impl InternalEvent for GotHttpResponse<'_, T> { +impl InternalEvent for GotHttpResponse<'_, T> { fn emit(self) { + let status = self.response.status_u16(); + let status_str = status.to_string(); debug!( message = "HTTP response.", - status = %self.response.status(), + status = %status, version = ?self.response.version(), - headers = ?remove_sensitive(self.response.headers()), - body = %FormatBody(self.response.body()), + headers = ?self.response.sanitized_headers(), + body = %FormatBodySizeHint::from(self.response.body_size_hint()), + ); - counter!( - CounterName::HttpClientResponsesTotal, - "status" => self.response.status().as_u16().to_string(), - ) - .increment(1); + counter!(CounterName::HttpClientResponsesTotal, "status" => status_str.clone()) + .increment(1); histogram!(HistogramName::HttpClientRttSeconds).record(self.roundtrip); - histogram!( - HistogramName::HttpClientResponseRttSeconds, - "status" => self.response.status().as_u16().to_string(), - ) - .record(self.roundtrip); + histogram!(HistogramName::HttpClientResponseRttSeconds, "status" => status_str) + .record(self.roundtrip); } } #[derive(Debug, NamedInternalEvent)] pub struct GotHttpWarning<'a> { - pub error: &'a Error, + pub error: &'a dyn std::error::Error, pub roundtrip: Duration, } @@ -96,13 +174,34 @@ impl InternalEvent for GotHttpWarning<'_> { } } -/// Newtype placeholder to provide a formatter for the request and response body. -struct FormatBody<'a, B>(&'a B); +// ── Helpers ─────────────────────────────────────────────────────────────────── + +fn remove_sensitive(mut headers: HeaderMap) -> HeaderMap { + for name in &[ + header::AUTHORIZATION, + header::PROXY_AUTHORIZATION, + header::COOKIE, + header::SET_COOKIE, + ] { + if let Some(value) = headers.get_mut(name) { + value.set_sensitive(true); + } + } + headers +} + +/// Formats a body size hint `(lower, upper)` for debug logging. +struct FormatBodySizeHint(u64, Option); + +impl From<(u64, Option)> for FormatBodySizeHint { + fn from((lower, upper): (u64, Option)) -> Self { + FormatBodySizeHint(lower, upper) + } +} -impl std::fmt::Display for FormatBody<'_, B> { +impl std::fmt::Display for FormatBodySizeHint { fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - let size = self.0.size_hint(); - match (size.lower(), size.upper()) { + match (self.0, self.1) { (0, None) => write!(fmt, "[unknown]"), (lower, None) => write!(fmt, "[>={lower} bytes]"), From 3959a94226dcbb414c76071566bfb93ead9f9e0c Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Wed, 27 May 2026 17:49:20 +0200 Subject: [PATCH 2/2] add changelog --- .../12779_add_aws_connector_observability.enhancement.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/12779_add_aws_connector_observability.enhancement.md diff --git a/changelog.d/12779_add_aws_connector_observability.enhancement.md b/changelog.d/12779_add_aws_connector_observability.enhancement.md new file mode 100644 index 0000000000000..fb6802ce524ad --- /dev/null +++ b/changelog.d/12779_add_aws_connector_observability.enhancement.md @@ -0,0 +1,3 @@ +Added common internal HTTP metrics to the connector used by AWS sinks. + +authors: gwenaskell