Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The `datadog_agent` source now supports validating incoming Datadog API keys. The new `valid_api_keys` option accepts a list of permitted API keys, and `drop_on_invalid_api_key` controls whether requests carrying an unrecognized key are rejected with a `403 Forbidden` response (when `true`) or accepted with the key simply not stored (when `false`, the default). When `valid_api_keys` is empty (the default), no validation is performed and all API keys are accepted, preserving existing behavior.
23 changes: 13 additions & 10 deletions src/sources/datadog_agent/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use vector_lib::{
use vrl::core::Value;
use warp::{Filter, filters::BoxedFilter, path as warp_path, path::FullPath, reply::Response};

use super::{ApiKeyQueryParams, DatadogAgentConfig, DatadogAgentSource, LogMsg, RequestHandler};
use super::{
ApiKeyQueryParams, ApiKeyValidation, DatadogAgentConfig, DatadogAgentSource, LogMsg,
RequestHandler, invalid_api_key_error,
};
use crate::{
common::{datadog::DDTAGS, http::ErrorMessage},
event::Event,
Expand Down Expand Up @@ -43,15 +46,15 @@ pub(super) fn build_warp_filter(
let events = source
.decode(&encoding_header, body, path.as_str())
.and_then(|body| {
decode_log_body(
body,
source.api_key_extractor.extract(
path.as_str(),
api_token,
query_params.dd_api_key,
),
&source,
)
let api_key = match source.api_key_extractor.extract_and_validate(
Comment on lines 47 to +49
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject invalid keys before decoding bodies

With drop_on_invalid_api_key = true, this endpoint still reads and decodes/decompresses the request body before running the new API-key check. An unauthenticated request with a bad key and malformed or very large compressed body will consume decode work and return the decode error instead of the intended 403; the same post-decode validation pattern appears in the metrics and traces handlers, even though validation only needs path/header/query data.

Useful? React with 👍 / 👎.

path.as_str(),
api_token,
query_params.dd_api_key,
) {
ApiKeyValidation::Accepted(api_key) => api_key,
ApiKeyValidation::Rejected => return Err(invalid_api_key_error()),
};
decode_log_body(body, api_key, &source)
});
handler.clone().handle_request(events, super::LOGS)
},
Expand Down
46 changes: 30 additions & 16 deletions src/sources/datadog_agent/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use vector_lib::{
use warp::{Filter, filters::BoxedFilter, path, path::FullPath, reply::Response};

use super::ddmetric_proto::{Metadata, MetricPayload, SketchPayload, metric_payload};
use super::{ApiKeyQueryParams, DatadogAgentSource, RequestHandler};
use super::{
ApiKeyQueryParams, ApiKeyValidation, DatadogAgentSource, RequestHandler, invalid_api_key_error,
};
use crate::{
common::{
datadog::{DatadogMetricType, DatadogSeriesMetric},
Expand Down Expand Up @@ -70,13 +72,17 @@ fn sketches_service(
let events = source
.decode(&encoding_header, body, path.as_str())
.and_then(|body| {
let api_key = match source.api_key_extractor.extract_and_validate(
path.as_str(),
api_token,
query_params.dd_api_key,
) {
ApiKeyValidation::Accepted(api_key) => api_key,
ApiKeyValidation::Rejected => return Err(invalid_api_key_error()),
};
decode_datadog_sketches(
body,
source.api_key_extractor.extract(
path.as_str(),
api_token,
query_params.dd_api_key,
),
api_key,
source.split_metric_namespace,
&source.events_received,
)
Expand Down Expand Up @@ -107,13 +113,17 @@ fn series_v1_service(
let events = source
.decode(&encoding_header, body, path.as_str())
.and_then(|body| {
let api_key = match source.api_key_extractor.extract_and_validate(
path.as_str(),
api_token,
query_params.dd_api_key,
) {
ApiKeyValidation::Accepted(api_key) => api_key,
ApiKeyValidation::Rejected => return Err(invalid_api_key_error()),
};
decode_datadog_series_v1(
body,
source.api_key_extractor.extract(
path.as_str(),
api_token,
query_params.dd_api_key,
),
api_key,
// Currently metrics do not have schemas defined, so for now we just pass a
// default one.
&Arc::new(schema::Definition::default_legacy_namespace()),
Expand Down Expand Up @@ -147,13 +157,17 @@ fn series_v2_service(
let events = source
.decode(&encoding_header, body, path.as_str())
.and_then(|body| {
let api_key = match source.api_key_extractor.extract_and_validate(
path.as_str(),
api_token,
query_params.dd_api_key,
) {
ApiKeyValidation::Accepted(api_key) => api_key,
ApiKeyValidation::Rejected => return Err(invalid_api_key_error()),
};
decode_datadog_series_v2(
body,
source.api_key_extractor.extract(
path.as_str(),
api_token,
query_params.dd_api_key,
),
api_key,
source.split_metric_namespace,
&source.events_received,
)
Expand Down
98 changes: 97 additions & 1 deletion src/sources/datadog_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ pub(crate) mod ddtrace_proto {
include!(concat!(env!("OUT_DIR"), "/dd_trace.rs"));
}

use std::{convert::Infallible, fmt::Debug, io::Read, net::SocketAddr, sync::Arc, time::Duration};
use std::{
collections::HashSet, convert::Infallible, fmt::Debug, io::Read, net::SocketAddr, sync::Arc,
time::Duration,
};

use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc, serde::ts_milliseconds};
Expand Down Expand Up @@ -91,6 +94,31 @@ pub struct DatadogAgentConfig {
#[serde(default = "crate::serde::default_true")]
store_api_key: bool,

/// A list of API keys that are permitted to send events to this source.
///
/// When this list is non-empty, the API key carried by an incoming request (in the URL,
/// the `dd-api-key` header, or the `dd-api-key` query parameter) is checked against it.
/// When the list is empty (the default), all API keys are accepted and no validation is
/// performed.
///
/// The behavior when a request carries an API key that is not in this list is controlled by
/// `drop_on_invalid_api_key`.
#[configurable(metadata(docs::advanced))]
#[serde(default)]
valid_api_keys: Vec<String>,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Treat allow-listed API keys as sensitive

Datadog API keys are credentials, but storing valid_api_keys as plain Strings means they are included verbatim in the derived Debug output for DatadogAgentConfig and the generated schema is not marked sensitive. In contexts that log or expose component config, this can leak every allow-listed key; existing credential fields in this repo use SensitiveString specifically to redact these values.

Useful? React with 👍 / 👎.


/// Controls what happens when a request carries an API key that is not present in
/// `valid_api_keys`.
///
/// When set to `true`, requests with an unrecognized API key are rejected with a
/// `403 Forbidden` response. When set to `false` (the default), the unrecognized key is
/// simply not stored in the event metadata, but the events are still accepted.
///
/// This option has no effect when `valid_api_keys` is empty.
#[configurable(metadata(docs::advanced))]
#[serde(default = "crate::serde::default_false")]
drop_on_invalid_api_key: bool,

/// If this is set to `true`, logs are not accepted by the component.
#[configurable(metadata(docs::advanced))]
#[serde(default = "crate::serde::default_false")]
Expand Down Expand Up @@ -173,6 +201,8 @@ impl GenerateConfig for DatadogAgentConfig {
address: "0.0.0.0:8080".parse().unwrap(),
tls: None,
store_api_key: true,
valid_api_keys: Vec::new(),
drop_on_invalid_api_key: false,
framing: default_framing_message_based(),
decoding: default_decoding(),
acknowledgements: SourceAcknowledgementsConfig::default(),
Expand Down Expand Up @@ -209,6 +239,8 @@ impl SourceConfig for DatadogAgentConfig {
let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
let source = DatadogAgentSource::new(
self.store_api_key,
self.valid_api_keys.clone(),
self.drop_on_invalid_api_key,
decoder,
tls.http_protocol_name(),
logs_schema_definition,
Expand Down Expand Up @@ -386,9 +418,41 @@ pub(crate) struct DatadogAgentSource {
pub struct ApiKeyExtractor {
matcher: Regex,
store_api_key: bool,
valid_api_keys: Arc<HashSet<String>>,
drop_on_invalid_api_key: bool,
}

/// The result of checking an extracted API key against the configured allow list.
pub(crate) enum ApiKeyValidation {
/// The key is accepted. The contained value is the key to store in the event
/// metadata (which may be `None` when `store_api_key` is disabled or no key was present).
Accepted(Option<Arc<str>>),
/// The request carried an API key that is not in `valid_api_keys` and
/// `drop_on_invalid_api_key` is enabled, so the request must be rejected.
Rejected,
}

/// The error returned to the Datadog Agent when a request is rejected because its API key is not
/// in the configured `valid_api_keys` list and `drop_on_invalid_api_key` is enabled.
pub(crate) fn invalid_api_key_error() -> ErrorMessage {
ErrorMessage::new(
StatusCode::FORBIDDEN,
"API key is not in the configured `valid_api_keys` list.".to_string(),
)
}

impl ApiKeyExtractor {
#[cfg(test)]
pub(crate) fn for_test(valid_api_keys: Vec<String>, drop_on_invalid_api_key: bool) -> Self {
Self {
matcher: Regex::new(r"^/v1/input/(?P<api_key>[[:alnum:]]{32})/??")
.expect("static regex always compiles"),
store_api_key: true,
valid_api_keys: Arc::new(valid_api_keys.into_iter().collect()),
drop_on_invalid_api_key,
}
}

pub fn extract(
&self,
path: &str,
Expand All @@ -407,11 +471,41 @@ impl ApiKeyExtractor {
// Try from header next
.or_else(|| header.map(Arc::from))
}

/// Extracts the API key from the request and validates it against `valid_api_keys`.
///
/// When `valid_api_keys` is empty, no validation is performed and the extracted key (if any)
/// is accepted. Otherwise, the key is checked against the allow list: a missing or
/// unrecognized key results in `Rejected` when `drop_on_invalid_api_key` is set, or an
/// `Accepted(None)` (the key is not stored) otherwise.
pub(crate) fn extract_and_validate(
&self,
path: &str,
header: Option<String>,
query_params: Option<String>,
) -> ApiKeyValidation {
let api_key = self.extract(path, header, query_params);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Decouple validation from API-key storage

When store_api_key = false, extract() returns None before it ever reads the path/header/query key, so valid_api_keys cannot be used independently of metadata storage. In a config with valid_api_keys set and drop_on_invalid_api_key = true, even requests carrying an allowed key are rejected as missing; users who disable key forwarding to sinks therefore cannot enable the new validation feature.

Useful? React with 👍 / 👎.


if self.valid_api_keys.is_empty() {
return ApiKeyValidation::Accepted(api_key);
}

match &api_key {
Some(key) if self.valid_api_keys.contains(key.as_ref()) => {
ApiKeyValidation::Accepted(api_key)
}
_ if self.drop_on_invalid_api_key => ApiKeyValidation::Rejected,
_ => ApiKeyValidation::Accepted(None),
}
}
}

impl DatadogAgentSource {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
store_api_key: bool,
valid_api_keys: Vec<String>,
drop_on_invalid_api_key: bool,
decoder: Decoder,
protocol: &'static str,
logs_schema_definition: Option<schema::Definition>,
Expand All @@ -424,6 +518,8 @@ impl DatadogAgentSource {
store_api_key,
matcher: Regex::new(r"^/v1/input/(?P<api_key>[[:alnum:]]{32})/??")
.expect("static regex always compiles"),
valid_api_keys: Arc::new(valid_api_keys.into_iter().collect()),
drop_on_invalid_api_key,
},
log_schema_host_key: log_schema()
.host_key_target_path()
Expand Down
62 changes: 60 additions & 2 deletions src/sources/datadog_agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ use crate::{
schema::Definition,
serde::{default_decoding, default_framing_message_based},
sources::datadog_agent::{
DatadogAgentConfig, DatadogAgentSource, LOGS, LogMsg, METRICS, TRACES, ddmetric_proto,
ddtrace_proto, logs::decode_log_body, metrics::DatadogSeriesRequest,
ApiKeyExtractor, ApiKeyValidation, DatadogAgentConfig, DatadogAgentSource, LOGS, LogMsg,
METRICS, TRACES, ddmetric_proto, ddtrace_proto, logs::decode_log_body,
metrics::DatadogSeriesRequest,
},
test_util::{
addr::{PortGuard, next_addr},
Expand Down Expand Up @@ -108,6 +109,8 @@ fn test_decode_log_body() {

let source = DatadogAgentSource::new(
true,
Vec::new(),
false,
decoder,
"http",
Some(test_logs_schema_definition()),
Expand Down Expand Up @@ -164,6 +167,8 @@ fn test_decode_log_body_parse_ddtags() {

let source = DatadogAgentSource::new(
true,
Vec::new(),
false,
decoder,
"http",
Some(test_logs_schema_definition()),
Expand Down Expand Up @@ -201,6 +206,8 @@ fn test_decode_log_body_empty_object() {

let source = DatadogAgentSource::new(
true,
Vec::new(),
false,
decoder,
"http",
Some(test_logs_schema_definition()),
Expand Down Expand Up @@ -1609,6 +1616,8 @@ fn test_config_outputs_with_disabled_data_types() {
address: "0.0.0.0:8080".parse().unwrap(),
tls: None,
store_api_key: true,
valid_api_keys: Vec::new(),
drop_on_invalid_api_key: false,
framing: default_framing_message_based(),
decoding: default_decoding(),
acknowledgements: Default::default(),
Expand Down Expand Up @@ -2053,6 +2062,8 @@ fn test_config_outputs() {
address: "0.0.0.0:8080".parse().unwrap(),
tls: None,
store_api_key: true,
valid_api_keys: Vec::new(),
drop_on_invalid_api_key: false,
framing: default_framing_message_based(),
decoding,
acknowledgements: Default::default(),
Expand Down Expand Up @@ -2780,6 +2791,8 @@ impl ValidatableComponent for DatadogAgentConfig {
address: "0.0.0.0:9007".parse().unwrap(),
tls: None,
store_api_key: false,
valid_api_keys: Vec::new(),
drop_on_invalid_api_key: false,
framing: CharacterDelimitedDecoderConfig {
character_delimited: CharacterDelimitedDecoderOptions {
delimiter: b',',
Expand Down Expand Up @@ -2832,3 +2845,48 @@ impl ValidatableComponent for DatadogAgentConfig {
}

register_validatable_component!(DatadogAgentConfig);

#[test]
fn api_key_validation() {
let valid = "0123456789abcdef0123456789abcdef".to_string();
let invalid = "ffffffffffffffffffffffffffffffff".to_string();

// No `valid_api_keys` configured: any key (or none) is accepted as-is.
let extractor = ApiKeyExtractor::for_test(vec![], false);
assert!(matches!(
extractor.extract_and_validate("/v1/input", Some(invalid.clone()), None),
ApiKeyValidation::Accepted(Some(key)) if key.as_ref() == invalid
));

// Allow list set, key matches (via header): accepted and stored.
let extractor = ApiKeyExtractor::for_test(vec![valid.clone()], true);
assert!(matches!(
extractor.extract_and_validate("/v1/input", Some(valid.clone()), None),
ApiKeyValidation::Accepted(Some(key)) if key.as_ref() == valid
));

// Allow list set, key matches (via URL path): accepted and stored.
assert!(matches!(
extractor.extract_and_validate(&format!("/v1/input/{valid}"), None, None),
ApiKeyValidation::Accepted(Some(key)) if key.as_ref() == valid
));

// Allow list set, unknown key, drop_on_invalid_api_key = true: rejected.
assert!(matches!(
extractor.extract_and_validate("/v1/input", Some(invalid.clone()), None),
ApiKeyValidation::Rejected
));

// Allow list set, no key present, drop_on_invalid_api_key = true: rejected.
assert!(matches!(
extractor.extract_and_validate("/v1/input", None, None),
ApiKeyValidation::Rejected
));

// Allow list set, unknown key, drop_on_invalid_api_key = false: accepted but key not stored.
let extractor = ApiKeyExtractor::for_test(vec![valid], false);
assert!(matches!(
extractor.extract_and_validate("/v1/input", Some(invalid), None),
ApiKeyValidation::Accepted(None)
));
}
Loading
Loading