diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 433077e910..0d89d0e47b 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -103,3 +103,4 @@ tools/sidecar_mockgen/ @DataDog/libdatadog-php libdd-data-pipeline/src/otlp/ @DataDog/apm-sdk-capabilities-rust libdd-data-pipeline/tests/test_trace_exporter_otlp_export.rs @DataDog/apm-sdk-capabilities-rust libdd-trace-utils/src/otlp_encoder/ @DataDog/apm-sdk-capabilities-rust +datadog-sidecar/src/service/ffe_metrics_flusher.rs @DataDog/libdatadog-php @DataDog/libdatadog-apm @DataDog/feature-flagging-and-experimentation-sdk diff --git a/Cargo.lock b/Cargo.lock index ba5b18e3bb..45b82ddcba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1357,8 +1357,10 @@ dependencies = [ "derive_more", "faststr", "libdd-common", + "libdd-trace-protobuf", "log", "md5", + "prost", "pyo3", "semver", "serde", @@ -1523,6 +1525,7 @@ dependencies = [ "bincode", "chrono", "console-subscriber", + "datadog-ffe", "datadog-ipc", "datadog-ipc-macros", "datadog-live-debugger", @@ -1543,7 +1546,6 @@ dependencies = [ "libdd-dogstatsd-client", "libdd-telemetry", "libdd-tinybytes", - "libdd-trace-protobuf", "libdd-trace-stats", "libdd-trace-utils", "manual_future", diff --git a/datadog-ffe/Cargo.toml b/datadog-ffe/Cargo.toml index 358b4b4016..743cc59915 100644 --- a/datadog-ffe/Cargo.toml +++ b/datadog-ffe/Cargo.toml @@ -24,7 +24,10 @@ serde-bool = { version = "0.1.3", default-features = false } serde_with = { version = "3.11.0", default-features = false, features = ["base64", "hex", "macros"] } thiserror = { version = "2.0.3", default-features = false } url = { version = "2.5.0", default-features = false, features = ["std"] } +libdd-trace-protobuf = { path = "../libdd-trace-protobuf", optional = true } +prost = { version = "0.14.1", optional = true } pyo3 = { version = "0.28", optional = true, default-features = false, features = ["macros"] } [features] +evaluation-metrics = ["dep:libdd-trace-protobuf", "dep:prost"] pyo3 = ["dep:pyo3"] diff --git a/datadog-ffe/src/lib.rs b/datadog-ffe/src/lib.rs index a32b8b757c..8cc4a062df 100644 --- a/datadog-ffe/src/lib.rs +++ b/datadog-ffe/src/lib.rs @@ -4,5 +4,7 @@ mod flag_type; pub mod rules_based; +#[cfg(feature = "evaluation-metrics")] +pub mod telemetry; pub use flag_type::{ExpectedFlagType, FlagType}; diff --git a/datadog-ffe/src/telemetry/evaluation_metrics.rs b/datadog-ffe/src/telemetry/evaluation_metrics.rs new file mode 100644 index 0000000000..23117b1e10 --- /dev/null +++ b/datadog-ffe/src/telemetry/evaluation_metrics.rs @@ -0,0 +1,341 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Reusable FFE evaluation metric aggregation and OTLP encoding primitives. + +use super::FfeTelemetryContext; +use libdd_trace_protobuf::opentelemetry::proto::common::v1::any_value; +use libdd_trace_protobuf::opentelemetry::proto::common::v1::{ + AnyValue, InstrumentationScope, KeyValue, +}; +use libdd_trace_protobuf::opentelemetry::proto::resource::v1::Resource; +use prost::Message; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::time::{SystemTime, UNIX_EPOCH}; + +const METER_NAME: &str = "ddtrace.openfeature"; +const METRIC_NAME: &str = "feature_flag.evaluations"; +const METRIC_UNIT: &str = "{evaluation}"; +const METRIC_DESCRIPTION: &str = "Number of feature flag evaluations"; + +const ATTR_SERVICE_NAME: &str = "service.name"; +const ATTR_ENV: &str = "deployment.environment.name"; +const ATTR_VERSION: &str = "service.version"; +const ATTR_FLAG_KEY: &str = "feature_flag.key"; +const ATTR_VARIANT: &str = "feature_flag.result.variant"; +const ATTR_REASON: &str = "feature_flag.result.reason"; +const ATTR_ERROR_TYPE: &str = "error.type"; +const ATTR_ALLOCATION_KEY: &str = "feature_flag.result.allocation_key"; + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct FfeEvaluationMetric { + pub flag_key: String, + pub variant: String, + pub reason: String, + pub error_type: Option, + pub allocation_key: Option, +} + +pub fn encode_metrics_payload( + context: FfeTelemetryContext, + metrics: Vec, +) -> Option> { + if metrics.is_empty() { + return None; + } + + let now = unix_nano_now(); + let data_points = aggregate(metrics) + .into_iter() + .map(|(attributes, count)| otlp::NumberDataPoint { + attributes: attributes + .into_iter() + .map(|(key, value)| string_key_value(key, value)) + .collect(), + start_time_unix_nano: now, + time_unix_nano: now, + value: Some(otlp::number_data_point::Value::AsInt(count)), + flags: 0, + }) + .collect::>(); + + if data_points.is_empty() { + return None; + } + + let request = otlp::ExportMetricsServiceRequest { + resource_metrics: vec![otlp::ResourceMetrics { + resource: Some(resource(context)), + scope_metrics: vec![otlp::ScopeMetrics { + scope: Some(InstrumentationScope { + name: METER_NAME.to_owned(), + version: String::new(), + attributes: vec![], + dropped_attributes_count: 0, + }), + metrics: vec![otlp::Metric { + name: METRIC_NAME.to_owned(), + description: METRIC_DESCRIPTION.to_owned(), + unit: METRIC_UNIT.to_owned(), + data: Some(otlp::metric::Data::Sum(otlp::Sum { + data_points, + aggregation_temporality: otlp::AggregationTemporality::Delta as i32, + is_monotonic: true, + })), + }], + schema_url: String::new(), + }], + schema_url: String::new(), + }], + }; + + Some(request.encode_to_vec()) +} + +fn aggregate(metrics: Vec) -> BTreeMap, i64> { + let mut counts = BTreeMap::new(); + for metric in metrics { + if metric.flag_key.is_empty() { + continue; + } + let attrs = metric_attributes(metric); + *counts.entry(attrs).or_insert(0) += 1; + } + counts +} + +fn metric_attributes(metric: FfeEvaluationMetric) -> BTreeMap { + let reason = normalize(&metric.reason, "unknown"); + let mut attrs = BTreeMap::from([ + (ATTR_FLAG_KEY.to_owned(), metric.flag_key), + (ATTR_VARIANT.to_owned(), metric.variant), + (ATTR_REASON.to_owned(), reason.clone()), + ]); + + if let Some(error_type) = metric.error_type { + if !error_type.is_empty() { + attrs.insert( + ATTR_ERROR_TYPE.to_owned(), + normalize(&error_type, "general"), + ); + } + } + + if let Some(allocation_key) = metric.allocation_key { + if !allocation_key.is_empty() + && !matches!(reason.as_str(), "error" | "default" | "disabled") + { + attrs.insert(ATTR_ALLOCATION_KEY.to_owned(), allocation_key); + } + } + + attrs +} + +fn normalize(value: &str, default: &str) -> String { + let value = value.trim(); + if value.is_empty() { + default.to_owned() + } else { + value.to_ascii_lowercase() + } +} + +fn resource(context: FfeTelemetryContext) -> Resource { + let mut attributes = vec![]; + if !context.service.is_empty() { + attributes.push(string_key_value( + ATTR_SERVICE_NAME.to_owned(), + context.service, + )); + } + if !context.env.is_empty() { + attributes.push(string_key_value(ATTR_ENV.to_owned(), context.env)); + } + if !context.version.is_empty() { + attributes.push(string_key_value(ATTR_VERSION.to_owned(), context.version)); + } + Resource { + attributes, + dropped_attributes_count: 0, + entity_refs: vec![], + } +} + +fn string_key_value(key: String, value: String) -> KeyValue { + KeyValue { + key, + value: Some(AnyValue { + value: Some(any_value::Value::StringValue(value)), + }), + key_ref: 0, + } +} + +fn unix_nano_now() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos().min(u128::from(u64::MAX)) as u64) + .unwrap_or_default() +} + +mod otlp { + use libdd_trace_protobuf::opentelemetry::proto::common::v1::{InstrumentationScope, KeyValue}; + use libdd_trace_protobuf::opentelemetry::proto::resource::v1::Resource; + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct ExportMetricsServiceRequest { + #[prost(message, repeated, tag = "1")] + pub resource_metrics: ::prost::alloc::vec::Vec, + } + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct ResourceMetrics { + #[prost(message, optional, tag = "1")] + pub resource: ::core::option::Option, + #[prost(message, repeated, tag = "2")] + pub scope_metrics: ::prost::alloc::vec::Vec, + #[prost(string, tag = "3")] + pub schema_url: ::prost::alloc::string::String, + } + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct ScopeMetrics { + #[prost(message, optional, tag = "1")] + pub scope: ::core::option::Option, + #[prost(message, repeated, tag = "2")] + pub metrics: ::prost::alloc::vec::Vec, + #[prost(string, tag = "3")] + pub schema_url: ::prost::alloc::string::String, + } + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Metric { + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub description: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub unit: ::prost::alloc::string::String, + #[prost(oneof = "metric::Data", tags = "7")] + pub data: ::core::option::Option, + } + + pub mod metric { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Data { + #[prost(message, tag = "7")] + Sum(super::Sum), + } + } + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Sum { + #[prost(message, repeated, tag = "1")] + pub data_points: ::prost::alloc::vec::Vec, + #[prost(enumeration = "AggregationTemporality", tag = "2")] + pub aggregation_temporality: i32, + #[prost(bool, tag = "3")] + pub is_monotonic: bool, + } + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct NumberDataPoint { + #[prost(fixed64, tag = "2")] + pub start_time_unix_nano: u64, + #[prost(fixed64, tag = "3")] + pub time_unix_nano: u64, + #[prost(oneof = "number_data_point::Value", tags = "6")] + pub value: ::core::option::Option, + #[prost(message, repeated, tag = "7")] + pub attributes: ::prost::alloc::vec::Vec, + #[prost(uint32, tag = "8")] + pub flags: u32, + } + + pub mod number_data_point { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Value { + #[prost(sfixed64, tag = "6")] + AsInt(i64), + } + } + + #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] + #[repr(i32)] + pub enum AggregationTemporality { + Unspecified = 0, + Delta = 1, + Cumulative = 2, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use prost::Message; + + fn context() -> FfeTelemetryContext { + FfeTelemetryContext { + service: "svc".to_owned(), + env: "prod".to_owned(), + version: "1".to_owned(), + } + } + + fn metric(flag_key: &str, variant: &str, reason: &str) -> FfeEvaluationMetric { + FfeEvaluationMetric { + flag_key: flag_key.to_owned(), + variant: variant.to_owned(), + reason: reason.to_owned(), + error_type: None, + allocation_key: Some("alloc".to_owned()), + } + } + + #[test] + fn encodes_otlp_counter_and_aggregates_matching_attributes() { + let payload = encode_metrics_payload( + context(), + vec![ + metric("flag", "variant", "TARGETING_MATCH"), + metric("flag", "variant", "targeting_match"), + ], + ) + .unwrap(); + + let decoded = otlp::ExportMetricsServiceRequest::decode(payload.as_slice()).unwrap(); + let resource_metrics = &decoded.resource_metrics[0]; + let attrs = &resource_metrics.resource.as_ref().unwrap().attributes; + assert!(attrs.iter().any(|kv| kv.key == ATTR_SERVICE_NAME)); + + let data_points = match resource_metrics.scope_metrics[0].metrics[0] + .data + .as_ref() + .unwrap() + { + otlp::metric::Data::Sum(sum) => &sum.data_points, + }; + assert_eq!(data_points.len(), 1); + assert_eq!( + data_points[0].value, + Some(otlp::number_data_point::Value::AsInt(2)) + ); + } + + #[test] + fn excludes_allocation_key_for_error_default_and_disabled() { + for reason in ["ERROR", "DEFAULT", "DISABLED"] { + let attrs = metric_attributes(FfeEvaluationMetric { + flag_key: "flag".to_owned(), + variant: String::new(), + reason: reason.to_owned(), + error_type: Some("FLAG_NOT_FOUND".to_owned()), + allocation_key: Some("alloc".to_owned()), + }); + assert!(!attrs.contains_key(ATTR_ALLOCATION_KEY)); + assert_eq!(attrs[ATTR_ERROR_TYPE], "flag_not_found"); + } + } +} diff --git a/datadog-ffe/src/telemetry/mod.rs b/datadog-ffe/src/telemetry/mod.rs new file mode 100644 index 0000000000..d64e5d4eae --- /dev/null +++ b/datadog-ffe/src/telemetry/mod.rs @@ -0,0 +1,13 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +pub mod evaluation_metrics; + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, PartialEq, Serialize)] +pub struct FfeTelemetryContext { + pub service: String, + pub env: String, + pub version: String, +} diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 7cc6a3267d..ef37847258 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -27,7 +27,8 @@ use datadog_sidecar::service::agent_info::AgentInfoReader; use datadog_sidecar::service::telemetry::InternalTelemetryAction; use datadog_sidecar::service::{ blocking::{self, SidecarTransport}, - DynamicInstrumentationConfigState, InstanceId, QueueId, RuntimeMetadata, + DynamicInstrumentationConfigState, FfeEvaluationMetric as SidecarFfeEvaluationMetric, + FfeTelemetryContext as SidecarFfeTelemetryContext, InstanceId, QueueId, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction, SidecarFlushOptions, }; use datadog_sidecar::service::{get_telemetry_action_sender, InternalTelemetryActions}; @@ -35,7 +36,7 @@ use datadog_sidecar::shm_remote_config::{path_for_remote_config, RemoteConfigRea use libc::c_char; use libdd_common::tag::Tag; use libdd_common::Endpoint; -use libdd_common_ffi::slice::{AsBytes, CharSlice}; +use libdd_common_ffi::slice::{AsBytes, CharSlice, Slice}; use libdd_common_ffi::{self as ffi, MaybeError}; #[cfg(windows)] use libdd_crashtracker_ffi::Metadata; @@ -1116,6 +1117,101 @@ pub unsafe extern "C" fn ddog_sidecar_send_debugger_datum( ddog_sidecar_send_debugger_data(transport, instance_id, queue_id, vec![*payload]) } +#[repr(C)] +pub struct FfeTelemetryContext<'a> { + pub service: CharSlice<'a>, + pub env: CharSlice<'a>, + pub version: CharSlice<'a>, +} + +#[repr(C)] +pub struct FfeEvaluationMetric<'a> { + pub flag_key: CharSlice<'a>, + pub variant: CharSlice<'a>, + pub reason: CharSlice<'a>, + pub error_type: CharSlice<'a>, + pub allocation_key: CharSlice<'a>, +} + +/// Send structured FFE evaluation metric events to the sidecar. The sidecar +/// owns aggregation, OTLP/protobuf serialization, and OTLP HTTP delivery. This +/// function is caller-driven so SDKs with existing host-language hooks can +/// safely coexist until they explicitly migrate. +/// +/// # Safety +/// `endpoint`, `context`, and every element in `metrics` must contain valid +/// UTF-8 `CharSlice` values. Empty `endpoint` or `metrics` is a no-op. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn ddog_sidecar_send_ffe_evaluation_metrics( + transport: &mut Box, + instance_id: &InstanceId, + queue_id: &QueueId, + endpoint: CharSlice, + context: &FfeTelemetryContext<'_>, + metrics: Slice>, +) -> MaybeError { + if endpoint.is_empty() || metrics.is_empty() { + return MaybeError::None; + } + + let endpoint = try_c!(char_slice_to_string(endpoint)); + let context = try_c!(ffe_context_from_ffi(context)); + let metrics = try_c!(metrics + .try_as_slice() + .map_err(|e| format!("Invalid metric slice: {e}")) + .and_then(|metrics| metrics + .iter() + .map(ffe_metric_from_ffi) + .collect::, _>>())); + + if metrics.is_empty() { + return MaybeError::None; + } + + try_c!(blocking::enqueue_actions( + transport, + instance_id, + queue_id, + vec![SidecarAction::FfeEvaluationMetrics { + endpoint, + context, + metrics, + }], + )); + MaybeError::None +} + +fn ffe_context_from_ffi( + context: &FfeTelemetryContext<'_>, +) -> Result { + Ok(SidecarFfeTelemetryContext { + service: char_slice_to_string(context.service)?, + env: char_slice_to_string(context.env)?, + version: char_slice_to_string(context.version)?, + }) +} + +fn ffe_metric_from_ffi( + metric: &FfeEvaluationMetric<'_>, +) -> Result { + Ok(SidecarFfeEvaluationMetric { + flag_key: char_slice_to_string(metric.flag_key)?, + variant: char_slice_to_string(metric.variant)?, + reason: char_slice_to_string(metric.reason)?, + error_type: optional_string(metric.error_type)?, + allocation_key: optional_string(metric.allocation_key)?, + }) +} + +fn optional_string(slice: CharSlice) -> Result, String> { + if slice.is_empty() { + Ok(None) + } else { + char_slice_to_string(slice).map(Some) + } +} + #[no_mangle] #[allow(clippy::missing_safety_doc)] #[allow(improper_ctypes_definitions)] // DebuggerPayload is just a pointer, we hide its internals diff --git a/datadog-sidecar/Cargo.toml b/datadog-sidecar/Cargo.toml index 8d04b2d28d..48e83cc312 100644 --- a/datadog-sidecar/Cargo.toml +++ b/datadog-sidecar/Cargo.toml @@ -29,6 +29,7 @@ libdd-trace-utils = { path = "../libdd-trace-utils" } libdd-trace-stats = { path = "../libdd-trace-stats", default-features=false, features = ["https"] } datadog-remote-config = { path = "../datadog-remote-config" , features = ["live-debugger"]} datadog-live-debugger = { path = "../datadog-live-debugger" } +datadog-ffe = { path = "../datadog-ffe", features = ["evaluation-metrics"] } libdd-crashtracker = { path = "../libdd-crashtracker" } libdd-dogstatsd-client = { path = "../libdd-dogstatsd-client" } libdd-tinybytes = { path = "../libdd-tinybytes" } @@ -43,7 +44,6 @@ datadog-ipc-macros = { path = "../datadog-ipc-macros" } rand = "0.8.3" rmp-serde = "1.3.0" -libdd-trace-protobuf = { path = "../libdd-trace-protobuf" } serde = { version = "1.0", features = ["derive", "rc"] } serde_with = "3.6.0" bincode = { version = "1.3.3" } diff --git a/datadog-sidecar/src/service/ffe_metrics_flusher.rs b/datadog-sidecar/src/service/ffe_metrics_flusher.rs new file mode 100644 index 0000000000..d71bc17ac6 --- /dev/null +++ b/datadog-sidecar/src/service/ffe_metrics_flusher.rs @@ -0,0 +1,236 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Aggregates, serializes, and forwards FFE (Feature Flag Evaluation) metric +//! events to a user-configured OTLP HTTP metrics intake. + +use crate::service::{FfeEvaluationMetric, FfeTelemetryContext}; +use datadog_ffe::telemetry::evaluation_metrics::encode_metrics_payload; +use http::Method; +use libdd_capabilities::{Bytes, HttpClientCapability, SleepCapability}; +use libdd_common::Endpoint; +use std::time::Duration; +use tracing::{debug, warn}; + +const USER_AGENT: &str = concat!("ddtrace-sidecar/", env!("CARGO_PKG_VERSION")); + +/// Build an `Endpoint` for an OTLP metrics intake from a fully-qualified URL. +/// +/// Production callers supply the URL via the FFI (typically the value of +/// `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`; the OpenTelemetry spec default is +/// `http://localhost:4318/v1/metrics`). +/// Returns `None` if the URL is unparseable. The OTLP endpoint is unrelated +/// to the Agent base, so we don't preserve any session fields here. +pub(crate) fn otlp_metrics_endpoint(url: &str) -> Option { + let url = url.parse().ok()?; + Some(Endpoint { + url, + ..Endpoint::default() + }) +} + +/// POST structured FFE metric events as OTLP/protobuf to the configured intake. +/// Fire-and-forget: non-2xx responses and network errors are logged and +/// dropped (matches dd-trace-go/py OTLP exporter behavior). +pub(crate) async fn send_metrics( + client: &C, + endpoint: &Endpoint, + context: FfeTelemetryContext, + metrics: Vec, +) { + let Some(payload) = encode_metrics_payload(context, metrics) else { + return; + }; + send_payload(client, endpoint, payload).await; +} + +async fn send_payload( + client: &C, + endpoint: &Endpoint, + payload: Vec, +) { + let builder = match endpoint.to_request_builder(USER_AGENT) { + Ok(b) => b, + Err(e) => { + debug!("ffe_metrics_flusher: failed to build request: {e:?}"); + return; + } + }; + + let req = match builder + .method(Method::POST) + .header("Content-Type", "application/x-protobuf") + .body(Bytes::from(payload)) + { + Ok(r) => r, + Err(e) => { + debug!("ffe_metrics_flusher: failed to construct request body: {e:?}"); + return; + } + }; + + let timeout = Duration::from_millis(endpoint.timeout_ms); + let response = tokio::select! { + biased; + result = client.request(req) => result, + _ = client.sleep(timeout) => { + debug!("ffe_metrics_flusher: request timed out after {timeout:?}"); + return; + } + }; + + match response { + Ok(resp) => { + let status = resp.status(); + if !status.is_success() { + let body_preview = truncate(resp.body().as_ref(), 256); + warn!("ffe_metrics_flusher: non-2xx response {status}: {body_preview}"); + } else { + debug!("ffe_metrics_flusher: sent metric batch, status={status}"); + } + } + Err(e) => { + debug!("ffe_metrics_flusher: request failed: {e:?}"); + } + } +} + +fn truncate(bytes: &[u8], cap: usize) -> String { + let take = bytes.len().min(cap); + String::from_utf8_lossy(&bytes[..take]).into_owned() +} + +#[cfg(test)] +mod tests { + use super::*; + use httpmock::MockServer; + use libdd_capabilities::{HttpError, MaybeSend}; + use libdd_capabilities_impl::NativeCapabilities; + use std::future; + + fn context() -> FfeTelemetryContext { + FfeTelemetryContext { + service: "svc".to_owned(), + env: "prod".to_owned(), + version: "1".to_owned(), + } + } + + fn metric(flag_key: &str, variant: &str, reason: &str) -> FfeEvaluationMetric { + FfeEvaluationMetric { + flag_key: flag_key.to_owned(), + variant: variant.to_owned(), + reason: reason.to_owned(), + error_type: None, + allocation_key: Some("alloc".to_owned()), + } + } + + /// POST hits the configured OTLP metrics path with application/x-protobuf. + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn posts_protobuf_to_configured_endpoint() { + let server = MockServer::start_async().await; + let mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST) + .path("/v1/metrics") + .header("content-type", "application/x-protobuf"); + then.status(202); + }) + .await; + + let url = server.url("/v1/metrics"); + let ep = otlp_metrics_endpoint(&url).unwrap(); + let client = NativeCapabilities::new_client(); + + send_metrics( + &client, + &ep, + context(), + vec![metric("flag", "variant", "TARGETING_MATCH")], + ) + .await; + + mock.assert_async().await; + assert_eq!(mock.calls_async().await, 1); + } + + /// Non-2xx responses are logged and dropped; no panic, no retry. + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn non_2xx_does_not_panic() { + let server = MockServer::start_async().await; + let _mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST).path("/v1/metrics"); + then.status(500).body("intake overloaded"); + }) + .await; + + let url = server.url("/v1/metrics"); + let ep = otlp_metrics_endpoint(&url).unwrap(); + let client = NativeCapabilities::new_client(); + send_metrics( + &client, + &ep, + context(), + vec![metric("flag", "variant", "TARGETING_MATCH")], + ) + .await; + } + + #[tokio::test] + async fn timeout_returns_without_waiting_for_http_response() { + let ep = Endpoint { + url: "http://localhost:4318/v1/metrics".parse().unwrap(), + timeout_ms: 1, + ..Endpoint::default() + }; + + send_metrics( + &HangingCapabilities, + &ep, + context(), + vec![metric("flag", "variant", "TARGETING_MATCH")], + ) + .await; + } + + #[test] + fn default_endpoint_is_parseable() { + let ep = otlp_metrics_endpoint("http://localhost:4318/v1/metrics").unwrap(); + assert_eq!(ep.url.scheme_str(), Some("http")); + assert_eq!(ep.url.path(), "/v1/metrics"); + } + + #[test] + fn invalid_url_returns_none() { + assert!(otlp_metrics_endpoint("not a url").is_none()); + } + + #[derive(Clone, Debug)] + struct HangingCapabilities; + + impl HttpClientCapability for HangingCapabilities { + fn new_client() -> Self { + Self + } + + fn request( + &self, + _req: http::Request, + ) -> impl future::Future, HttpError>> + MaybeSend + { + future::pending() + } + } + + impl SleepCapability for HangingCapabilities { + fn new() -> Self { + Self + } + + async fn sleep(&self, _duration: Duration) {} + } +} diff --git a/datadog-sidecar/src/service/mod.rs b/datadog-sidecar/src/service/mod.rs index bc5930fc78..e88a190ef1 100644 --- a/datadog-sidecar/src/service/mod.rs +++ b/datadog-sidecar/src/service/mod.rs @@ -3,6 +3,8 @@ // imports for structs defined in this file use crate::config; +pub use datadog_ffe::telemetry::evaluation_metrics::FfeEvaluationMetric; +pub use datadog_ffe::telemetry::FfeTelemetryContext; use datadog_remote_config::{RemoteConfigCapabilities, RemoteConfigProduct}; use libdd_common::tag::Tag; use libdd_common::Endpoint; @@ -28,6 +30,7 @@ pub mod agent_info; pub mod blocking; mod debugger_diagnostics_bookkeeper; pub mod exception_hash_rate_limiter; +pub(crate) mod ffe_metrics_flusher; mod instance_id; mod queue_id; mod remote_configs; @@ -82,4 +85,12 @@ pub enum SidecarAction { Telemetry(TelemetryActions), AddTelemetryMetricPoint((String, f64, Vec)), PhpComposerTelemetryFile(PathBuf), + /// Structured FFE evaluation metrics. The sidecar owns OTLP/protobuf + /// aggregation, serialization, and delivery. This action must be sent only + /// by SDKs that explicitly opted into native FFE metric ownership. + FfeEvaluationMetrics { + endpoint: String, + context: FfeTelemetryContext, + metrics: Vec, + }, } diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index e4841081a6..6233e69741 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -35,6 +35,7 @@ use crate::service::debugger_diagnostics_bookkeeper::{ DebuggerDiagnosticsBookkeeper, DebuggerDiagnosticsBookkeeperStats, }; use crate::service::exception_hash_rate_limiter::EXCEPTION_HASH_LIMITER; +use crate::service::ffe_metrics_flusher; use crate::service::remote_configs::{RemoteConfigNotifyTarget, RemoteConfigs}; use crate::service::stats_flusher::{ flush_all_stats_now, get_or_create_concentrator, stats_endpoint, ConcentratorKey, @@ -44,6 +45,7 @@ use crate::service::tracing::trace_flusher::TraceFlusherStats; use crate::tokio_util::run_or_spawn_shared; use datadog_live_debugger::sender::{agent_info_supports_debugger_v2_endpoint, DebuggerType}; use datadog_remote_config::fetch::{ConfigInvariants, ConfigOptions, MultiTargetStats}; +use libdd_capabilities_impl::NativeCapabilities; use libdd_common::tag::Tag; use libdd_dogstatsd_client::{new, DogStatsDActionOwned}; use libdd_telemetry::config::Config; @@ -109,6 +111,8 @@ pub struct SidecarServer { debugger_diagnostics_bookkeeper: Arc, /// Per-env&version SHM span concentrators (global across all sessions). pub(crate) span_concentrators: Arc>>>, + /// HTTP client shared by FFE fire-and-forget forwarders for connection reuse. + pub(crate) ffe_http_client: NativeCapabilities, } /// Per-connection handler wrapper that tracks sessions/instances for cleanup on disconnect. @@ -405,6 +409,43 @@ impl SidecarInterface for ConnectionSidecarHandler { trace_config.tracer_version.clone(), ); + // FFE metric actions are session-scoped, not application-scoped: + // dispatch them before the `applications.entry(queue_id)` check so they + // are not silently dropped when the PHP runtime hasn't yet registered the + // application via remote-config metadata. The PHP metric writer can fire + // as soon as evaluations begin, which is often earlier than the first RC + // config registration call. + let ffe_http_client = self.server.ffe_http_client.clone(); + let actions: Vec = actions + .into_iter() + .filter(|a| match a { + SidecarAction::FfeEvaluationMetrics { + endpoint, + context, + metrics, + } => { + if let Some(ep) = ffe_metrics_flusher::otlp_metrics_endpoint(endpoint) { + let client = ffe_http_client.clone(); + let context = context.clone(); + let metrics = metrics.clone(); + tokio::spawn(async move { + ffe_metrics_flusher::send_metrics(&client, &ep, context, metrics).await; + }); + } else { + debug!( + "ffe_metrics_flusher: unparseable endpoint {endpoint:?}, dropping batch" + ); + } + false + } + _ => true, + }) + .collect(); + + if actions.is_empty() { + return; + } + let rt_info = self.server.get_runtime(&instance_id); let mut applications = rt_info.lock_applications(); @@ -1075,4 +1116,143 @@ impl SidecarInterface for ConnectionSidecarHandler { } } +#[cfg(test)] +mod tests { + use super::*; + use crate::service::{FfeEvaluationMetric, FfeTelemetryContext}; + use httpmock::{Method::POST, MockServer}; + use tokio::time::{sleep, Duration as TokioDuration}; + + fn ffe_context() -> FfeTelemetryContext { + FfeTelemetryContext { + service: "svc".to_owned(), + env: "prod".to_owned(), + version: "1".to_owned(), + } + } + + fn ffe_metric() -> FfeEvaluationMetric { + FfeEvaluationMetric { + flag_key: "flag".to_owned(), + variant: "variant".to_owned(), + reason: "TARGETING_MATCH".to_owned(), + error_type: None, + allocation_key: Some("alloc".to_owned()), + } + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn ffe_metric_actions_dispatch_without_registered_application() { + let http_server = MockServer::start_async().await; + let metrics_mock = http_server + .mock_async(|when, then| { + when.method(POST).path("/v1/metrics"); + then.status(202); + }) + .await; + + let handler = ConnectionSidecarHandler::new(SidecarServer::default()); + let instance_id = InstanceId::new("session", "runtime"); + let queue_id = QueueId::from(42); + + handler + .server + .get_session(&instance_id.session_id) + .modify_trace_config(|cfg| { + let endpoint = Endpoint { + url: http_server.url("/").parse().unwrap(), + ..Endpoint::default() + }; + cfg.set_endpoint(endpoint).unwrap(); + }); + + assert!(!handler + .server + .get_runtime(&instance_id) + .lock_applications() + .contains_key(&queue_id)); + + handler + .enqueue_actions( + PeerCredentials::default(), + instance_id.clone(), + queue_id, + vec![SidecarAction::FfeEvaluationMetrics { + endpoint: http_server.url("/v1/metrics"), + context: ffe_context(), + metrics: vec![ffe_metric()], + }], + ) + .await; + + for _ in 0..100 { + if metrics_mock.calls_async().await == 1 { + break; + } + sleep(TokioDuration::from_millis(10)).await; + } + + metrics_mock.assert_async().await; + assert!(!handler + .server + .get_runtime(&instance_id) + .lock_applications() + .contains_key(&queue_id)); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn registered_sdk_without_ffe_actions_does_not_emit_ffe_telemetry() { + let http_server = MockServer::start_async().await; + let metrics_mock = http_server + .mock_async(|when, then| { + when.method(POST).path("/v1/metrics"); + then.status(202); + }) + .await; + + let handler = ConnectionSidecarHandler::new(SidecarServer::default()); + let instance_id = InstanceId::new("session", "runtime"); + let queue_id = QueueId::from(42); + + handler + .server + .get_session(&instance_id.session_id) + .modify_trace_config(|cfg| { + let endpoint = Endpoint { + url: http_server.url("/").parse().unwrap(), + ..Endpoint::default() + }; + cfg.set_endpoint(endpoint).unwrap(); + }); + + handler + .server + .get_runtime(&instance_id) + .lock_applications() + .entry(queue_id) + .or_default(); + + assert!(handler + .server + .get_runtime(&instance_id) + .lock_applications() + .contains_key(&queue_id)); + + handler + .enqueue_actions( + PeerCredentials::default(), + instance_id, + queue_id, + Vec::new(), + ) + .await; + + sleep(TokioDuration::from_millis(50)).await; + + assert_eq!(metrics_mock.calls_async().await, 0); + } +} + // TODO: APMSP-1079 - Unit tests are sparse for the sidecar server. We should add more. diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index 201f72fb33..2c7ecda42a 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -454,6 +454,7 @@ impl TelemetryCachedClient { } } SidecarAction::PhpComposerTelemetryFile(_) => {} // handled separately + SidecarAction::FfeEvaluationMetrics { .. } => {} // handled in sidecar_server } } actions