Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions crates/cli/src/alignment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,10 @@ impl SessionAlignmentState {
}

// Resolves the session id for a gateway request in precedence order:
// explicit NeMo Relay header, agent-native headers, then agent-specific body fallbacks. Keeping the
// provider fallbacks behind one function makes a new agent integration add one small alignment
// adapter instead of threading bespoke checks through gateway request construction.
// explicit NeMo Relay header, agent-native headers, agent-specific body fallbacks, then the
// generic OpenAI-compatible `session_id` body field. Keeping the provider fallbacks behind one
// function makes a new agent integration add one small alignment adapter instead of threading
// bespoke checks through gateway request construction.
pub(crate) fn gateway_session_id(
headers: &HeaderMap,
body: &Value,
Expand All @@ -256,6 +257,21 @@ pub(crate) fn gateway_session_id(
header_string(headers, "x-nemo-relay-session-id")
.or_else(|| claude_code::session_id_from_headers(headers))
.or_else(|| codex::prompt_cache_session_id(body, route))
.or_else(|| openai_body_session_id(body, route))
}

fn openai_body_session_id(body: &Value, route: GatewayRouteKind) -> Option<String> {
if !matches!(
route,
GatewayRouteKind::OpenAiChatCompletions | GatewayRouteKind::OpenAiResponses
) {
return None;
}
body.get("session_id")
.and_then(Value::as_str)
.map(str::trim)
.filter(|session_id| !session_id.is_empty())
.map(ToOwned::to_owned)
}

// Gives provider adapters a chance to select an agent-native upstream before the gateway falls
Expand Down
42 changes: 41 additions & 1 deletion crates/cli/tests/coverage/alignment_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ fn gateway_session_id_uses_explicit_claude_then_codex_fallbacks() {
let mut headers = HeaderMap::new();
let codex_body = json!({
"prompt_cache_key": "codex-thread",
"client_metadata": { "x-codex-installation-id": "install-1" }
"client_metadata": { "x-codex-installation-id": "install-1" },
"session_id": "body-thread"
});

assert_eq!(
Expand All @@ -119,6 +120,45 @@ fn gateway_session_id_uses_explicit_claude_then_codex_fallbacks() {
);
}

#[test]
fn gateway_session_id_accepts_openai_body_session_id_fallback() {
let headers = HeaderMap::new();

assert_eq!(
gateway_session_id(
&headers,
&json!({ "session_id": " body-session " }),
GatewayRouteKind::OpenAiChatCompletions,
)
.as_deref(),
Some("body-session")
);
assert_eq!(
gateway_session_id(
&headers,
&json!({ "session_id": "body-session" }),
GatewayRouteKind::AnthropicMessages,
),
None
);
assert_eq!(
gateway_session_id(
&headers,
&json!({ "session_id": "" }),
GatewayRouteKind::OpenAiChatCompletions,
),
None
);
assert_eq!(
gateway_session_id(
&headers,
&json!({ "session_id": 42 }),
GatewayRouteKind::OpenAiResponses,
),
None
);
}

#[test]
fn gateway_subagent_and_identifier_helpers_respect_header_precedence() {
let mut headers = HeaderMap::new();
Expand Down
21 changes: 20 additions & 1 deletion crates/cli/tests/coverage/gateway_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,8 @@ fn gateway_session_id_prefers_headers_and_has_fallbacks() {
let mut headers = HeaderMap::new();
let codex_body = json!({
"prompt_cache_key": "codex-session",
"client_metadata": { "x-codex-installation-id": "install-1" }
"client_metadata": { "x-codex-installation-id": "install-1" },
"session_id": "body-session"
});
headers.insert(
"anthropic-beta",
Expand Down Expand Up @@ -316,6 +317,24 @@ fn gateway_session_id_prefers_headers_and_has_fallbacks() {
&HeaderMap::new(),
&codex_body,
ProviderRoute::OpenAiChatCompletions,
)
.as_deref(),
Some("body-session")
);
assert_eq!(
gateway_session_id(
&HeaderMap::new(),
&json!({ "session_id": " body-session " }),
ProviderRoute::OpenAiResponses,
)
.as_deref(),
Some("body-session")
);
assert_eq!(
gateway_session_id(
&HeaderMap::new(),
&json!({ "session_id": "body-session" }),
ProviderRoute::AnthropicMessages,
),
None
);
Expand Down
73 changes: 71 additions & 2 deletions crates/core/src/observability/atif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1194,6 +1194,7 @@ struct LlmSpanCandidate {
start_ts: DateTime<Utc>,
end_ts: DateTime<Utc>,
request_signature: String,
request_correlation_keys: HashSet<String>,
response_signature: String,
model_name: Option<String>,
fidelity_score: u8,
Expand Down Expand Up @@ -1248,6 +1249,7 @@ impl LlmSpanCandidate {
start_ts: *start.timestamp(),
end_ts: *end.timestamp(),
request_signature,
request_correlation_keys: llm_request_correlation_keys(start, end),
response_signature,
model_name: start
.model_name()
Expand All @@ -1274,6 +1276,61 @@ fn llm_response_signature(output: &Json) -> String {
json_to_string(&extract_llm_response_message(output))
}

fn llm_request_correlation_keys(start: &Event, end: &Event) -> HashSet<String> {
let mut keys = HashSet::new();
collect_llm_request_correlation_keys(start, &mut keys);
collect_llm_request_correlation_keys(end, &mut keys);
keys
}

fn collect_llm_request_correlation_keys(event: &Event, keys: &mut HashSet<String>) {
if let Some(metadata) = event.metadata() {
collect_request_correlation_values(metadata, keys);
}
if let Some(data) = event.data() {
collect_request_correlation_values(data, keys);
collect_request_correlation_values(&unwrap_llm_request(data), keys);
}
}

fn collect_request_correlation_values(value: &Json, keys: &mut HashSet<String>) {
for path in [
&["api_call_id"][..],
&["apiCallId"],
&["request_id"],
&["requestId"],
&["request", "id"],
&["metadata", "request_id"],
&["metadata", "requestId"],
&["extra", "api_call_id"],
&["extra", "apiCallId"],
&["extra", "request_id"],
&["extra", "requestId"],
&["llm_correlation_request_id"],
] {
insert_correlation_key(keys, "request", json_string_at(value, path));
}

for path in [
&["generation_id"][..],
&["generationId"],
&["generation", "id"],
&["metadata", "generation_id"],
&["metadata", "generationId"],
&["extra", "generation_id"],
&["extra", "generationId"],
&["llm_correlation_generation_id"],
] {
insert_correlation_key(keys, "generation", json_string_at(value, path));
}
}

fn insert_correlation_key(keys: &mut HashSet<String>, kind: &str, value: Option<String>) {
if let Some(value) = value.filter(|value| !value.is_empty()) {
keys.insert(format!("{kind}:{value}"));
}
}

fn same_physical_llm_request(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool {
same_parent(left, right)
&& compatible_model_names(left, right)
Expand All @@ -1288,10 +1345,22 @@ fn same_llm_payload_signatures(left: &LlmSpanCandidate, right: &LlmSpanCandidate
}

fn complementary_hook_and_gateway_spans(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool {
(left.non_exact_provider_payload && left.hook_instrumentation && right.gateway_instrumentation)
let complementary_polarity = (left.non_exact_provider_payload
&& left.hook_instrumentation
&& right.gateway_instrumentation)
|| (right.non_exact_provider_payload
&& right.hook_instrumentation
&& left.gateway_instrumentation)
&& left.gateway_instrumentation);

complementary_polarity
&& (left.request_signature == right.request_signature
|| shared_llm_request_correlation_key(left, right))
}

fn shared_llm_request_correlation_key(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool {
!left
.request_correlation_keys
.is_disjoint(&right.request_correlation_keys)
}

fn same_parent(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool {
Expand Down
112 changes: 110 additions & 2 deletions crates/core/tests/unit/atif_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1935,6 +1935,7 @@ fn test_exporter_prefers_gateway_span_over_non_exact_hook_summary() {
.model_name("test-model")
.metadata(json!({
"hook_event_name": "pre_api_request",
"api_call_id": "request-1",
"fidelity_source": "agent_api_hooks",
"provider_payload_exact": false
}))
Expand All @@ -1948,15 +1949,21 @@ fn test_exporter_prefers_gateway_span_over_non_exact_hook_summary() {
.parent_uuid(parent_uuid)
.scope_type(ScopeType::Llm)
.model_name("test-model")
.metadata(json!({"gateway_path": "/v1/chat/completions"}))
.metadata(json!({
"gateway_path": "/v1/chat/completions",
"llm_correlation_request_id": "request-1"
}))
.input(json!({"content": request.clone(), "headers": {}}))
.build();
let mut gateway_end = event_builder(gateway_uuid, EventType::End)
.name("openai.chat_completions")
.parent_uuid(parent_uuid)
.scope_type(ScopeType::Llm)
.model_name("test-model")
.metadata(json!({"gateway_path": "/v1/chat/completions"}))
.metadata(json!({
"gateway_path": "/v1/chat/completions",
"llm_correlation_request_id": "request-1"
}))
.output(json!({"choices": [{"message": {"content": "gateway_ok"}}]}))
.build();
let mut hook_end = event_builder(hook_uuid, EventType::End)
Expand All @@ -1966,6 +1973,7 @@ fn test_exporter_prefers_gateway_span_over_non_exact_hook_summary() {
.model_name("test-model")
.metadata(json!({
"hook_event_name": "post_api_request",
"api_call_id": "request-1",
"fidelity_source": "agent_api_hooks",
"provider_payload_exact": false
}))
Expand Down Expand Up @@ -2005,6 +2013,106 @@ fn test_exporter_prefers_gateway_span_over_non_exact_hook_summary() {
}));
}

#[test]
fn test_exporter_keeps_overlapping_non_exact_hook_and_gateway_spans_without_shared_request_key() {
let exporter = AtifExporter::new("session-1".to_string(), make_agent_info());
let base = base_timestamp();
let parent_uuid = Uuid::now_v7();
let hook_uuid = Uuid::now_v7();
let gateway_uuid = Uuid::now_v7();
let request = json!({
"messages": [{"role": "user", "content": "Reply with exactly gateway_distinct_ok"}],
"model": "test-model"
});

let mut hook_start = event_builder(hook_uuid, EventType::Start)
.name("openrouter")
.parent_uuid(parent_uuid)
.scope_type(ScopeType::Llm)
.model_name("test-model")
.metadata(json!({
"hook_event_name": "pre_api_request",
"api_call_id": "hook-request",
"fidelity_source": "agent_api_hooks",
"provider_payload_exact": false
}))
.input(json!({
"content": {"message_count": 2, "request_char_count": 128},
"headers": {}
}))
.build();
let mut gateway_start = event_builder(gateway_uuid, EventType::Start)
.name("openai.chat_completions")
.parent_uuid(parent_uuid)
.scope_type(ScopeType::Llm)
.model_name("test-model")
.metadata(json!({
"gateway_path": "/v1/chat/completions",
"llm_correlation_request_id": "gateway-request"
}))
.input(json!({"content": request.clone(), "headers": {}}))
.build();
let mut gateway_end = event_builder(gateway_uuid, EventType::End)
.name("openai.chat_completions")
.parent_uuid(parent_uuid)
.scope_type(ScopeType::Llm)
.model_name("test-model")
.metadata(json!({
"gateway_path": "/v1/chat/completions",
"llm_correlation_request_id": "gateway-request"
}))
.output(json!({"choices": [{"message": {"content": "gateway_distinct_ok"}}]}))
.build();
let mut hook_end = event_builder(hook_uuid, EventType::End)
.name("openrouter")
.parent_uuid(parent_uuid)
.scope_type(ScopeType::Llm)
.model_name("test-model")
.metadata(json!({
"hook_event_name": "post_api_request",
"api_call_id": "hook-request",
"fidelity_source": "agent_api_hooks",
"provider_payload_exact": false
}))
.output(json!({"assistant_content_chars": 10, "finish_reason": "stop"}))
.build();

for (idx, event) in [
&mut hook_start,
&mut gateway_start,
&mut gateway_end,
&mut hook_end,
]
.into_iter()
.enumerate()
{
set_event_timestamp(event, base + chrono::Duration::milliseconds(idx as i64));
}

{
let mut state = exporter.state.lock().unwrap();
state
.events
.extend([hook_start, gateway_start, gateway_end, hook_end]);
}

let trajectory = exporter.export().unwrap();
assert_eq!(trajectory.steps.len(), 4);
let function_name_count = |name: &str| {
trajectory
.steps
.iter()
.filter(|step| {
step.extra
.as_ref()
.is_some_and(|extra| extra["ancestry"]["function_name"] == name)
})
.count()
};
assert!(function_name_count("openrouter") > 0);
assert!(function_name_count("openai.chat_completions") > 0);
}

#[test]
fn test_exporter_keeps_sequential_same_content_llm_spans() {
let exporter = AtifExporter::new("session-1".to_string(), make_agent_info());
Expand Down
Loading