Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1219,8 +1219,11 @@ async fn start_dogstatsd(
) {
// Start aggregator service and handle
let start_time = Instant::now();
let metric_tags = tags_provider
.get_tags_vec_excluding(&config.custom_metrics_tags_drop)
.join(",");
let (aggregator_service, aggregator_handle) = MetricsAggregatorService::new(
SortedTags::parse(&tags_provider.get_tags_string()).unwrap_or(EMPTY_TAGS),
SortedTags::parse(&metric_tags).unwrap_or(EMPTY_TAGS),
CONTEXTS,
)
.expect("can't create metrics service");
Expand Down
14 changes: 14 additions & 0 deletions bottlecap/src/config/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ pub struct EnvConfig {
/// Defaults to 1024. Increase if the processor can't keep up with burst traffic.
#[serde(deserialize_with = "deserialize_option_lossless")]
pub dogstatsd_queue_size: Option<usize>,
/// @env `DD_CUSTOM_METRICS_TAGS_DROP`
/// Comma-separated list of tag keys to remove from custom metrics emitted by the extension.
#[serde(deserialize_with = "deserialize_array_from_comma_separated_string")]
pub custom_metrics_tags_drop: Vec<String>,

// OTLP
//
Expand Down Expand Up @@ -589,6 +593,7 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) {
merge_option!(config, env_config, dogstatsd_so_rcvbuf);
merge_option!(config, env_config, dogstatsd_buffer_size);
merge_option!(config, env_config, dogstatsd_queue_size);
merge_vec!(config, env_config, custom_metrics_tags_drop);

// OTLP
merge_option_to_value!(config, env_config, otlp_config_traces_enabled);
Expand Down Expand Up @@ -873,6 +878,10 @@ mod tests {
jail.set_env("DD_DOGSTATSD_SO_RCVBUF", "1048576");
jail.set_env("DD_DOGSTATSD_BUFFER_SIZE", "65507");
jail.set_env("DD_DOGSTATSD_QUEUE_SIZE", "2048");
jail.set_env(
"DD_CUSTOM_METRICS_TAGS_DROP",
"function_arn,executedversion,cold_start",
);

// AWS Lambda
jail.set_env(
Expand Down Expand Up @@ -1032,6 +1041,11 @@ mod tests {
dogstatsd_so_rcvbuf: Some(1_048_576),
dogstatsd_buffer_size: Some(65507),
dogstatsd_queue_size: Some(2048),
custom_metrics_tags_drop: vec![
"function_arn".to_string(),
"executedversion".to_string(),
"cold_start".to_string(),
],
api_key_secret_arn: "arn:aws:secretsmanager:region:account:secret:datadog-api-key"
.to_string(),
kms_api_key: "test-kms-key".to_string(),
Expand Down
2 changes: 2 additions & 0 deletions bottlecap/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ pub struct Config {
/// Internal queue capacity between the socket reader and metric processor.
/// Defaults to 1024. Increase if the processor can't keep up with burst traffic.
pub dogstatsd_queue_size: Option<usize>,
pub custom_metrics_tags_drop: Vec<String>,

// OTLP
//
Expand Down Expand Up @@ -443,6 +444,7 @@ impl Default for Config {
dogstatsd_buffer_size: None,
// Defaults to 1024 internally.
dogstatsd_queue_size: None,
custom_metrics_tags_drop: vec![],

// OTLP
otlp_config_traces_enabled: true,
Expand Down
1 change: 1 addition & 0 deletions bottlecap/src/config/yaml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,7 @@ api_security_sample_delay: 60 # Seconds
dogstatsd_so_rcvbuf: Some(1_048_576),
dogstatsd_buffer_size: Some(65507),
dogstatsd_queue_size: Some(2048),
custom_metrics_tags_drop: Vec::new(),

dd_org_uuid: String::default(),
};
Expand Down
44 changes: 44 additions & 0 deletions bottlecap/src/metrics/enhanced/lambda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ impl Lambda {
let vec_tags: Vec<String> = self
.dynamic_value_tags
.iter()
.filter(|(k, _)| {
!crate::tags::tag_should_be_dropped(k, &self.config.custom_metrics_tags_drop)
})
.map(|(k, v)| format!("{k}:{v}"))
.collect();

Expand Down Expand Up @@ -872,6 +875,47 @@ mod tests {
);
}

#[tokio::test]
async fn test_custom_metrics_tags_drop_filters_dynamic_tags() {
let (metrics_aggr, config) = setup();
let config = Arc::new(config::Config {
custom_metrics_tags_drop: vec!["cold_start".to_string()],
..config.as_ref().clone()
});
let mut lambda = Lambda::new(metrics_aggr.clone(), config);

let now: i64 = std::time::UNIX_EPOCH
.elapsed()
.unwrap_or_default()
.as_secs()
.try_into()
.unwrap_or_default();
lambda.set_init_tags(false, true);
lambda.set_runtime_tag("nodejs");
lambda.increment_invocation_metric(now);

let ts = (now / 10) * 10;
let kept_tags = SortedTags::parse("runtime:nodejs").ok();
let dropped_tags = SortedTags::parse("cold_start:true,runtime:nodejs").ok();

assert!(
metrics_aggr
.get_entry_by_id(constants::INVOCATIONS_METRIC.into(), kept_tags, ts)
.await
.unwrap()
.is_some(),
"expected metric without cold_start tag"
);
assert!(
metrics_aggr
.get_entry_by_id(constants::INVOCATIONS_METRIC.into(), dropped_tags, ts)
.await
.unwrap()
.is_none(),
"expected cold_start tag to be dropped"
);
}

#[tokio::test]
#[allow(clippy::float_cmp)]
async fn test_increment_invocation_metric() {
Expand Down
30 changes: 30 additions & 0 deletions bottlecap/src/tags/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,32 @@
pub mod lambda;
pub mod provider;

#[must_use]
pub fn tag_should_be_dropped(tag_key: &str, tags_to_drop: &[String]) -> bool {
tags_to_drop.iter().any(|tag_to_drop| {
let drop_key = tag_to_drop
.trim()
.split_once(':')
.map_or(tag_to_drop.trim(), |(key, _)| key.trim());
!drop_key.is_empty() && tag_key.eq_ignore_ascii_case(drop_key)
})
}

#[cfg(test)]
mod tests {
use super::tag_should_be_dropped;

#[test]
fn test_tag_should_be_dropped() {
let tags_to_drop = vec![
"function_arn".to_string(),
" executedversion ".to_string(),
"cold_start:true".to_string(),
];

assert!(tag_should_be_dropped("function_arn", &tags_to_drop));
assert!(tag_should_be_dropped("executedversion", &tags_to_drop));
assert!(tag_should_be_dropped("cold_start", &tags_to_drop));
assert!(!tag_should_be_dropped("runtime", &tags_to_drop));
}
}
32 changes: 32 additions & 0 deletions bottlecap/src/tags/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ impl Provider {
self.get_tags_vec().join(",")
}

#[must_use]
pub fn get_tags_vec_excluding(&self, tags_to_drop: &[String]) -> Vec<String> {
self.get_tags_map()
.iter()
.filter(|(key, _)| !crate::tags::tag_should_be_dropped(key, tags_to_drop))
.map(|(key, value)| format!("{key}:{value}"))
.collect()
}

#[must_use]
pub fn get_canonical_id(&self) -> Option<String> {
self.tag_provider.get_canonical_id()
Expand Down Expand Up @@ -128,4 +137,27 @@ mod tests {
let provider = Provider::new(config, LAMBDA_RUNTIME_SLUG.to_string(), &metadata);
assert!(provider.get_tags_string().contains("service:test-service"));
}

#[test]
fn test_get_tags_vec_excluding() {
let config = Arc::new(Config {
service: Some("test-service".to_string()),
tags: HashMap::from([("test".to_string(), "tag".to_string())]),
..config::Config::default()
});
let mut metadata = HashMap::new();
metadata.insert(
"function_arn".to_string(),
"arn:aws:lambda:us-west-2:123456789012:function:my-function".to_string(),
);
let provider = Provider::new(config, LAMBDA_RUNTIME_SLUG.to_string(), &metadata);
let tags = provider.get_tags_vec_excluding(&[
"service".to_string(),
"function_arn:ignored-value".to_string(),
]);

assert!(tags.iter().any(|tag| tag == "test:tag"));
assert!(!tags.iter().any(|tag| tag.starts_with("service:")));
assert!(!tags.iter().any(|tag| tag.starts_with("function_arn:")));
}
}
Loading