From 40bd7a03cb55f49751ad6a8ffe245fac828151c2 Mon Sep 17 00:00:00 2001 From: Bryan Bednarski Date: Fri, 29 May 2026 14:48:20 -0700 Subject: [PATCH] fix: repair Hermes gateway session fallback (#189) #### Overview Fixes the Hermes gateway session fallback and tightens ATIF LLM dedupe so complementary hook/gateway spans are only collapsed when they represent the same physical request. - [x] I confirm this contribution is my own work, or I have the right to submit it under this project's license. - [x] I searched existing issues and open pull requests, and this does not duplicate existing work. #### Details - Uses the OpenAI-compatible request body session_id as a gateway fallback when explicit session headers are absent. - Keeps the existing explicit Claude/Codex session fallbacks ahead of the OpenAI body fallback. - Requires complementary hook/gateway LLM spans to share a request signature or strong request correlation key before ATIF dedupes them. - Adds regression coverage for gateway fallback selection and concurrent overlapping LLM spans that should remain distinct. #### Where should the reviewer start? Start with `crates/cli/src/alignment/mod.rs` for the gateway fallback behavior, then review `crates/core/src/observability/atif.rs` for the strengthened complementary hook/gateway dedupe guard. The focused regression tests are in `crates/cli/tests/coverage/alignment_tests.rs`, `crates/cli/tests/coverage/gateway_tests.rs`, and `crates/core/tests/unit/atif_tests.rs`. #### Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to) - Closes #176 ## Summary by CodeRabbit * **Bug Fixes** * Session ID resolution enhanced to properly support OpenAI-compatible API request formats, including additional fallback to request body identifiers * LLM span correlation and deduplication logic improved with request-level identifier matching, enabling more accurate observability tracking and better event correlation for request tracing [![Review Change Stack](https://storage.googleapis.com/coderabbit_public_assets/review-stack-in-coderabbit-ui.svg)](https://app.coderabbit.ai/change-stack/NVIDIA/NeMo-Relay/pull/189?utm_source=github_walkthrough&utm_medium=github&utm_campaign=change_stack) Authors: - Bryan Bednarski (https://github.com/bbednarski9) Approvers: - Will Killian (https://github.com/willkill07) URL: https://github.com/NVIDIA/NeMo-Relay/pull/189 --- crates/cli/src/alignment/mod.rs | 22 +++- crates/cli/tests/coverage/alignment_tests.rs | 42 ++++++- crates/cli/tests/coverage/gateway_tests.rs | 21 +++- crates/core/src/observability/atif.rs | 73 +++++++++++- crates/core/tests/unit/atif_tests.rs | 112 ++++++++++++++++++- 5 files changed, 261 insertions(+), 9 deletions(-) diff --git a/crates/cli/src/alignment/mod.rs b/crates/cli/src/alignment/mod.rs index 3c582525..48e5f2f1 100644 --- a/crates/cli/src/alignment/mod.rs +++ b/crates/cli/src/alignment/mod.rs @@ -245,9 +245,10 @@ impl SessionAlignmentState { } // Resolves the session id for a gateway request in precedence order: -// explicit NeMo Relay header, agent-native headers, then agent-specific body fallbacks. Keeping the -// provider fallbacks behind one function makes a new agent integration add one small alignment -// adapter instead of threading bespoke checks through gateway request construction. +// explicit NeMo Relay header, agent-native headers, agent-specific body fallbacks, then the +// generic OpenAI-compatible `session_id` body field. Keeping the provider fallbacks behind one +// function makes a new agent integration add one small alignment adapter instead of threading +// bespoke checks through gateway request construction. pub(crate) fn gateway_session_id( headers: &HeaderMap, body: &Value, @@ -256,6 +257,21 @@ pub(crate) fn gateway_session_id( header_string(headers, "x-nemo-relay-session-id") .or_else(|| claude_code::session_id_from_headers(headers)) .or_else(|| codex::prompt_cache_session_id(body, route)) + .or_else(|| openai_body_session_id(body, route)) +} + +fn openai_body_session_id(body: &Value, route: GatewayRouteKind) -> Option { + if !matches!( + route, + GatewayRouteKind::OpenAiChatCompletions | GatewayRouteKind::OpenAiResponses + ) { + return None; + } + body.get("session_id") + .and_then(Value::as_str) + .map(str::trim) + .filter(|session_id| !session_id.is_empty()) + .map(ToOwned::to_owned) } // Gives provider adapters a chance to select an agent-native upstream before the gateway falls diff --git a/crates/cli/tests/coverage/alignment_tests.rs b/crates/cli/tests/coverage/alignment_tests.rs index c8077f50..8806e0dd 100644 --- a/crates/cli/tests/coverage/alignment_tests.rs +++ b/crates/cli/tests/coverage/alignment_tests.rs @@ -92,7 +92,8 @@ fn gateway_session_id_uses_explicit_claude_then_codex_fallbacks() { let mut headers = HeaderMap::new(); let codex_body = json!({ "prompt_cache_key": "codex-thread", - "client_metadata": { "x-codex-installation-id": "install-1" } + "client_metadata": { "x-codex-installation-id": "install-1" }, + "session_id": "body-thread" }); assert_eq!( @@ -119,6 +120,45 @@ fn gateway_session_id_uses_explicit_claude_then_codex_fallbacks() { ); } +#[test] +fn gateway_session_id_accepts_openai_body_session_id_fallback() { + let headers = HeaderMap::new(); + + assert_eq!( + gateway_session_id( + &headers, + &json!({ "session_id": " body-session " }), + GatewayRouteKind::OpenAiChatCompletions, + ) + .as_deref(), + Some("body-session") + ); + assert_eq!( + gateway_session_id( + &headers, + &json!({ "session_id": "body-session" }), + GatewayRouteKind::AnthropicMessages, + ), + None + ); + assert_eq!( + gateway_session_id( + &headers, + &json!({ "session_id": "" }), + GatewayRouteKind::OpenAiChatCompletions, + ), + None + ); + assert_eq!( + gateway_session_id( + &headers, + &json!({ "session_id": 42 }), + GatewayRouteKind::OpenAiResponses, + ), + None + ); +} + #[test] fn gateway_subagent_and_identifier_helpers_respect_header_precedence() { let mut headers = HeaderMap::new(); diff --git a/crates/cli/tests/coverage/gateway_tests.rs b/crates/cli/tests/coverage/gateway_tests.rs index 6aa6742a..748b389d 100644 --- a/crates/cli/tests/coverage/gateway_tests.rs +++ b/crates/cli/tests/coverage/gateway_tests.rs @@ -265,7 +265,8 @@ fn gateway_session_id_prefers_headers_and_has_fallbacks() { let mut headers = HeaderMap::new(); let codex_body = json!({ "prompt_cache_key": "codex-session", - "client_metadata": { "x-codex-installation-id": "install-1" } + "client_metadata": { "x-codex-installation-id": "install-1" }, + "session_id": "body-session" }); headers.insert( "anthropic-beta", @@ -316,6 +317,24 @@ fn gateway_session_id_prefers_headers_and_has_fallbacks() { &HeaderMap::new(), &codex_body, ProviderRoute::OpenAiChatCompletions, + ) + .as_deref(), + Some("body-session") + ); + assert_eq!( + gateway_session_id( + &HeaderMap::new(), + &json!({ "session_id": " body-session " }), + ProviderRoute::OpenAiResponses, + ) + .as_deref(), + Some("body-session") + ); + assert_eq!( + gateway_session_id( + &HeaderMap::new(), + &json!({ "session_id": "body-session" }), + ProviderRoute::AnthropicMessages, ), None ); diff --git a/crates/core/src/observability/atif.rs b/crates/core/src/observability/atif.rs index f5801551..96674db1 100644 --- a/crates/core/src/observability/atif.rs +++ b/crates/core/src/observability/atif.rs @@ -1194,6 +1194,7 @@ struct LlmSpanCandidate { start_ts: DateTime, end_ts: DateTime, request_signature: String, + request_correlation_keys: HashSet, response_signature: String, model_name: Option, fidelity_score: u8, @@ -1248,6 +1249,7 @@ impl LlmSpanCandidate { start_ts: *start.timestamp(), end_ts: *end.timestamp(), request_signature, + request_correlation_keys: llm_request_correlation_keys(start, end), response_signature, model_name: start .model_name() @@ -1274,6 +1276,61 @@ fn llm_response_signature(output: &Json) -> String { json_to_string(&extract_llm_response_message(output)) } +fn llm_request_correlation_keys(start: &Event, end: &Event) -> HashSet { + let mut keys = HashSet::new(); + collect_llm_request_correlation_keys(start, &mut keys); + collect_llm_request_correlation_keys(end, &mut keys); + keys +} + +fn collect_llm_request_correlation_keys(event: &Event, keys: &mut HashSet) { + if let Some(metadata) = event.metadata() { + collect_request_correlation_values(metadata, keys); + } + if let Some(data) = event.data() { + collect_request_correlation_values(data, keys); + collect_request_correlation_values(&unwrap_llm_request(data), keys); + } +} + +fn collect_request_correlation_values(value: &Json, keys: &mut HashSet) { + for path in [ + &["api_call_id"][..], + &["apiCallId"], + &["request_id"], + &["requestId"], + &["request", "id"], + &["metadata", "request_id"], + &["metadata", "requestId"], + &["extra", "api_call_id"], + &["extra", "apiCallId"], + &["extra", "request_id"], + &["extra", "requestId"], + &["llm_correlation_request_id"], + ] { + insert_correlation_key(keys, "request", json_string_at(value, path)); + } + + for path in [ + &["generation_id"][..], + &["generationId"], + &["generation", "id"], + &["metadata", "generation_id"], + &["metadata", "generationId"], + &["extra", "generation_id"], + &["extra", "generationId"], + &["llm_correlation_generation_id"], + ] { + insert_correlation_key(keys, "generation", json_string_at(value, path)); + } +} + +fn insert_correlation_key(keys: &mut HashSet, kind: &str, value: Option) { + if let Some(value) = value.filter(|value| !value.is_empty()) { + keys.insert(format!("{kind}:{value}")); + } +} + fn same_physical_llm_request(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool { same_parent(left, right) && compatible_model_names(left, right) @@ -1288,10 +1345,22 @@ fn same_llm_payload_signatures(left: &LlmSpanCandidate, right: &LlmSpanCandidate } fn complementary_hook_and_gateway_spans(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool { - (left.non_exact_provider_payload && left.hook_instrumentation && right.gateway_instrumentation) + let complementary_polarity = (left.non_exact_provider_payload + && left.hook_instrumentation + && right.gateway_instrumentation) || (right.non_exact_provider_payload && right.hook_instrumentation - && left.gateway_instrumentation) + && left.gateway_instrumentation); + + complementary_polarity + && (left.request_signature == right.request_signature + || shared_llm_request_correlation_key(left, right)) +} + +fn shared_llm_request_correlation_key(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool { + !left + .request_correlation_keys + .is_disjoint(&right.request_correlation_keys) } fn same_parent(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool { diff --git a/crates/core/tests/unit/atif_tests.rs b/crates/core/tests/unit/atif_tests.rs index c42d4333..a59048e6 100644 --- a/crates/core/tests/unit/atif_tests.rs +++ b/crates/core/tests/unit/atif_tests.rs @@ -1935,6 +1935,7 @@ fn test_exporter_prefers_gateway_span_over_non_exact_hook_summary() { .model_name("test-model") .metadata(json!({ "hook_event_name": "pre_api_request", + "api_call_id": "request-1", "fidelity_source": "agent_api_hooks", "provider_payload_exact": false })) @@ -1948,7 +1949,10 @@ fn test_exporter_prefers_gateway_span_over_non_exact_hook_summary() { .parent_uuid(parent_uuid) .scope_type(ScopeType::Llm) .model_name("test-model") - .metadata(json!({"gateway_path": "/v1/chat/completions"})) + .metadata(json!({ + "gateway_path": "/v1/chat/completions", + "llm_correlation_request_id": "request-1" + })) .input(json!({"content": request.clone(), "headers": {}})) .build(); let mut gateway_end = event_builder(gateway_uuid, EventType::End) @@ -1956,7 +1960,10 @@ fn test_exporter_prefers_gateway_span_over_non_exact_hook_summary() { .parent_uuid(parent_uuid) .scope_type(ScopeType::Llm) .model_name("test-model") - .metadata(json!({"gateway_path": "/v1/chat/completions"})) + .metadata(json!({ + "gateway_path": "/v1/chat/completions", + "llm_correlation_request_id": "request-1" + })) .output(json!({"choices": [{"message": {"content": "gateway_ok"}}]})) .build(); let mut hook_end = event_builder(hook_uuid, EventType::End) @@ -1966,6 +1973,7 @@ fn test_exporter_prefers_gateway_span_over_non_exact_hook_summary() { .model_name("test-model") .metadata(json!({ "hook_event_name": "post_api_request", + "api_call_id": "request-1", "fidelity_source": "agent_api_hooks", "provider_payload_exact": false })) @@ -2005,6 +2013,106 @@ fn test_exporter_prefers_gateway_span_over_non_exact_hook_summary() { })); } +#[test] +fn test_exporter_keeps_overlapping_non_exact_hook_and_gateway_spans_without_shared_request_key() { + let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); + let base = base_timestamp(); + let parent_uuid = Uuid::now_v7(); + let hook_uuid = Uuid::now_v7(); + let gateway_uuid = Uuid::now_v7(); + let request = json!({ + "messages": [{"role": "user", "content": "Reply with exactly gateway_distinct_ok"}], + "model": "test-model" + }); + + let mut hook_start = event_builder(hook_uuid, EventType::Start) + .name("openrouter") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .metadata(json!({ + "hook_event_name": "pre_api_request", + "api_call_id": "hook-request", + "fidelity_source": "agent_api_hooks", + "provider_payload_exact": false + })) + .input(json!({ + "content": {"message_count": 2, "request_char_count": 128}, + "headers": {} + })) + .build(); + let mut gateway_start = event_builder(gateway_uuid, EventType::Start) + .name("openai.chat_completions") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .metadata(json!({ + "gateway_path": "/v1/chat/completions", + "llm_correlation_request_id": "gateway-request" + })) + .input(json!({"content": request.clone(), "headers": {}})) + .build(); + let mut gateway_end = event_builder(gateway_uuid, EventType::End) + .name("openai.chat_completions") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .metadata(json!({ + "gateway_path": "/v1/chat/completions", + "llm_correlation_request_id": "gateway-request" + })) + .output(json!({"choices": [{"message": {"content": "gateway_distinct_ok"}}]})) + .build(); + let mut hook_end = event_builder(hook_uuid, EventType::End) + .name("openrouter") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .metadata(json!({ + "hook_event_name": "post_api_request", + "api_call_id": "hook-request", + "fidelity_source": "agent_api_hooks", + "provider_payload_exact": false + })) + .output(json!({"assistant_content_chars": 10, "finish_reason": "stop"})) + .build(); + + for (idx, event) in [ + &mut hook_start, + &mut gateway_start, + &mut gateway_end, + &mut hook_end, + ] + .into_iter() + .enumerate() + { + set_event_timestamp(event, base + chrono::Duration::milliseconds(idx as i64)); + } + + { + let mut state = exporter.state.lock().unwrap(); + state + .events + .extend([hook_start, gateway_start, gateway_end, hook_end]); + } + + let trajectory = exporter.export().unwrap(); + assert_eq!(trajectory.steps.len(), 4); + let function_name_count = |name: &str| { + trajectory + .steps + .iter() + .filter(|step| { + step.extra + .as_ref() + .is_some_and(|extra| extra["ancestry"]["function_name"] == name) + }) + .count() + }; + assert!(function_name_count("openrouter") > 0); + assert!(function_name_count("openai.chat_completions") > 0); +} + #[test] fn test_exporter_keeps_sequential_same_content_llm_spans() { let exporter = AtifExporter::new("session-1".to_string(), make_agent_info());