From 1f1fca4392ac61ace419c47268133ed98fc838a7 Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Thu, 28 May 2026 10:45:39 -0400 Subject: [PATCH 1/3] feat(sidecar): forward FFE evaluation metrics to OTLP intake --- .github/CODEOWNERS | 1 + Cargo.lock | 1 + datadog-sidecar-ffi/src/lib.rs | 100 +++- datadog-sidecar/Cargo.toml | 1 + .../src/service/ffe_metrics_flusher.rs | 535 ++++++++++++++++++ datadog-sidecar/src/service/mod.rs | 25 + datadog-sidecar/src/service/sidecar_server.rs | 180 ++++++ datadog-sidecar/src/service/telemetry.rs | 1 + 8 files changed, 842 insertions(+), 2 deletions(-) create mode 100644 datadog-sidecar/src/service/ffe_metrics_flusher.rs diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 433077e910..0d89d0e47b 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -103,3 +103,4 @@ tools/sidecar_mockgen/ @DataDog/libdatadog-php libdd-data-pipeline/src/otlp/ @DataDog/apm-sdk-capabilities-rust libdd-data-pipeline/tests/test_trace_exporter_otlp_export.rs @DataDog/apm-sdk-capabilities-rust libdd-trace-utils/src/otlp_encoder/ @DataDog/apm-sdk-capabilities-rust +datadog-sidecar/src/service/ffe_metrics_flusher.rs @DataDog/libdatadog-php @DataDog/libdatadog-apm @DataDog/feature-flagging-and-experimentation-sdk diff --git a/Cargo.lock b/Cargo.lock index ba5b18e3bb..76145c76d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1552,6 +1552,7 @@ dependencies = [ "nix 0.29.0", "prctl", "priority-queue", + "prost", "rand 0.8.5", "rmp-serde", "sendfd", diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 7cc6a3267d..ef37847258 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -27,7 +27,8 @@ use datadog_sidecar::service::agent_info::AgentInfoReader; use datadog_sidecar::service::telemetry::InternalTelemetryAction; use datadog_sidecar::service::{ blocking::{self, SidecarTransport}, - DynamicInstrumentationConfigState, InstanceId, QueueId, RuntimeMetadata, + DynamicInstrumentationConfigState, FfeEvaluationMetric as SidecarFfeEvaluationMetric, + FfeTelemetryContext as SidecarFfeTelemetryContext, InstanceId, QueueId, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction, SidecarFlushOptions, }; use datadog_sidecar::service::{get_telemetry_action_sender, InternalTelemetryActions}; @@ -35,7 +36,7 @@ use datadog_sidecar::shm_remote_config::{path_for_remote_config, RemoteConfigRea use libc::c_char; use libdd_common::tag::Tag; use libdd_common::Endpoint; -use libdd_common_ffi::slice::{AsBytes, CharSlice}; +use libdd_common_ffi::slice::{AsBytes, CharSlice, Slice}; use libdd_common_ffi::{self as ffi, MaybeError}; #[cfg(windows)] use libdd_crashtracker_ffi::Metadata; @@ -1116,6 +1117,101 @@ pub unsafe extern "C" fn ddog_sidecar_send_debugger_datum( ddog_sidecar_send_debugger_data(transport, instance_id, queue_id, vec![*payload]) } +#[repr(C)] +pub struct FfeTelemetryContext<'a> { + pub service: CharSlice<'a>, + pub env: CharSlice<'a>, + pub version: CharSlice<'a>, +} + +#[repr(C)] +pub struct FfeEvaluationMetric<'a> { + pub flag_key: CharSlice<'a>, + pub variant: CharSlice<'a>, + pub reason: CharSlice<'a>, + pub error_type: CharSlice<'a>, + pub allocation_key: CharSlice<'a>, +} + +/// Send structured FFE evaluation metric events to the sidecar. The sidecar +/// owns aggregation, OTLP/protobuf serialization, and OTLP HTTP delivery. This +/// function is caller-driven so SDKs with existing host-language hooks can +/// safely coexist until they explicitly migrate. +/// +/// # Safety +/// `endpoint`, `context`, and every element in `metrics` must contain valid +/// UTF-8 `CharSlice` values. Empty `endpoint` or `metrics` is a no-op. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn ddog_sidecar_send_ffe_evaluation_metrics( + transport: &mut Box, + instance_id: &InstanceId, + queue_id: &QueueId, + endpoint: CharSlice, + context: &FfeTelemetryContext<'_>, + metrics: Slice>, +) -> MaybeError { + if endpoint.is_empty() || metrics.is_empty() { + return MaybeError::None; + } + + let endpoint = try_c!(char_slice_to_string(endpoint)); + let context = try_c!(ffe_context_from_ffi(context)); + let metrics = try_c!(metrics + .try_as_slice() + .map_err(|e| format!("Invalid metric slice: {e}")) + .and_then(|metrics| metrics + .iter() + .map(ffe_metric_from_ffi) + .collect::, _>>())); + + if metrics.is_empty() { + return MaybeError::None; + } + + try_c!(blocking::enqueue_actions( + transport, + instance_id, + queue_id, + vec![SidecarAction::FfeEvaluationMetrics { + endpoint, + context, + metrics, + }], + )); + MaybeError::None +} + +fn ffe_context_from_ffi( + context: &FfeTelemetryContext<'_>, +) -> Result { + Ok(SidecarFfeTelemetryContext { + service: char_slice_to_string(context.service)?, + env: char_slice_to_string(context.env)?, + version: char_slice_to_string(context.version)?, + }) +} + +fn ffe_metric_from_ffi( + metric: &FfeEvaluationMetric<'_>, +) -> Result { + Ok(SidecarFfeEvaluationMetric { + flag_key: char_slice_to_string(metric.flag_key)?, + variant: char_slice_to_string(metric.variant)?, + reason: char_slice_to_string(metric.reason)?, + error_type: optional_string(metric.error_type)?, + allocation_key: optional_string(metric.allocation_key)?, + }) +} + +fn optional_string(slice: CharSlice) -> Result, String> { + if slice.is_empty() { + Ok(None) + } else { + char_slice_to_string(slice).map(Some) + } +} + #[no_mangle] #[allow(clippy::missing_safety_doc)] #[allow(improper_ctypes_definitions)] // DebuggerPayload is just a pointer, we hide its internals diff --git a/datadog-sidecar/Cargo.toml b/datadog-sidecar/Cargo.toml index 8d04b2d28d..345fb4db39 100644 --- a/datadog-sidecar/Cargo.toml +++ b/datadog-sidecar/Cargo.toml @@ -48,6 +48,7 @@ serde = { version = "1.0", features = ["derive", "rc"] } serde_with = "3.6.0" bincode = { version = "1.3.3" } serde_json = "1.0" +prost = "0.14.1" base64 = "0.22.1" spawn_worker = { path = "../spawn_worker" } zwohash = "0.1.2" diff --git a/datadog-sidecar/src/service/ffe_metrics_flusher.rs b/datadog-sidecar/src/service/ffe_metrics_flusher.rs new file mode 100644 index 0000000000..8d5e1524cb --- /dev/null +++ b/datadog-sidecar/src/service/ffe_metrics_flusher.rs @@ -0,0 +1,535 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Aggregates, serializes, and forwards FFE (Feature Flag Evaluation) metric +//! events to a user-configured OTLP HTTP metrics intake. + +use crate::service::{FfeEvaluationMetric, FfeTelemetryContext}; +use http::Method; +use libdd_capabilities::{Bytes, HttpClientCapability, SleepCapability}; +use libdd_common::Endpoint; +use libdd_trace_protobuf::opentelemetry::proto::common::v1::any_value; +use libdd_trace_protobuf::opentelemetry::proto::common::v1::{ + AnyValue, InstrumentationScope, KeyValue, +}; +use libdd_trace_protobuf::opentelemetry::proto::resource::v1::Resource; +use prost::Message; +use std::collections::BTreeMap; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tracing::{debug, warn}; + +const USER_AGENT: &str = concat!("ddtrace-sidecar/", env!("CARGO_PKG_VERSION")); + +const METER_NAME: &str = "ddtrace.openfeature"; +const METRIC_NAME: &str = "feature_flag.evaluations"; +const METRIC_UNIT: &str = "{evaluation}"; +const METRIC_DESCRIPTION: &str = "Number of feature flag evaluations"; + +const ATTR_SERVICE_NAME: &str = "service.name"; +const ATTR_ENV: &str = "deployment.environment.name"; +const ATTR_VERSION: &str = "service.version"; +const ATTR_FLAG_KEY: &str = "feature_flag.key"; +const ATTR_VARIANT: &str = "feature_flag.result.variant"; +const ATTR_REASON: &str = "feature_flag.result.reason"; +const ATTR_ERROR_TYPE: &str = "error.type"; +const ATTR_ALLOCATION_KEY: &str = "feature_flag.result.allocation_key"; + +/// Build an `Endpoint` for an OTLP metrics intake from a fully-qualified URL. +/// +/// Production callers supply the URL via the FFI (typically the value of +/// `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`; the OpenTelemetry spec default is +/// `http://localhost:4318/v1/metrics`). +/// Returns `None` if the URL is unparseable. The OTLP endpoint is unrelated +/// to the Agent base, so we don't preserve any session fields here. +pub(crate) fn otlp_metrics_endpoint(url: &str) -> Option { + let url = url.parse().ok()?; + Some(Endpoint { + url, + ..Endpoint::default() + }) +} + +pub(crate) fn encode_metrics_payload( + context: FfeTelemetryContext, + metrics: Vec, +) -> Option> { + if metrics.is_empty() { + return None; + } + + let now = unix_nano_now(); + let data_points = aggregate(metrics) + .into_iter() + .map(|(attributes, count)| otlp::NumberDataPoint { + attributes: attributes + .into_iter() + .map(|(key, value)| string_key_value(key, value)) + .collect(), + start_time_unix_nano: now, + time_unix_nano: now, + value: Some(otlp::number_data_point::Value::AsInt(count)), + flags: 0, + }) + .collect::>(); + + if data_points.is_empty() { + return None; + } + + let request = otlp::ExportMetricsServiceRequest { + resource_metrics: vec![otlp::ResourceMetrics { + resource: Some(resource(context)), + scope_metrics: vec![otlp::ScopeMetrics { + scope: Some(InstrumentationScope { + name: METER_NAME.to_owned(), + version: String::new(), + attributes: vec![], + dropped_attributes_count: 0, + }), + metrics: vec![otlp::Metric { + name: METRIC_NAME.to_owned(), + description: METRIC_DESCRIPTION.to_owned(), + unit: METRIC_UNIT.to_owned(), + data: Some(otlp::metric::Data::Sum(otlp::Sum { + data_points, + aggregation_temporality: otlp::AggregationTemporality::Delta as i32, + is_monotonic: true, + })), + }], + schema_url: String::new(), + }], + schema_url: String::new(), + }], + }; + + Some(request.encode_to_vec()) +} + +/// POST structured FFE metric events as OTLP/protobuf to the configured intake. +/// Fire-and-forget: non-2xx responses and network errors are logged and +/// dropped (matches dd-trace-go/py OTLP exporter behavior). +pub(crate) async fn send_metrics( + client: &C, + endpoint: &Endpoint, + context: FfeTelemetryContext, + metrics: Vec, +) { + let Some(payload) = encode_metrics_payload(context, metrics) else { + return; + }; + send_payload(client, endpoint, payload).await; +} + +async fn send_payload( + client: &C, + endpoint: &Endpoint, + payload: Vec, +) { + let builder = match endpoint.to_request_builder(USER_AGENT) { + Ok(b) => b, + Err(e) => { + debug!("ffe_metrics_flusher: failed to build request: {e:?}"); + return; + } + }; + + let req = match builder + .method(Method::POST) + .header("Content-Type", "application/x-protobuf") + .body(Bytes::from(payload)) + { + Ok(r) => r, + Err(e) => { + debug!("ffe_metrics_flusher: failed to construct request body: {e:?}"); + return; + } + }; + + let timeout = Duration::from_millis(endpoint.timeout_ms); + let response = tokio::select! { + biased; + result = client.request(req) => result, + _ = client.sleep(timeout) => { + debug!("ffe_metrics_flusher: request timed out after {timeout:?}"); + return; + } + }; + + match response { + Ok(resp) => { + let status = resp.status(); + if !status.is_success() { + let body_preview = truncate(resp.body().as_ref(), 256); + warn!("ffe_metrics_flusher: non-2xx response {status}: {body_preview}"); + } else { + debug!("ffe_metrics_flusher: sent metric batch, status={status}"); + } + } + Err(e) => { + debug!("ffe_metrics_flusher: request failed: {e:?}"); + } + } +} + +fn aggregate(metrics: Vec) -> BTreeMap, i64> { + let mut counts = BTreeMap::new(); + for metric in metrics { + if metric.flag_key.is_empty() { + continue; + } + let attrs = metric_attributes(metric); + *counts.entry(attrs).or_insert(0) += 1; + } + counts +} + +fn metric_attributes(metric: FfeEvaluationMetric) -> BTreeMap { + let reason = normalize(&metric.reason, "unknown"); + let mut attrs = BTreeMap::from([ + (ATTR_FLAG_KEY.to_owned(), metric.flag_key), + (ATTR_VARIANT.to_owned(), metric.variant), + (ATTR_REASON.to_owned(), reason.clone()), + ]); + + if let Some(error_type) = metric.error_type { + if !error_type.is_empty() { + attrs.insert( + ATTR_ERROR_TYPE.to_owned(), + normalize(&error_type, "general"), + ); + } + } + + if let Some(allocation_key) = metric.allocation_key { + if !allocation_key.is_empty() + && !matches!(reason.as_str(), "error" | "default" | "disabled") + { + attrs.insert(ATTR_ALLOCATION_KEY.to_owned(), allocation_key); + } + } + + attrs +} + +fn normalize(value: &str, default: &str) -> String { + let value = value.trim(); + if value.is_empty() { + default.to_owned() + } else { + value.to_ascii_lowercase() + } +} + +fn resource(context: FfeTelemetryContext) -> Resource { + let mut attributes = vec![]; + if !context.service.is_empty() { + attributes.push(string_key_value( + ATTR_SERVICE_NAME.to_owned(), + context.service, + )); + } + if !context.env.is_empty() { + attributes.push(string_key_value(ATTR_ENV.to_owned(), context.env)); + } + if !context.version.is_empty() { + attributes.push(string_key_value(ATTR_VERSION.to_owned(), context.version)); + } + Resource { + attributes, + dropped_attributes_count: 0, + entity_refs: vec![], + } +} + +fn string_key_value(key: String, value: String) -> KeyValue { + KeyValue { + key, + value: Some(AnyValue { + value: Some(any_value::Value::StringValue(value)), + }), + key_ref: 0, + } +} + +fn unix_nano_now() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos().min(u128::from(u64::MAX)) as u64) + .unwrap_or_default() +} + +fn truncate(bytes: &[u8], cap: usize) -> String { + let take = bytes.len().min(cap); + String::from_utf8_lossy(&bytes[..take]).into_owned() +} + +mod otlp { + use libdd_trace_protobuf::opentelemetry::proto::common::v1::{InstrumentationScope, KeyValue}; + use libdd_trace_protobuf::opentelemetry::proto::resource::v1::Resource; + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct ExportMetricsServiceRequest { + #[prost(message, repeated, tag = "1")] + pub resource_metrics: ::prost::alloc::vec::Vec, + } + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct ResourceMetrics { + #[prost(message, optional, tag = "1")] + pub resource: ::core::option::Option, + #[prost(message, repeated, tag = "2")] + pub scope_metrics: ::prost::alloc::vec::Vec, + #[prost(string, tag = "3")] + pub schema_url: ::prost::alloc::string::String, + } + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct ScopeMetrics { + #[prost(message, optional, tag = "1")] + pub scope: ::core::option::Option, + #[prost(message, repeated, tag = "2")] + pub metrics: ::prost::alloc::vec::Vec, + #[prost(string, tag = "3")] + pub schema_url: ::prost::alloc::string::String, + } + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Metric { + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub description: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub unit: ::prost::alloc::string::String, + #[prost(oneof = "metric::Data", tags = "7")] + pub data: ::core::option::Option, + } + + pub mod metric { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Data { + #[prost(message, tag = "7")] + Sum(super::Sum), + } + } + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Sum { + #[prost(message, repeated, tag = "1")] + pub data_points: ::prost::alloc::vec::Vec, + #[prost(enumeration = "AggregationTemporality", tag = "2")] + pub aggregation_temporality: i32, + #[prost(bool, tag = "3")] + pub is_monotonic: bool, + } + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct NumberDataPoint { + #[prost(fixed64, tag = "2")] + pub start_time_unix_nano: u64, + #[prost(fixed64, tag = "3")] + pub time_unix_nano: u64, + #[prost(oneof = "number_data_point::Value", tags = "6")] + pub value: ::core::option::Option, + #[prost(message, repeated, tag = "7")] + pub attributes: ::prost::alloc::vec::Vec, + #[prost(uint32, tag = "8")] + pub flags: u32, + } + + pub mod number_data_point { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Value { + #[prost(sfixed64, tag = "6")] + AsInt(i64), + } + } + + #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] + #[repr(i32)] + pub enum AggregationTemporality { + Unspecified = 0, + Delta = 1, + Cumulative = 2, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use httpmock::MockServer; + use libdd_capabilities::{HttpError, MaybeSend}; + use libdd_capabilities_impl::NativeCapabilities; + use std::future; + + fn context() -> FfeTelemetryContext { + FfeTelemetryContext { + service: "svc".to_owned(), + env: "prod".to_owned(), + version: "1".to_owned(), + } + } + + fn metric(flag_key: &str, variant: &str, reason: &str) -> FfeEvaluationMetric { + FfeEvaluationMetric { + flag_key: flag_key.to_owned(), + variant: variant.to_owned(), + reason: reason.to_owned(), + error_type: None, + allocation_key: Some("alloc".to_owned()), + } + } + + #[test] + fn encodes_otlp_counter_and_aggregates_matching_attributes() { + let payload = encode_metrics_payload( + context(), + vec![ + metric("flag", "variant", "TARGETING_MATCH"), + metric("flag", "variant", "targeting_match"), + ], + ) + .unwrap(); + + let decoded = otlp::ExportMetricsServiceRequest::decode(payload.as_slice()).unwrap(); + let resource_metrics = &decoded.resource_metrics[0]; + let attrs = &resource_metrics.resource.as_ref().unwrap().attributes; + assert!(attrs.iter().any(|kv| kv.key == ATTR_SERVICE_NAME)); + + let data_points = match resource_metrics.scope_metrics[0].metrics[0] + .data + .as_ref() + .unwrap() + { + otlp::metric::Data::Sum(sum) => &sum.data_points, + }; + assert_eq!(data_points.len(), 1); + assert_eq!( + data_points[0].value, + Some(otlp::number_data_point::Value::AsInt(2)) + ); + } + + #[test] + fn excludes_allocation_key_for_error_default_and_disabled() { + for reason in ["ERROR", "DEFAULT", "DISABLED"] { + let attrs = metric_attributes(FfeEvaluationMetric { + flag_key: "flag".to_owned(), + variant: String::new(), + reason: reason.to_owned(), + error_type: Some("FLAG_NOT_FOUND".to_owned()), + allocation_key: Some("alloc".to_owned()), + }); + assert!(!attrs.contains_key(ATTR_ALLOCATION_KEY)); + assert_eq!(attrs[ATTR_ERROR_TYPE], "flag_not_found"); + } + } + + /// POST hits the configured OTLP metrics path with application/x-protobuf. + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn posts_protobuf_to_configured_endpoint() { + let server = MockServer::start_async().await; + let mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST) + .path("/v1/metrics") + .header("content-type", "application/x-protobuf"); + then.status(202); + }) + .await; + + let url = server.url("/v1/metrics"); + let ep = otlp_metrics_endpoint(&url).unwrap(); + let client = NativeCapabilities::new_client(); + + send_metrics( + &client, + &ep, + context(), + vec![metric("flag", "variant", "TARGETING_MATCH")], + ) + .await; + + mock.assert_async().await; + assert_eq!(mock.calls_async().await, 1); + } + + /// Non-2xx responses are logged and dropped; no panic, no retry. + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn non_2xx_does_not_panic() { + let server = MockServer::start_async().await; + let _mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST).path("/v1/metrics"); + then.status(500).body("intake overloaded"); + }) + .await; + + let url = server.url("/v1/metrics"); + let ep = otlp_metrics_endpoint(&url).unwrap(); + let client = NativeCapabilities::new_client(); + send_metrics( + &client, + &ep, + context(), + vec![metric("flag", "variant", "TARGETING_MATCH")], + ) + .await; + } + + #[tokio::test] + async fn timeout_returns_without_waiting_for_http_response() { + let ep = Endpoint { + url: "http://localhost:4318/v1/metrics".parse().unwrap(), + timeout_ms: 1, + ..Endpoint::default() + }; + + send_metrics( + &HangingCapabilities, + &ep, + context(), + vec![metric("flag", "variant", "TARGETING_MATCH")], + ) + .await; + } + + #[test] + fn default_endpoint_is_parseable() { + let ep = otlp_metrics_endpoint("http://localhost:4318/v1/metrics").unwrap(); + assert_eq!(ep.url.scheme_str(), Some("http")); + assert_eq!(ep.url.path(), "/v1/metrics"); + } + + #[test] + fn invalid_url_returns_none() { + assert!(otlp_metrics_endpoint("not a url").is_none()); + } + + #[derive(Clone, Debug)] + struct HangingCapabilities; + + impl HttpClientCapability for HangingCapabilities { + fn new_client() -> Self { + Self + } + + fn request( + &self, + _req: http::Request, + ) -> impl future::Future, HttpError>> + MaybeSend + { + future::pending() + } + } + + impl SleepCapability for HangingCapabilities { + fn new() -> Self { + Self + } + + async fn sleep(&self, _duration: Duration) {} + } +} diff --git a/datadog-sidecar/src/service/mod.rs b/datadog-sidecar/src/service/mod.rs index bc5930fc78..a5beff9f3a 100644 --- a/datadog-sidecar/src/service/mod.rs +++ b/datadog-sidecar/src/service/mod.rs @@ -28,6 +28,7 @@ pub mod agent_info; pub mod blocking; mod debugger_diagnostics_bookkeeper; pub mod exception_hash_rate_limiter; +pub(crate) mod ffe_metrics_flusher; mod instance_id; mod queue_id; mod remote_configs; @@ -82,4 +83,28 @@ pub enum SidecarAction { Telemetry(TelemetryActions), AddTelemetryMetricPoint((String, f64, Vec)), PhpComposerTelemetryFile(PathBuf), + /// Structured FFE evaluation metrics. The sidecar owns OTLP/protobuf + /// aggregation, serialization, and delivery. This action must be sent only + /// by SDKs that explicitly opted into native FFE metric ownership. + FfeEvaluationMetrics { + endpoint: String, + context: FfeTelemetryContext, + metrics: Vec, + }, +} + +#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, PartialEq, Serialize)] +pub struct FfeTelemetryContext { + pub service: String, + pub env: String, + pub version: String, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct FfeEvaluationMetric { + pub flag_key: String, + pub variant: String, + pub reason: String, + pub error_type: Option, + pub allocation_key: Option, } diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index e4841081a6..6233e69741 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -35,6 +35,7 @@ use crate::service::debugger_diagnostics_bookkeeper::{ DebuggerDiagnosticsBookkeeper, DebuggerDiagnosticsBookkeeperStats, }; use crate::service::exception_hash_rate_limiter::EXCEPTION_HASH_LIMITER; +use crate::service::ffe_metrics_flusher; use crate::service::remote_configs::{RemoteConfigNotifyTarget, RemoteConfigs}; use crate::service::stats_flusher::{ flush_all_stats_now, get_or_create_concentrator, stats_endpoint, ConcentratorKey, @@ -44,6 +45,7 @@ use crate::service::tracing::trace_flusher::TraceFlusherStats; use crate::tokio_util::run_or_spawn_shared; use datadog_live_debugger::sender::{agent_info_supports_debugger_v2_endpoint, DebuggerType}; use datadog_remote_config::fetch::{ConfigInvariants, ConfigOptions, MultiTargetStats}; +use libdd_capabilities_impl::NativeCapabilities; use libdd_common::tag::Tag; use libdd_dogstatsd_client::{new, DogStatsDActionOwned}; use libdd_telemetry::config::Config; @@ -109,6 +111,8 @@ pub struct SidecarServer { debugger_diagnostics_bookkeeper: Arc, /// Per-env&version SHM span concentrators (global across all sessions). pub(crate) span_concentrators: Arc>>>, + /// HTTP client shared by FFE fire-and-forget forwarders for connection reuse. + pub(crate) ffe_http_client: NativeCapabilities, } /// Per-connection handler wrapper that tracks sessions/instances for cleanup on disconnect. @@ -405,6 +409,43 @@ impl SidecarInterface for ConnectionSidecarHandler { trace_config.tracer_version.clone(), ); + // FFE metric actions are session-scoped, not application-scoped: + // dispatch them before the `applications.entry(queue_id)` check so they + // are not silently dropped when the PHP runtime hasn't yet registered the + // application via remote-config metadata. The PHP metric writer can fire + // as soon as evaluations begin, which is often earlier than the first RC + // config registration call. + let ffe_http_client = self.server.ffe_http_client.clone(); + let actions: Vec = actions + .into_iter() + .filter(|a| match a { + SidecarAction::FfeEvaluationMetrics { + endpoint, + context, + metrics, + } => { + if let Some(ep) = ffe_metrics_flusher::otlp_metrics_endpoint(endpoint) { + let client = ffe_http_client.clone(); + let context = context.clone(); + let metrics = metrics.clone(); + tokio::spawn(async move { + ffe_metrics_flusher::send_metrics(&client, &ep, context, metrics).await; + }); + } else { + debug!( + "ffe_metrics_flusher: unparseable endpoint {endpoint:?}, dropping batch" + ); + } + false + } + _ => true, + }) + .collect(); + + if actions.is_empty() { + return; + } + let rt_info = self.server.get_runtime(&instance_id); let mut applications = rt_info.lock_applications(); @@ -1075,4 +1116,143 @@ impl SidecarInterface for ConnectionSidecarHandler { } } +#[cfg(test)] +mod tests { + use super::*; + use crate::service::{FfeEvaluationMetric, FfeTelemetryContext}; + use httpmock::{Method::POST, MockServer}; + use tokio::time::{sleep, Duration as TokioDuration}; + + fn ffe_context() -> FfeTelemetryContext { + FfeTelemetryContext { + service: "svc".to_owned(), + env: "prod".to_owned(), + version: "1".to_owned(), + } + } + + fn ffe_metric() -> FfeEvaluationMetric { + FfeEvaluationMetric { + flag_key: "flag".to_owned(), + variant: "variant".to_owned(), + reason: "TARGETING_MATCH".to_owned(), + error_type: None, + allocation_key: Some("alloc".to_owned()), + } + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn ffe_metric_actions_dispatch_without_registered_application() { + let http_server = MockServer::start_async().await; + let metrics_mock = http_server + .mock_async(|when, then| { + when.method(POST).path("/v1/metrics"); + then.status(202); + }) + .await; + + let handler = ConnectionSidecarHandler::new(SidecarServer::default()); + let instance_id = InstanceId::new("session", "runtime"); + let queue_id = QueueId::from(42); + + handler + .server + .get_session(&instance_id.session_id) + .modify_trace_config(|cfg| { + let endpoint = Endpoint { + url: http_server.url("/").parse().unwrap(), + ..Endpoint::default() + }; + cfg.set_endpoint(endpoint).unwrap(); + }); + + assert!(!handler + .server + .get_runtime(&instance_id) + .lock_applications() + .contains_key(&queue_id)); + + handler + .enqueue_actions( + PeerCredentials::default(), + instance_id.clone(), + queue_id, + vec![SidecarAction::FfeEvaluationMetrics { + endpoint: http_server.url("/v1/metrics"), + context: ffe_context(), + metrics: vec![ffe_metric()], + }], + ) + .await; + + for _ in 0..100 { + if metrics_mock.calls_async().await == 1 { + break; + } + sleep(TokioDuration::from_millis(10)).await; + } + + metrics_mock.assert_async().await; + assert!(!handler + .server + .get_runtime(&instance_id) + .lock_applications() + .contains_key(&queue_id)); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn registered_sdk_without_ffe_actions_does_not_emit_ffe_telemetry() { + let http_server = MockServer::start_async().await; + let metrics_mock = http_server + .mock_async(|when, then| { + when.method(POST).path("/v1/metrics"); + then.status(202); + }) + .await; + + let handler = ConnectionSidecarHandler::new(SidecarServer::default()); + let instance_id = InstanceId::new("session", "runtime"); + let queue_id = QueueId::from(42); + + handler + .server + .get_session(&instance_id.session_id) + .modify_trace_config(|cfg| { + let endpoint = Endpoint { + url: http_server.url("/").parse().unwrap(), + ..Endpoint::default() + }; + cfg.set_endpoint(endpoint).unwrap(); + }); + + handler + .server + .get_runtime(&instance_id) + .lock_applications() + .entry(queue_id) + .or_default(); + + assert!(handler + .server + .get_runtime(&instance_id) + .lock_applications() + .contains_key(&queue_id)); + + handler + .enqueue_actions( + PeerCredentials::default(), + instance_id, + queue_id, + Vec::new(), + ) + .await; + + sleep(TokioDuration::from_millis(50)).await; + + assert_eq!(metrics_mock.calls_async().await, 0); + } +} + // TODO: APMSP-1079 - Unit tests are sparse for the sidecar server. We should add more. diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index 201f72fb33..2c7ecda42a 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -454,6 +454,7 @@ impl TelemetryCachedClient { } } SidecarAction::PhpComposerTelemetryFile(_) => {} // handled separately + SidecarAction::FfeEvaluationMetrics { .. } => {} // handled in sidecar_server } } actions From 4863155ee0141add5fb2c171e908186366da03cd Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Thu, 28 May 2026 12:18:28 -0400 Subject: [PATCH 2/3] Move FFE metric encoding into datadog-ffe --- Cargo.lock | 5 +- datadog-ffe/Cargo.toml | 3 + datadog-ffe/src/lib.rs | 2 + .../src/telemetry/evaluation_metrics.rs | 341 ++++++++++++++++++ datadog-ffe/src/telemetry/mod.rs | 13 + datadog-sidecar/Cargo.toml | 3 +- .../src/service/ffe_metrics_flusher.rs | 303 +--------------- datadog-sidecar/src/service/mod.rs | 18 +- 8 files changed, 367 insertions(+), 321 deletions(-) create mode 100644 datadog-ffe/src/telemetry/evaluation_metrics.rs create mode 100644 datadog-ffe/src/telemetry/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 76145c76d9..45b82ddcba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1357,8 +1357,10 @@ dependencies = [ "derive_more", "faststr", "libdd-common", + "libdd-trace-protobuf", "log", "md5", + "prost", "pyo3", "semver", "serde", @@ -1523,6 +1525,7 @@ dependencies = [ "bincode", "chrono", "console-subscriber", + "datadog-ffe", "datadog-ipc", "datadog-ipc-macros", "datadog-live-debugger", @@ -1543,7 +1546,6 @@ dependencies = [ "libdd-dogstatsd-client", "libdd-telemetry", "libdd-tinybytes", - "libdd-trace-protobuf", "libdd-trace-stats", "libdd-trace-utils", "manual_future", @@ -1552,7 +1554,6 @@ dependencies = [ "nix 0.29.0", "prctl", "priority-queue", - "prost", "rand 0.8.5", "rmp-serde", "sendfd", diff --git a/datadog-ffe/Cargo.toml b/datadog-ffe/Cargo.toml index 358b4b4016..074c741969 100644 --- a/datadog-ffe/Cargo.toml +++ b/datadog-ffe/Cargo.toml @@ -24,7 +24,10 @@ serde-bool = { version = "0.1.3", default-features = false } serde_with = { version = "3.11.0", default-features = false, features = ["base64", "hex", "macros"] } thiserror = { version = "2.0.3", default-features = false } url = { version = "2.5.0", default-features = false, features = ["std"] } +libdd-trace-protobuf = { path = "../libdd-trace-protobuf", optional = true } +prost = { version = "0.14.1", optional = true } pyo3 = { version = "0.28", optional = true, default-features = false, features = ["macros"] } [features] +telemetry = ["dep:libdd-trace-protobuf", "dep:prost"] pyo3 = ["dep:pyo3"] diff --git a/datadog-ffe/src/lib.rs b/datadog-ffe/src/lib.rs index a32b8b757c..3cd763d982 100644 --- a/datadog-ffe/src/lib.rs +++ b/datadog-ffe/src/lib.rs @@ -4,5 +4,7 @@ mod flag_type; pub mod rules_based; +#[cfg(feature = "telemetry")] +pub mod telemetry; pub use flag_type::{ExpectedFlagType, FlagType}; diff --git a/datadog-ffe/src/telemetry/evaluation_metrics.rs b/datadog-ffe/src/telemetry/evaluation_metrics.rs new file mode 100644 index 0000000000..23117b1e10 --- /dev/null +++ b/datadog-ffe/src/telemetry/evaluation_metrics.rs @@ -0,0 +1,341 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Reusable FFE evaluation metric aggregation and OTLP encoding primitives. + +use super::FfeTelemetryContext; +use libdd_trace_protobuf::opentelemetry::proto::common::v1::any_value; +use libdd_trace_protobuf::opentelemetry::proto::common::v1::{ + AnyValue, InstrumentationScope, KeyValue, +}; +use libdd_trace_protobuf::opentelemetry::proto::resource::v1::Resource; +use prost::Message; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::time::{SystemTime, UNIX_EPOCH}; + +const METER_NAME: &str = "ddtrace.openfeature"; +const METRIC_NAME: &str = "feature_flag.evaluations"; +const METRIC_UNIT: &str = "{evaluation}"; +const METRIC_DESCRIPTION: &str = "Number of feature flag evaluations"; + +const ATTR_SERVICE_NAME: &str = "service.name"; +const ATTR_ENV: &str = "deployment.environment.name"; +const ATTR_VERSION: &str = "service.version"; +const ATTR_FLAG_KEY: &str = "feature_flag.key"; +const ATTR_VARIANT: &str = "feature_flag.result.variant"; +const ATTR_REASON: &str = "feature_flag.result.reason"; +const ATTR_ERROR_TYPE: &str = "error.type"; +const ATTR_ALLOCATION_KEY: &str = "feature_flag.result.allocation_key"; + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct FfeEvaluationMetric { + pub flag_key: String, + pub variant: String, + pub reason: String, + pub error_type: Option, + pub allocation_key: Option, +} + +pub fn encode_metrics_payload( + context: FfeTelemetryContext, + metrics: Vec, +) -> Option> { + if metrics.is_empty() { + return None; + } + + let now = unix_nano_now(); + let data_points = aggregate(metrics) + .into_iter() + .map(|(attributes, count)| otlp::NumberDataPoint { + attributes: attributes + .into_iter() + .map(|(key, value)| string_key_value(key, value)) + .collect(), + start_time_unix_nano: now, + time_unix_nano: now, + value: Some(otlp::number_data_point::Value::AsInt(count)), + flags: 0, + }) + .collect::>(); + + if data_points.is_empty() { + return None; + } + + let request = otlp::ExportMetricsServiceRequest { + resource_metrics: vec![otlp::ResourceMetrics { + resource: Some(resource(context)), + scope_metrics: vec![otlp::ScopeMetrics { + scope: Some(InstrumentationScope { + name: METER_NAME.to_owned(), + version: String::new(), + attributes: vec![], + dropped_attributes_count: 0, + }), + metrics: vec![otlp::Metric { + name: METRIC_NAME.to_owned(), + description: METRIC_DESCRIPTION.to_owned(), + unit: METRIC_UNIT.to_owned(), + data: Some(otlp::metric::Data::Sum(otlp::Sum { + data_points, + aggregation_temporality: otlp::AggregationTemporality::Delta as i32, + is_monotonic: true, + })), + }], + schema_url: String::new(), + }], + schema_url: String::new(), + }], + }; + + Some(request.encode_to_vec()) +} + +fn aggregate(metrics: Vec) -> BTreeMap, i64> { + let mut counts = BTreeMap::new(); + for metric in metrics { + if metric.flag_key.is_empty() { + continue; + } + let attrs = metric_attributes(metric); + *counts.entry(attrs).or_insert(0) += 1; + } + counts +} + +fn metric_attributes(metric: FfeEvaluationMetric) -> BTreeMap { + let reason = normalize(&metric.reason, "unknown"); + let mut attrs = BTreeMap::from([ + (ATTR_FLAG_KEY.to_owned(), metric.flag_key), + (ATTR_VARIANT.to_owned(), metric.variant), + (ATTR_REASON.to_owned(), reason.clone()), + ]); + + if let Some(error_type) = metric.error_type { + if !error_type.is_empty() { + attrs.insert( + ATTR_ERROR_TYPE.to_owned(), + normalize(&error_type, "general"), + ); + } + } + + if let Some(allocation_key) = metric.allocation_key { + if !allocation_key.is_empty() + && !matches!(reason.as_str(), "error" | "default" | "disabled") + { + attrs.insert(ATTR_ALLOCATION_KEY.to_owned(), allocation_key); + } + } + + attrs +} + +fn normalize(value: &str, default: &str) -> String { + let value = value.trim(); + if value.is_empty() { + default.to_owned() + } else { + value.to_ascii_lowercase() + } +} + +fn resource(context: FfeTelemetryContext) -> Resource { + let mut attributes = vec![]; + if !context.service.is_empty() { + attributes.push(string_key_value( + ATTR_SERVICE_NAME.to_owned(), + context.service, + )); + } + if !context.env.is_empty() { + attributes.push(string_key_value(ATTR_ENV.to_owned(), context.env)); + } + if !context.version.is_empty() { + attributes.push(string_key_value(ATTR_VERSION.to_owned(), context.version)); + } + Resource { + attributes, + dropped_attributes_count: 0, + entity_refs: vec![], + } +} + +fn string_key_value(key: String, value: String) -> KeyValue { + KeyValue { + key, + value: Some(AnyValue { + value: Some(any_value::Value::StringValue(value)), + }), + key_ref: 0, + } +} + +fn unix_nano_now() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos().min(u128::from(u64::MAX)) as u64) + .unwrap_or_default() +} + +mod otlp { + use libdd_trace_protobuf::opentelemetry::proto::common::v1::{InstrumentationScope, KeyValue}; + use libdd_trace_protobuf::opentelemetry::proto::resource::v1::Resource; + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct ExportMetricsServiceRequest { + #[prost(message, repeated, tag = "1")] + pub resource_metrics: ::prost::alloc::vec::Vec, + } + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct ResourceMetrics { + #[prost(message, optional, tag = "1")] + pub resource: ::core::option::Option, + #[prost(message, repeated, tag = "2")] + pub scope_metrics: ::prost::alloc::vec::Vec, + #[prost(string, tag = "3")] + pub schema_url: ::prost::alloc::string::String, + } + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct ScopeMetrics { + #[prost(message, optional, tag = "1")] + pub scope: ::core::option::Option, + #[prost(message, repeated, tag = "2")] + pub metrics: ::prost::alloc::vec::Vec, + #[prost(string, tag = "3")] + pub schema_url: ::prost::alloc::string::String, + } + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Metric { + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub description: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub unit: ::prost::alloc::string::String, + #[prost(oneof = "metric::Data", tags = "7")] + pub data: ::core::option::Option, + } + + pub mod metric { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Data { + #[prost(message, tag = "7")] + Sum(super::Sum), + } + } + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Sum { + #[prost(message, repeated, tag = "1")] + pub data_points: ::prost::alloc::vec::Vec, + #[prost(enumeration = "AggregationTemporality", tag = "2")] + pub aggregation_temporality: i32, + #[prost(bool, tag = "3")] + pub is_monotonic: bool, + } + + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct NumberDataPoint { + #[prost(fixed64, tag = "2")] + pub start_time_unix_nano: u64, + #[prost(fixed64, tag = "3")] + pub time_unix_nano: u64, + #[prost(oneof = "number_data_point::Value", tags = "6")] + pub value: ::core::option::Option, + #[prost(message, repeated, tag = "7")] + pub attributes: ::prost::alloc::vec::Vec, + #[prost(uint32, tag = "8")] + pub flags: u32, + } + + pub mod number_data_point { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Value { + #[prost(sfixed64, tag = "6")] + AsInt(i64), + } + } + + #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] + #[repr(i32)] + pub enum AggregationTemporality { + Unspecified = 0, + Delta = 1, + Cumulative = 2, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use prost::Message; + + fn context() -> FfeTelemetryContext { + FfeTelemetryContext { + service: "svc".to_owned(), + env: "prod".to_owned(), + version: "1".to_owned(), + } + } + + fn metric(flag_key: &str, variant: &str, reason: &str) -> FfeEvaluationMetric { + FfeEvaluationMetric { + flag_key: flag_key.to_owned(), + variant: variant.to_owned(), + reason: reason.to_owned(), + error_type: None, + allocation_key: Some("alloc".to_owned()), + } + } + + #[test] + fn encodes_otlp_counter_and_aggregates_matching_attributes() { + let payload = encode_metrics_payload( + context(), + vec![ + metric("flag", "variant", "TARGETING_MATCH"), + metric("flag", "variant", "targeting_match"), + ], + ) + .unwrap(); + + let decoded = otlp::ExportMetricsServiceRequest::decode(payload.as_slice()).unwrap(); + let resource_metrics = &decoded.resource_metrics[0]; + let attrs = &resource_metrics.resource.as_ref().unwrap().attributes; + assert!(attrs.iter().any(|kv| kv.key == ATTR_SERVICE_NAME)); + + let data_points = match resource_metrics.scope_metrics[0].metrics[0] + .data + .as_ref() + .unwrap() + { + otlp::metric::Data::Sum(sum) => &sum.data_points, + }; + assert_eq!(data_points.len(), 1); + assert_eq!( + data_points[0].value, + Some(otlp::number_data_point::Value::AsInt(2)) + ); + } + + #[test] + fn excludes_allocation_key_for_error_default_and_disabled() { + for reason in ["ERROR", "DEFAULT", "DISABLED"] { + let attrs = metric_attributes(FfeEvaluationMetric { + flag_key: "flag".to_owned(), + variant: String::new(), + reason: reason.to_owned(), + error_type: Some("FLAG_NOT_FOUND".to_owned()), + allocation_key: Some("alloc".to_owned()), + }); + assert!(!attrs.contains_key(ATTR_ALLOCATION_KEY)); + assert_eq!(attrs[ATTR_ERROR_TYPE], "flag_not_found"); + } + } +} diff --git a/datadog-ffe/src/telemetry/mod.rs b/datadog-ffe/src/telemetry/mod.rs new file mode 100644 index 0000000000..d64e5d4eae --- /dev/null +++ b/datadog-ffe/src/telemetry/mod.rs @@ -0,0 +1,13 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +pub mod evaluation_metrics; + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, PartialEq, Serialize)] +pub struct FfeTelemetryContext { + pub service: String, + pub env: String, + pub version: String, +} diff --git a/datadog-sidecar/Cargo.toml b/datadog-sidecar/Cargo.toml index 345fb4db39..46954a3c91 100644 --- a/datadog-sidecar/Cargo.toml +++ b/datadog-sidecar/Cargo.toml @@ -29,6 +29,7 @@ libdd-trace-utils = { path = "../libdd-trace-utils" } libdd-trace-stats = { path = "../libdd-trace-stats", default-features=false, features = ["https"] } datadog-remote-config = { path = "../datadog-remote-config" , features = ["live-debugger"]} datadog-live-debugger = { path = "../datadog-live-debugger" } +datadog-ffe = { path = "../datadog-ffe", features = ["telemetry"] } libdd-crashtracker = { path = "../libdd-crashtracker" } libdd-dogstatsd-client = { path = "../libdd-dogstatsd-client" } libdd-tinybytes = { path = "../libdd-tinybytes" } @@ -43,12 +44,10 @@ datadog-ipc-macros = { path = "../datadog-ipc-macros" } rand = "0.8.3" rmp-serde = "1.3.0" -libdd-trace-protobuf = { path = "../libdd-trace-protobuf" } serde = { version = "1.0", features = ["derive", "rc"] } serde_with = "3.6.0" bincode = { version = "1.3.3" } serde_json = "1.0" -prost = "0.14.1" base64 = "0.22.1" spawn_worker = { path = "../spawn_worker" } zwohash = "0.1.2" diff --git a/datadog-sidecar/src/service/ffe_metrics_flusher.rs b/datadog-sidecar/src/service/ffe_metrics_flusher.rs index 8d5e1524cb..d71bc17ac6 100644 --- a/datadog-sidecar/src/service/ffe_metrics_flusher.rs +++ b/datadog-sidecar/src/service/ffe_metrics_flusher.rs @@ -5,35 +5,15 @@ //! events to a user-configured OTLP HTTP metrics intake. use crate::service::{FfeEvaluationMetric, FfeTelemetryContext}; +use datadog_ffe::telemetry::evaluation_metrics::encode_metrics_payload; use http::Method; use libdd_capabilities::{Bytes, HttpClientCapability, SleepCapability}; use libdd_common::Endpoint; -use libdd_trace_protobuf::opentelemetry::proto::common::v1::any_value; -use libdd_trace_protobuf::opentelemetry::proto::common::v1::{ - AnyValue, InstrumentationScope, KeyValue, -}; -use libdd_trace_protobuf::opentelemetry::proto::resource::v1::Resource; -use prost::Message; -use std::collections::BTreeMap; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::Duration; use tracing::{debug, warn}; const USER_AGENT: &str = concat!("ddtrace-sidecar/", env!("CARGO_PKG_VERSION")); -const METER_NAME: &str = "ddtrace.openfeature"; -const METRIC_NAME: &str = "feature_flag.evaluations"; -const METRIC_UNIT: &str = "{evaluation}"; -const METRIC_DESCRIPTION: &str = "Number of feature flag evaluations"; - -const ATTR_SERVICE_NAME: &str = "service.name"; -const ATTR_ENV: &str = "deployment.environment.name"; -const ATTR_VERSION: &str = "service.version"; -const ATTR_FLAG_KEY: &str = "feature_flag.key"; -const ATTR_VARIANT: &str = "feature_flag.result.variant"; -const ATTR_REASON: &str = "feature_flag.result.reason"; -const ATTR_ERROR_TYPE: &str = "error.type"; -const ATTR_ALLOCATION_KEY: &str = "feature_flag.result.allocation_key"; - /// Build an `Endpoint` for an OTLP metrics intake from a fully-qualified URL. /// /// Production callers supply the URL via the FFI (typically the value of @@ -49,62 +29,6 @@ pub(crate) fn otlp_metrics_endpoint(url: &str) -> Option { }) } -pub(crate) fn encode_metrics_payload( - context: FfeTelemetryContext, - metrics: Vec, -) -> Option> { - if metrics.is_empty() { - return None; - } - - let now = unix_nano_now(); - let data_points = aggregate(metrics) - .into_iter() - .map(|(attributes, count)| otlp::NumberDataPoint { - attributes: attributes - .into_iter() - .map(|(key, value)| string_key_value(key, value)) - .collect(), - start_time_unix_nano: now, - time_unix_nano: now, - value: Some(otlp::number_data_point::Value::AsInt(count)), - flags: 0, - }) - .collect::>(); - - if data_points.is_empty() { - return None; - } - - let request = otlp::ExportMetricsServiceRequest { - resource_metrics: vec![otlp::ResourceMetrics { - resource: Some(resource(context)), - scope_metrics: vec![otlp::ScopeMetrics { - scope: Some(InstrumentationScope { - name: METER_NAME.to_owned(), - version: String::new(), - attributes: vec![], - dropped_attributes_count: 0, - }), - metrics: vec![otlp::Metric { - name: METRIC_NAME.to_owned(), - description: METRIC_DESCRIPTION.to_owned(), - unit: METRIC_UNIT.to_owned(), - data: Some(otlp::metric::Data::Sum(otlp::Sum { - data_points, - aggregation_temporality: otlp::AggregationTemporality::Delta as i32, - is_monotonic: true, - })), - }], - schema_url: String::new(), - }], - schema_url: String::new(), - }], - }; - - Some(request.encode_to_vec()) -} - /// POST structured FFE metric events as OTLP/protobuf to the configured intake. /// Fire-and-forget: non-2xx responses and network errors are logged and /// dropped (matches dd-trace-go/py OTLP exporter behavior). @@ -171,189 +95,11 @@ async fn send_payload( } } -fn aggregate(metrics: Vec) -> BTreeMap, i64> { - let mut counts = BTreeMap::new(); - for metric in metrics { - if metric.flag_key.is_empty() { - continue; - } - let attrs = metric_attributes(metric); - *counts.entry(attrs).or_insert(0) += 1; - } - counts -} - -fn metric_attributes(metric: FfeEvaluationMetric) -> BTreeMap { - let reason = normalize(&metric.reason, "unknown"); - let mut attrs = BTreeMap::from([ - (ATTR_FLAG_KEY.to_owned(), metric.flag_key), - (ATTR_VARIANT.to_owned(), metric.variant), - (ATTR_REASON.to_owned(), reason.clone()), - ]); - - if let Some(error_type) = metric.error_type { - if !error_type.is_empty() { - attrs.insert( - ATTR_ERROR_TYPE.to_owned(), - normalize(&error_type, "general"), - ); - } - } - - if let Some(allocation_key) = metric.allocation_key { - if !allocation_key.is_empty() - && !matches!(reason.as_str(), "error" | "default" | "disabled") - { - attrs.insert(ATTR_ALLOCATION_KEY.to_owned(), allocation_key); - } - } - - attrs -} - -fn normalize(value: &str, default: &str) -> String { - let value = value.trim(); - if value.is_empty() { - default.to_owned() - } else { - value.to_ascii_lowercase() - } -} - -fn resource(context: FfeTelemetryContext) -> Resource { - let mut attributes = vec![]; - if !context.service.is_empty() { - attributes.push(string_key_value( - ATTR_SERVICE_NAME.to_owned(), - context.service, - )); - } - if !context.env.is_empty() { - attributes.push(string_key_value(ATTR_ENV.to_owned(), context.env)); - } - if !context.version.is_empty() { - attributes.push(string_key_value(ATTR_VERSION.to_owned(), context.version)); - } - Resource { - attributes, - dropped_attributes_count: 0, - entity_refs: vec![], - } -} - -fn string_key_value(key: String, value: String) -> KeyValue { - KeyValue { - key, - value: Some(AnyValue { - value: Some(any_value::Value::StringValue(value)), - }), - key_ref: 0, - } -} - -fn unix_nano_now() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_nanos().min(u128::from(u64::MAX)) as u64) - .unwrap_or_default() -} - fn truncate(bytes: &[u8], cap: usize) -> String { let take = bytes.len().min(cap); String::from_utf8_lossy(&bytes[..take]).into_owned() } -mod otlp { - use libdd_trace_protobuf::opentelemetry::proto::common::v1::{InstrumentationScope, KeyValue}; - use libdd_trace_protobuf::opentelemetry::proto::resource::v1::Resource; - - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct ExportMetricsServiceRequest { - #[prost(message, repeated, tag = "1")] - pub resource_metrics: ::prost::alloc::vec::Vec, - } - - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct ResourceMetrics { - #[prost(message, optional, tag = "1")] - pub resource: ::core::option::Option, - #[prost(message, repeated, tag = "2")] - pub scope_metrics: ::prost::alloc::vec::Vec, - #[prost(string, tag = "3")] - pub schema_url: ::prost::alloc::string::String, - } - - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct ScopeMetrics { - #[prost(message, optional, tag = "1")] - pub scope: ::core::option::Option, - #[prost(message, repeated, tag = "2")] - pub metrics: ::prost::alloc::vec::Vec, - #[prost(string, tag = "3")] - pub schema_url: ::prost::alloc::string::String, - } - - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct Metric { - #[prost(string, tag = "1")] - pub name: ::prost::alloc::string::String, - #[prost(string, tag = "2")] - pub description: ::prost::alloc::string::String, - #[prost(string, tag = "3")] - pub unit: ::prost::alloc::string::String, - #[prost(oneof = "metric::Data", tags = "7")] - pub data: ::core::option::Option, - } - - pub mod metric { - #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum Data { - #[prost(message, tag = "7")] - Sum(super::Sum), - } - } - - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct Sum { - #[prost(message, repeated, tag = "1")] - pub data_points: ::prost::alloc::vec::Vec, - #[prost(enumeration = "AggregationTemporality", tag = "2")] - pub aggregation_temporality: i32, - #[prost(bool, tag = "3")] - pub is_monotonic: bool, - } - - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct NumberDataPoint { - #[prost(fixed64, tag = "2")] - pub start_time_unix_nano: u64, - #[prost(fixed64, tag = "3")] - pub time_unix_nano: u64, - #[prost(oneof = "number_data_point::Value", tags = "6")] - pub value: ::core::option::Option, - #[prost(message, repeated, tag = "7")] - pub attributes: ::prost::alloc::vec::Vec, - #[prost(uint32, tag = "8")] - pub flags: u32, - } - - pub mod number_data_point { - #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum Value { - #[prost(sfixed64, tag = "6")] - AsInt(i64), - } - } - - #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] - #[repr(i32)] - pub enum AggregationTemporality { - Unspecified = 0, - Delta = 1, - Cumulative = 2, - } -} - #[cfg(test)] mod tests { use super::*; @@ -380,51 +126,6 @@ mod tests { } } - #[test] - fn encodes_otlp_counter_and_aggregates_matching_attributes() { - let payload = encode_metrics_payload( - context(), - vec![ - metric("flag", "variant", "TARGETING_MATCH"), - metric("flag", "variant", "targeting_match"), - ], - ) - .unwrap(); - - let decoded = otlp::ExportMetricsServiceRequest::decode(payload.as_slice()).unwrap(); - let resource_metrics = &decoded.resource_metrics[0]; - let attrs = &resource_metrics.resource.as_ref().unwrap().attributes; - assert!(attrs.iter().any(|kv| kv.key == ATTR_SERVICE_NAME)); - - let data_points = match resource_metrics.scope_metrics[0].metrics[0] - .data - .as_ref() - .unwrap() - { - otlp::metric::Data::Sum(sum) => &sum.data_points, - }; - assert_eq!(data_points.len(), 1); - assert_eq!( - data_points[0].value, - Some(otlp::number_data_point::Value::AsInt(2)) - ); - } - - #[test] - fn excludes_allocation_key_for_error_default_and_disabled() { - for reason in ["ERROR", "DEFAULT", "DISABLED"] { - let attrs = metric_attributes(FfeEvaluationMetric { - flag_key: "flag".to_owned(), - variant: String::new(), - reason: reason.to_owned(), - error_type: Some("FLAG_NOT_FOUND".to_owned()), - allocation_key: Some("alloc".to_owned()), - }); - assert!(!attrs.contains_key(ATTR_ALLOCATION_KEY)); - assert_eq!(attrs[ATTR_ERROR_TYPE], "flag_not_found"); - } - } - /// POST hits the configured OTLP metrics path with application/x-protobuf. #[tokio::test] #[cfg_attr(miri, ignore)] diff --git a/datadog-sidecar/src/service/mod.rs b/datadog-sidecar/src/service/mod.rs index a5beff9f3a..e88a190ef1 100644 --- a/datadog-sidecar/src/service/mod.rs +++ b/datadog-sidecar/src/service/mod.rs @@ -3,6 +3,8 @@ // imports for structs defined in this file use crate::config; +pub use datadog_ffe::telemetry::evaluation_metrics::FfeEvaluationMetric; +pub use datadog_ffe::telemetry::FfeTelemetryContext; use datadog_remote_config::{RemoteConfigCapabilities, RemoteConfigProduct}; use libdd_common::tag::Tag; use libdd_common::Endpoint; @@ -92,19 +94,3 @@ pub enum SidecarAction { metrics: Vec, }, } - -#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, PartialEq, Serialize)] -pub struct FfeTelemetryContext { - pub service: String, - pub env: String, - pub version: String, -} - -#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] -pub struct FfeEvaluationMetric { - pub flag_key: String, - pub variant: String, - pub reason: String, - pub error_type: Option, - pub allocation_key: Option, -} From 96d9a7bae105449f16da96554b3a2e989b096027 Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Thu, 28 May 2026 17:11:46 -0400 Subject: [PATCH 3/3] Rename FFE metric feature gate --- datadog-ffe/Cargo.toml | 2 +- datadog-ffe/src/lib.rs | 2 +- datadog-sidecar/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datadog-ffe/Cargo.toml b/datadog-ffe/Cargo.toml index 074c741969..743cc59915 100644 --- a/datadog-ffe/Cargo.toml +++ b/datadog-ffe/Cargo.toml @@ -29,5 +29,5 @@ prost = { version = "0.14.1", optional = true } pyo3 = { version = "0.28", optional = true, default-features = false, features = ["macros"] } [features] -telemetry = ["dep:libdd-trace-protobuf", "dep:prost"] +evaluation-metrics = ["dep:libdd-trace-protobuf", "dep:prost"] pyo3 = ["dep:pyo3"] diff --git a/datadog-ffe/src/lib.rs b/datadog-ffe/src/lib.rs index 3cd763d982..8cc4a062df 100644 --- a/datadog-ffe/src/lib.rs +++ b/datadog-ffe/src/lib.rs @@ -4,7 +4,7 @@ mod flag_type; pub mod rules_based; -#[cfg(feature = "telemetry")] +#[cfg(feature = "evaluation-metrics")] pub mod telemetry; pub use flag_type::{ExpectedFlagType, FlagType}; diff --git a/datadog-sidecar/Cargo.toml b/datadog-sidecar/Cargo.toml index 46954a3c91..48e83cc312 100644 --- a/datadog-sidecar/Cargo.toml +++ b/datadog-sidecar/Cargo.toml @@ -29,7 +29,7 @@ libdd-trace-utils = { path = "../libdd-trace-utils" } libdd-trace-stats = { path = "../libdd-trace-stats", default-features=false, features = ["https"] } datadog-remote-config = { path = "../datadog-remote-config" , features = ["live-debugger"]} datadog-live-debugger = { path = "../datadog-live-debugger" } -datadog-ffe = { path = "../datadog-ffe", features = ["telemetry"] } +datadog-ffe = { path = "../datadog-ffe", features = ["evaluation-metrics"] } libdd-crashtracker = { path = "../libdd-crashtracker" } libdd-dogstatsd-client = { path = "../libdd-dogstatsd-client" } libdd-tinybytes = { path = "../libdd-tinybytes" }