diff --git a/.sampo/changesets/warn-ai-gateway-double-capture.md b/.sampo/changesets/warn-ai-gateway-double-capture.md new file mode 100644 index 00000000..2da26723 --- /dev/null +++ b/.sampo/changesets/warn-ai-gateway-double-capture.md @@ -0,0 +1,5 @@ +--- +pypi/posthog: minor +--- + +Warn when an AI wrapper's `base_url` points at the PostHog AI Gateway. The gateway emits its own `$ai_generation`, so each call would be captured (and billed) twice. The wrapper only warns and never drops the event. Detection covers the wrapper funnels (OpenAI, Anthropic, LangChain) and the OTel span path. diff --git a/posthog/ai/gateway.py b/posthog/ai/gateway.py new file mode 100644 index 00000000..cdda6c68 --- /dev/null +++ b/posthog/ai/gateway.py @@ -0,0 +1,83 @@ +# Warn when a wrapper's base_url points at the PostHog AI Gateway: the gateway +# emits its own $ai_generation, so each call would be captured (and, for billable +# products, billed) twice. We only warn — the wrapper's event carries data the +# gateway never sees (groups, custom properties, trace hierarchy). + +import logging +import re +from typing import Any, Mapping, Optional +from urllib.parse import urlparse + +log = logging.getLogger("posthog") + +# Keep in sync with the gateway's deployed hosts (see services/llm-gateway in the +# main repo). gateway.us.posthog.com is live today; the rest are listed ahead of +# any traffic moving to them. +POSTHOG_AI_GATEWAY_HOSTS = [ + "gateway.posthog.com", + "gateway.us.posthog.com", + "gateway.eu.posthog.com", + "ai-gateway.us.posthog.com", + "ai-gateway.eu.posthog.com", +] + +# Swap for the dedicated AI Gateway page once it ships. +_GATEWAY_DOCS_URL = "https://posthog.com/docs/ai-observability" + +_SCHEME_RE = re.compile(r"^[a-z][a-z0-9+.-]*://", re.IGNORECASE) + +# OTel spans don't pass through the capture funnels, so detect the gateway from +# the span's host/URL attributes instead. These follow the GenAI / HTTP semantic +# conventions: `server.address` is a bare host, `url.full` a full URL, both of +# which is_posthog_ai_gateway_url accepts. +_OTEL_GATEWAY_URL_ATTRIBUTES = ("server.address", "url.full") + + +def _extract_host(base_url: str) -> Optional[str]: + try: + # Tolerate bare hosts that omit a scheme, e.g. "gateway.us.posthog.com/v1". + url = base_url if _SCHEME_RE.match(base_url) else f"https://{base_url}" + host = urlparse(url).hostname + return host.lower() if host else None + except Exception: + return None + + +def is_posthog_ai_gateway_url(base_url: Any) -> bool: + """Return True if base_url points at a known PostHog AI Gateway host.""" + if not base_url: + return False + host = _extract_host(str(base_url)) + return host is not None and host in POSTHOG_AI_GATEWAY_HOSTS + + +def warn_if_posthog_ai_gateway(base_url: Any) -> None: + """ + Warn when an AI wrapper is pointed at the PostHog AI Gateway. + + Warns on every gateway call by design: the misconfiguration is impossible to + miss that way, and a doubled bill is worse than noisy logs. We only warn and + never drop the event, because the wrapper event carries data the gateway + never sees (groups, custom properties, trace hierarchy). + """ + if not is_posthog_ai_gateway_url(base_url): + return + log.warning( + "[PostHog] The PostHog AI wrapper is pointed at the PostHog AI Gateway. " + "Both capture $ai_generation, so every call is double-counted and " + "double-billed. Use one or the other — see %s.", + _GATEWAY_DOCS_URL, + ) + + +def warn_if_posthog_ai_gateway_otel_attributes( + attributes: Optional[Mapping[str, Any]], +) -> None: + """Warn at most once per span when its host/URL attributes point at the gateway.""" + if not attributes: + return + for key in _OTEL_GATEWAY_URL_ATTRIBUTES: + value = attributes.get(key) + if isinstance(value, str) and is_posthog_ai_gateway_url(value): + warn_if_posthog_ai_gateway(value) + return diff --git a/posthog/ai/langchain/callbacks.py b/posthog/ai/langchain/callbacks.py index 6d8cda51..7045aa70 100644 --- a/posthog/ai/langchain/callbacks.py +++ b/posthog/ai/langchain/callbacks.py @@ -42,6 +42,7 @@ from pydantic import BaseModel from posthog import setup +from posthog.ai.gateway import warn_if_posthog_ai_gateway from posthog.ai.sanitization import sanitize_langchain from posthog.ai.utils import get_model_params, with_privacy_mode from posthog.client import Client @@ -622,6 +623,8 @@ def _capture_generation( "$ai_framework": "langchain", } + warn_if_posthog_ai_gateway(run.base_url) + if isinstance(run.posthog_properties, dict): event_properties.update(run.posthog_properties) diff --git a/posthog/ai/otel/exporter.py b/posthog/ai/otel/exporter.py index bd2bd996..02c1c22e 100644 --- a/posthog/ai/otel/exporter.py +++ b/posthog/ai/otel/exporter.py @@ -11,6 +11,7 @@ from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from ..gateway import warn_if_posthog_ai_gateway_otel_attributes from .spans import DEFAULT_HOST, is_ai_span @@ -65,6 +66,8 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: ai_spans = [span for span in spans if is_ai_span(span)] if not ai_spans: return SpanExportResult.SUCCESS + for span in ai_spans: + warn_if_posthog_ai_gateway_otel_attributes(span.attributes) return self._exporter.export(ai_spans) def shutdown(self) -> None: diff --git a/posthog/ai/otel/processor.py b/posthog/ai/otel/processor.py index 1a1f7bb7..4f520591 100644 --- a/posthog/ai/otel/processor.py +++ b/posthog/ai/otel/processor.py @@ -12,6 +12,7 @@ from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from ..gateway import warn_if_posthog_ai_gateway_otel_attributes from .spans import DEFAULT_HOST, is_ai_span @@ -70,6 +71,7 @@ def on_end(self, span: ReadableSpan) -> None: """ if not is_ai_span(span): return + warn_if_posthog_ai_gateway_otel_attributes(span.attributes) self._processor.on_end(span) def shutdown(self) -> None: diff --git a/posthog/ai/utils.py b/posthog/ai/utils.py index 7d170f6a..127f5d11 100644 --- a/posthog/ai/utils.py +++ b/posthog/ai/utils.py @@ -3,6 +3,7 @@ from typing import Any, Callable, Dict, List, Optional, cast from posthog import get_tags, identify_context, new_context, tag, contexts +from posthog.ai.gateway import warn_if_posthog_ai_gateway from posthog.ai.sanitization import ( sanitize_anthropic, sanitize_gemini, @@ -423,6 +424,7 @@ def call_llm_and_track_usage( tag("$ai_latency", latency) tag("$ai_trace_id", posthog_trace_id) tag("$ai_base_url", str(base_url)) + warn_if_posthog_ai_gateway(base_url) available_tool_calls = extract_available_tool_calls(provider, kwargs) @@ -572,6 +574,7 @@ async def call_llm_and_track_usage_async( tag("$ai_latency", latency) tag("$ai_trace_id", posthog_trace_id) tag("$ai_base_url", str(base_url)) + warn_if_posthog_ai_gateway(base_url) available_tool_calls = extract_available_tool_calls(provider, kwargs) @@ -706,6 +709,8 @@ def capture_streaming_event( **(event_data.get("properties") or {}), } + warn_if_posthog_ai_gateway(event_data["base_url"]) + # Determine token source: SDK-computed vs externally overridden sdk_token_tags = { "$ai_input_tokens": event_data["usage_stats"].get("input_tokens", 0), diff --git a/posthog/test/ai/test_gateway.py b/posthog/test/ai/test_gateway.py new file mode 100644 index 00000000..584517b0 --- /dev/null +++ b/posthog/test/ai/test_gateway.py @@ -0,0 +1,148 @@ +import logging + +import pytest + +from posthog.ai.gateway import ( + POSTHOG_AI_GATEWAY_HOSTS, + is_posthog_ai_gateway_url, + warn_if_posthog_ai_gateway, + warn_if_posthog_ai_gateway_otel_attributes, +) + + +@pytest.mark.parametrize("host", POSTHOG_AI_GATEWAY_HOSTS) +def test_detects_every_gateway_host(host): + assert is_posthog_ai_gateway_url(f"https://{host}/v1") is True + + +@pytest.mark.parametrize( + "url", + [ + "https://gateway.us.posthog.com/v1", + "https://gateway.us.posthog.com/signals/v1", + "https://gateway.us.posthog.com/anthropic", + ], +) +def test_detects_live_host_with_route_prefix(url): + assert is_posthog_ai_gateway_url(url) is True + + +@pytest.mark.parametrize( + "label,url", + [ + ("http scheme", "http://gateway.us.posthog.com/v1"), + ("uppercase host", "https://GATEWAY.US.POSTHOG.COM/v1"), + ("missing scheme", "gateway.us.posthog.com/v1"), + ("port", "http://gateway.us.posthog.com:443/anthropic"), + ], +) +def test_matches_gateway_host_variants(label, url): + assert is_posthog_ai_gateway_url(url) is True + + +@pytest.mark.parametrize( + "label,url", + [ + ("ingestion host", "https://us.i.posthog.com"), + ("app host", "https://eu.posthog.com"), + ("openai", "https://api.openai.com/v1"), + ("anthropic", "https://api.anthropic.com"), + ("look-alike domain", "https://gateway.us.posthog.com.evil.example/v1"), + ], +) +def test_does_not_match_non_gateway_url(label, url): + assert is_posthog_ai_gateway_url(url) is False + + +@pytest.mark.parametrize( + "label,value", + [ + ("empty string", ""), + ("none", None), + ("malformed", "::::not a url"), + ], +) +def test_does_not_match_empty_or_malformed(label, value): + assert is_posthog_ai_gateway_url(value) is False + + +def _warnings(caplog): + return [r for r in caplog.records if r.levelno == logging.WARNING] + + +def test_warns_with_double_counting_message(caplog): + with caplog.at_level(logging.WARNING, logger="posthog"): + warn_if_posthog_ai_gateway("https://gateway.us.posthog.com/v1") + + warnings = _warnings(caplog) + assert len(warnings) == 1 + message = warnings[0].getMessage() + assert "[PostHog]" in message + assert "PostHog AI Gateway" in message + assert "$ai_generation" in message + assert "double-counted and double-billed" in message + assert "https://posthog.com/docs/ai-observability" in message + + +def test_warns_on_every_gateway_call(caplog): + with caplog.at_level(logging.WARNING, logger="posthog"): + for _ in range(5): + warn_if_posthog_ai_gateway("https://gateway.us.posthog.com/v1") + + assert len(_warnings(caplog)) == 5 + + +@pytest.mark.parametrize( + "base_url", + ["https://api.openai.com/v1", None, ""], +) +def test_does_not_warn_for_non_gateway(base_url, caplog): + with caplog.at_level(logging.WARNING, logger="posthog"): + warn_if_posthog_ai_gateway(base_url) + + assert _warnings(caplog) == [] + + +@pytest.mark.parametrize( + "label,attributes", + [ + ("server.address bare host", {"server.address": "gateway.us.posthog.com"}), + ( + "url.full full URL", + {"url.full": "https://gateway.us.posthog.com/v1/chat/completions"}, + ), + ], +) +def test_otel_attributes_warn_when_gateway_detected(label, attributes, caplog): + with caplog.at_level(logging.WARNING, logger="posthog"): + warn_if_posthog_ai_gateway_otel_attributes(attributes) + + assert len(_warnings(caplog)) == 1 + assert "PostHog AI Gateway" in _warnings(caplog)[0].getMessage() + + +def test_otel_attributes_warn_at_most_once_per_span(caplog): + with caplog.at_level(logging.WARNING, logger="posthog"): + warn_if_posthog_ai_gateway_otel_attributes( + { + "server.address": "gateway.us.posthog.com", + "url.full": "https://gateway.us.posthog.com/v1", + } + ) + + assert len(_warnings(caplog)) == 1 + + +@pytest.mark.parametrize( + "label,attributes", + [ + ("none", None), + ("empty", {}), + ("non-gateway", {"server.address": "api.openai.com"}), + ], +) +def test_otel_attributes_silent_for_non_gateway(label, attributes, caplog): + with caplog.at_level(logging.WARNING, logger="posthog"): + warn_if_posthog_ai_gateway_otel_attributes(attributes) + + assert _warnings(caplog) == []