Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added common internal HTTP metrics to the connector used by AWS sinks.

authors: gwenaskell
113 changes: 99 additions & 14 deletions src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
atomic::{AtomicUsize, Ordering},
},
task::{Context, Poll},
time::{Duration, SystemTime},
time::{Duration, Instant, SystemTime},
};

pub use auth::{AwsAuthentication, ImdsAuthentication};
Expand All @@ -37,8 +37,7 @@ use aws_smithy_runtime_api::client::{
use aws_smithy_types::body::SdkBody;
use aws_types::sdk_config::SharedHttpClient;
use bytes::Bytes;
use futures_util::FutureExt;
use http::HeaderMap;
use http::{HeaderMap, header::HeaderValue};
use http_body::{Body, combinators::BoxBody};
use pin_project::pin_project;
use regex::RegexSet;
Expand All @@ -49,7 +48,13 @@ pub use timeout::AwsTimeout;
use crate::{
config::ProxyConfig,
http::{build_proxy_connector, build_tls_connector, status},
internal_events::AwsBytesSent,
internal_events::{
AwsBytesSent,
http_client::{
AboutToSendHttpRequest, GotHttpResponse, GotHttpWarning, HttpRequestTelemetry,
HttpResponseTelemetry,
},
},
tls::{MaybeTlsSettings, TlsConfig},
};

Expand Down Expand Up @@ -350,12 +355,77 @@ struct AwsConnector<T> {
region: Region,
}

// ── Telemetry trait implementations for the AWS SDK HTTP types ────────────────
//
// `HttpRequest` and `HttpResponse` are the SDK's own structs (not `http::Request`
// / `http::Response`), so they cannot implement the hyper-specific body/version
// accessors. The required method impls (method/uri and status) are enough for
// metric labels; the optional rich-logging fields fall back to `None`.

impl HttpRequestTelemetry for HttpRequest {
fn method(&self) -> &str {
self.method()
}

fn uri(&self) -> String {
self.uri().to_string()
}

fn headers(&self) -> HeaderMap<HeaderValue> {
smithy_headers_to_map(self.headers())
}

fn body_size_hint(&self) -> (u64, Option<u64>) {
let hint = Body::size_hint(self.body());
(hint.lower(), hint.upper())
}
}

impl HttpResponseTelemetry for HttpResponse {
fn status_u16(&self) -> u16 {
self.status().as_u16()
}

fn headers(&self) -> HeaderMap<HeaderValue> {
smithy_headers_to_map(self.headers())
}

fn body_size_hint(&self) -> (u64, Option<u64>) {
let hint = Body::size_hint(self.body());
(hint.lower(), hint.upper())
}
}

/// Converts the AWS SDK's string-pair header iterator into an `http::HeaderMap`.
/// Sanitization (marking sensitive headers) is handled by the trait's default
/// `sanitized_headers` method.
fn smithy_headers_to_map(
headers: &aws_smithy_runtime_api::http::Headers,
) -> HeaderMap<HeaderValue> {
let mut map = HeaderMap::with_capacity(headers.len());
for (name, value) in headers {
let Ok(header_name) = http::HeaderName::from_bytes(name.as_bytes()) else {
continue;
};
let Ok(header_value) = HeaderValue::from_str(value) else {
continue;
};
map.insert(header_name, header_value);
}
map
}

// ─────────────────────────────────────────────────────────────────────────────

impl<T> HttpConnector for AwsConnector<T>
where
T: HttpConnector,
{
fn call(&self, req: HttpRequest) -> HttpConnectorFuture {
let bytes_sent = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let bytes_sent = Arc::new(AtomicUsize::new(0));

emit!(AboutToSendHttpRequest { request: &req });

let req = req.map(|body| {
let bytes_sent = Arc::clone(&bytes_sent);
body.map_preserve_contents(move |body| {
Expand All @@ -367,17 +437,32 @@ where
let fut = self.http.call(req);
let region = self.region.clone();

HttpConnectorFuture::new(fut.inspect(move |result| {
HttpConnectorFuture::new(async move {
let before = Instant::now();
let result = fut.await;
let roundtrip = before.elapsed();
let byte_size = bytes_sent.load(Ordering::Relaxed);
if let Ok(result) = result
&& result.status().is_success()
{
emit!(AwsBytesSent {
byte_size,
region: Some(region),
});

match &result {
Ok(response) => {
if response.status().is_success() {
emit!(AwsBytesSent {
byte_size,
region: Some(region),
});
}
emit!(GotHttpResponse {
response,
roundtrip
});
}
Err(error) => {
emit!(GotHttpWarning { error, roundtrip });
}
}
}))

result
})
}
}

Expand Down
185 changes: 142 additions & 43 deletions src/internal_events/http_client.rs
Original file line number Diff line number Diff line change
@@ -1,82 +1,160 @@
use std::time::Duration;

use http::{
Request, Response,
header::{self, HeaderMap, HeaderValue},
HeaderMap, Request, Response, Version,
header::{self, HeaderValue},
};
use hyper::{Error, body::HttpBody};
use hyper::body::HttpBody;
use vector_lib::{
NamedInternalEvent, counter, histogram,
internal_event::{CounterName, HistogramName, InternalEvent, error_stage, error_type},
};

#[derive(Debug, NamedInternalEvent)]
pub struct AboutToSendHttpRequest<'a, T> {
pub request: &'a Request<T>,
// ── Telemetry traits ──────────────────────────────────────────────────────────

/// Provides the data required to emit HTTP request telemetry.
///
/// `method`, `uri`, `sanitized_headers`, and `body_debug` are required; every
/// transport must implement them.
///
/// `version` is optional (default: `None`) because some transports — notably
/// the AWS SDK connector layer — do not expose HTTP version in their request
/// type.
pub trait HttpRequestTelemetry {
fn method(&self) -> &str;
fn uri(&self) -> String;
fn headers(&self) -> HeaderMap<HeaderValue>;
/// Returns the body size bounds as `(lower, upper)`.
fn body_size_hint(&self) -> (u64, Option<u64>);
/// Returns the HTTP version when the transport exposes it.
fn version(&self) -> Option<Version> {
None
}

/// Returns the headers with sensitive values redacted.
///
/// Provided by default; implementors only need to implement [`headers`].
fn sanitized_headers(&self) -> HeaderMap<HeaderValue> {
remove_sensitive(self.headers())
}
}

fn remove_sensitive(headers: &HeaderMap<HeaderValue>) -> HeaderMap<HeaderValue> {
let mut headers = headers.clone();
for name in &[
header::AUTHORIZATION,
header::PROXY_AUTHORIZATION,
header::COOKIE,
header::SET_COOKIE,
] {
if let Some(value) = headers.get_mut(name) {
value.set_sensitive(true);
}
/// Provides the data required to emit HTTP response telemetry.
///
/// Same rationale as [`HttpRequestTelemetry`]: `version` is optional.
pub trait HttpResponseTelemetry {
fn status_u16(&self) -> u16;
fn headers(&self) -> HeaderMap<HeaderValue>;
/// Returns the body size bounds as `(lower, upper)`.
fn body_size_hint(&self) -> (u64, Option<u64>);
/// Returns the HTTP version when the transport exposes it.
fn version(&self) -> Option<Version> {
None
}
headers

/// Returns the headers with sensitive values redacted.
///
/// Provided by default; implementors only need to implement [`headers`].
fn sanitized_headers(&self) -> HeaderMap<HeaderValue> {
remove_sensitive(self.headers())
}
}

// ── Implementations for the hyper HTTP types (full data) ──────────────────────

impl<T: HttpBody> HttpRequestTelemetry for Request<T> {
fn method(&self) -> &str {
self.method().as_str()
}

fn uri(&self) -> String {
self.uri().to_string()
}

fn headers(&self) -> HeaderMap<HeaderValue> {
self.headers().clone()
}

fn body_size_hint(&self) -> (u64, Option<u64>) {
let hint = self.body().size_hint();
(hint.lower(), hint.upper())
}

fn version(&self) -> Option<Version> {
Some(self.version())
}
}

impl<T: HttpBody> HttpResponseTelemetry for Response<T> {
fn status_u16(&self) -> u16 {
self.status().as_u16()
}

fn headers(&self) -> HeaderMap<HeaderValue> {
self.headers().clone()
}

fn body_size_hint(&self) -> (u64, Option<u64>) {
let hint = self.body().size_hint();
(hint.lower(), hint.upper())
}

fn version(&self) -> Option<Version> {
Some(self.version())
}
}

// ── Events ────────────────────────────────────────────────────────────────────

#[derive(Debug, NamedInternalEvent)]
pub struct AboutToSendHttpRequest<'a, T: HttpRequestTelemetry> {
pub request: &'a T,
}

impl<T: HttpBody> InternalEvent for AboutToSendHttpRequest<'_, T> {
impl<T: HttpRequestTelemetry> InternalEvent for AboutToSendHttpRequest<'_, T> {
fn emit(self) {
debug!(
message = "Sending HTTP request.",
uri = %self.request.uri(),
method = %self.request.method(),
version = ?self.request.version(),
headers = ?remove_sensitive(self.request.headers()),
body = %FormatBody(self.request.body()),
headers = ?self.request.sanitized_headers(),
body = %FormatBodySizeHint::from(self.request.body_size_hint()),
);
counter!(CounterName::HttpClientRequestsSentTotal, "method" => self.request.method().to_string())
.increment(1);
}
}

#[derive(Debug, NamedInternalEvent)]
pub struct GotHttpResponse<'a, T> {
pub response: &'a Response<T>,
pub struct GotHttpResponse<'a, T: HttpResponseTelemetry> {
pub response: &'a T,
pub roundtrip: Duration,
}

impl<T: HttpBody> InternalEvent for GotHttpResponse<'_, T> {
impl<T: HttpResponseTelemetry> InternalEvent for GotHttpResponse<'_, T> {
fn emit(self) {
let status = self.response.status_u16();
let status_str = status.to_string();
debug!(
message = "HTTP response.",
status = %self.response.status(),
status = %status,
version = ?self.response.version(),
headers = ?remove_sensitive(self.response.headers()),
body = %FormatBody(self.response.body()),
headers = ?self.response.sanitized_headers(),
body = %FormatBodySizeHint::from(self.response.body_size_hint()),

);
counter!(
CounterName::HttpClientResponsesTotal,
"status" => self.response.status().as_u16().to_string(),
)
.increment(1);
counter!(CounterName::HttpClientResponsesTotal, "status" => status_str.clone())
.increment(1);
histogram!(HistogramName::HttpClientRttSeconds).record(self.roundtrip);
histogram!(
HistogramName::HttpClientResponseRttSeconds,
"status" => self.response.status().as_u16().to_string(),
)
.record(self.roundtrip);
histogram!(HistogramName::HttpClientResponseRttSeconds, "status" => status_str)
.record(self.roundtrip);
}
}

#[derive(Debug, NamedInternalEvent)]
pub struct GotHttpWarning<'a> {
pub error: &'a Error,
pub error: &'a dyn std::error::Error,
pub roundtrip: Duration,
}

Expand All @@ -96,13 +174,34 @@ impl InternalEvent for GotHttpWarning<'_> {
}
}

/// Newtype placeholder to provide a formatter for the request and response body.
struct FormatBody<'a, B>(&'a B);
// ── Helpers ───────────────────────────────────────────────────────────────────

fn remove_sensitive(mut headers: HeaderMap<HeaderValue>) -> HeaderMap<HeaderValue> {
for name in &[
header::AUTHORIZATION,
header::PROXY_AUTHORIZATION,
header::COOKIE,
header::SET_COOKIE,
] {
if let Some(value) = headers.get_mut(name) {
value.set_sensitive(true);
}
}
headers
}

/// Formats a body size hint `(lower, upper)` for debug logging.
struct FormatBodySizeHint(u64, Option<u64>);

impl From<(u64, Option<u64>)> for FormatBodySizeHint {
fn from((lower, upper): (u64, Option<u64>)) -> Self {
FormatBodySizeHint(lower, upper)
}
}

impl<B: HttpBody> std::fmt::Display for FormatBody<'_, B> {
impl std::fmt::Display for FormatBodySizeHint {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
let size = self.0.size_hint();
match (size.lower(), size.upper()) {
match (self.0, self.1) {
(0, None) => write!(fmt, "[unknown]"),
(lower, None) => write!(fmt, "[>={lower} bytes]"),

Expand Down
Loading