Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .sampo/changesets/warn-ai-gateway-double-capture.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
pypi/posthog: minor
---

Warn when an AI wrapper's `base_url` points at the PostHog AI Gateway. The gateway emits its own `$ai_generation`, so each call would be captured (and billed) twice. The wrapper only warns and never drops the event. Detection covers the wrapper funnels (OpenAI, Anthropic, LangChain) and the OTel span path.
83 changes: 83 additions & 0 deletions posthog/ai/gateway.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Warn when a wrapper's base_url points at the PostHog AI Gateway: the gateway
# emits its own $ai_generation, so each call would be captured (and, for billable
# products, billed) twice. We only warn — the wrapper's event carries data the
# gateway never sees (groups, custom properties, trace hierarchy).

import logging
import re
from typing import Any, Mapping, Optional
from urllib.parse import urlparse

log = logging.getLogger("posthog")

# Keep in sync with the gateway's deployed hosts (see services/llm-gateway in the
# main repo). gateway.us.posthog.com is live today; the rest are listed ahead of
# any traffic moving to them.
POSTHOG_AI_GATEWAY_HOSTS = [
"gateway.posthog.com",
"gateway.us.posthog.com",
"gateway.eu.posthog.com",
"ai-gateway.us.posthog.com",
"ai-gateway.eu.posthog.com",
]
Comment thread
richardsolomou marked this conversation as resolved.

# Swap for the dedicated AI Gateway page once it ships.
_GATEWAY_DOCS_URL = "https://posthog.com/docs/ai-observability"

_SCHEME_RE = re.compile(r"^[a-z][a-z0-9+.-]*://", re.IGNORECASE)

# OTel spans don't pass through the capture funnels, so detect the gateway from
# the span's host/URL attributes instead. These follow the GenAI / HTTP semantic
# conventions: `server.address` is a bare host, `url.full` a full URL, both of
# which is_posthog_ai_gateway_url accepts.
_OTEL_GATEWAY_URL_ATTRIBUTES = ("server.address", "url.full")


def _extract_host(base_url: str) -> Optional[str]:
try:
# Tolerate bare hosts that omit a scheme, e.g. "gateway.us.posthog.com/v1".
url = base_url if _SCHEME_RE.match(base_url) else f"https://{base_url}"
host = urlparse(url).hostname
return host.lower() if host else None
except Exception:
return None


def is_posthog_ai_gateway_url(base_url: Any) -> bool:
"""Return True if base_url points at a known PostHog AI Gateway host."""
if not base_url:
return False
host = _extract_host(str(base_url))
return host is not None and host in POSTHOG_AI_GATEWAY_HOSTS


def warn_if_posthog_ai_gateway(base_url: Any) -> None:
"""
Warn when an AI wrapper is pointed at the PostHog AI Gateway.

Warns on every gateway call by design: the misconfiguration is impossible to
miss that way, and a doubled bill is worse than noisy logs. We only warn and
never drop the event, because the wrapper event carries data the gateway
never sees (groups, custom properties, trace hierarchy).
"""
if not is_posthog_ai_gateway_url(base_url):
return
log.warning(
"[PostHog] The PostHog AI wrapper is pointed at the PostHog AI Gateway. "
"Both capture $ai_generation, so every call is double-counted and "
"double-billed. Use one or the other — see %s.",
_GATEWAY_DOCS_URL,
)


def warn_if_posthog_ai_gateway_otel_attributes(
attributes: Optional[Mapping[str, Any]],
) -> None:
"""Warn at most once per span when its host/URL attributes point at the gateway."""
if not attributes:
return
for key in _OTEL_GATEWAY_URL_ATTRIBUTES:
value = attributes.get(key)
if isinstance(value, str) and is_posthog_ai_gateway_url(value):
warn_if_posthog_ai_gateway(value)
return
3 changes: 3 additions & 0 deletions posthog/ai/langchain/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from pydantic import BaseModel

from posthog import setup
from posthog.ai.gateway import warn_if_posthog_ai_gateway
from posthog.ai.sanitization import sanitize_langchain
from posthog.ai.utils import get_model_params, with_privacy_mode
from posthog.client import Client
Expand Down Expand Up @@ -622,6 +623,8 @@ def _capture_generation(
"$ai_framework": "langchain",
}

warn_if_posthog_ai_gateway(run.base_url)

if isinstance(run.posthog_properties, dict):
event_properties.update(run.posthog_properties)

Expand Down
3 changes: 3 additions & 0 deletions posthog/ai/otel/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

from ..gateway import warn_if_posthog_ai_gateway_otel_attributes
from .spans import DEFAULT_HOST, is_ai_span


Expand Down Expand Up @@ -65,6 +66,8 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
ai_spans = [span for span in spans if is_ai_span(span)]
if not ai_spans:
return SpanExportResult.SUCCESS
for span in ai_spans:
warn_if_posthog_ai_gateway_otel_attributes(span.attributes)
return self._exporter.export(ai_spans)

def shutdown(self) -> None:
Expand Down
2 changes: 2 additions & 0 deletions posthog/ai/otel/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

from ..gateway import warn_if_posthog_ai_gateway_otel_attributes
from .spans import DEFAULT_HOST, is_ai_span


Expand Down Expand Up @@ -70,6 +71,7 @@ def on_end(self, span: ReadableSpan) -> None:
"""
if not is_ai_span(span):
return
warn_if_posthog_ai_gateway_otel_attributes(span.attributes)
self._processor.on_end(span)

def shutdown(self) -> None:
Expand Down
5 changes: 5 additions & 0 deletions posthog/ai/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any, Callable, Dict, List, Optional, cast

from posthog import get_tags, identify_context, new_context, tag, contexts
from posthog.ai.gateway import warn_if_posthog_ai_gateway
from posthog.ai.sanitization import (
sanitize_anthropic,
sanitize_gemini,
Expand Down Expand Up @@ -423,6 +424,7 @@ def call_llm_and_track_usage(
tag("$ai_latency", latency)
tag("$ai_trace_id", posthog_trace_id)
tag("$ai_base_url", str(base_url))
warn_if_posthog_ai_gateway(base_url)

available_tool_calls = extract_available_tool_calls(provider, kwargs)

Expand Down Expand Up @@ -572,6 +574,7 @@ async def call_llm_and_track_usage_async(
tag("$ai_latency", latency)
tag("$ai_trace_id", posthog_trace_id)
tag("$ai_base_url", str(base_url))
warn_if_posthog_ai_gateway(base_url)

available_tool_calls = extract_available_tool_calls(provider, kwargs)

Expand Down Expand Up @@ -706,6 +709,8 @@ def capture_streaming_event(
**(event_data.get("properties") or {}),
}

warn_if_posthog_ai_gateway(event_data["base_url"])

# Determine token source: SDK-computed vs externally overridden
sdk_token_tags = {
"$ai_input_tokens": event_data["usage_stats"].get("input_tokens", 0),
Expand Down
148 changes: 148 additions & 0 deletions posthog/test/ai/test_gateway.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import logging

import pytest

from posthog.ai.gateway import (
POSTHOG_AI_GATEWAY_HOSTS,
is_posthog_ai_gateway_url,
warn_if_posthog_ai_gateway,
warn_if_posthog_ai_gateway_otel_attributes,
)


@pytest.mark.parametrize("host", POSTHOG_AI_GATEWAY_HOSTS)
def test_detects_every_gateway_host(host):
assert is_posthog_ai_gateway_url(f"https://{host}/v1") is True


@pytest.mark.parametrize(
"url",
[
"https://gateway.us.posthog.com/v1",
"https://gateway.us.posthog.com/signals/v1",
"https://gateway.us.posthog.com/anthropic",
],
)
def test_detects_live_host_with_route_prefix(url):
assert is_posthog_ai_gateway_url(url) is True


@pytest.mark.parametrize(
"label,url",
[
("http scheme", "http://gateway.us.posthog.com/v1"),
("uppercase host", "https://GATEWAY.US.POSTHOG.COM/v1"),
("missing scheme", "gateway.us.posthog.com/v1"),
("port", "http://gateway.us.posthog.com:443/anthropic"),
],
)
def test_matches_gateway_host_variants(label, url):
assert is_posthog_ai_gateway_url(url) is True


@pytest.mark.parametrize(
"label,url",
[
("ingestion host", "https://us.i.posthog.com"),
("app host", "https://eu.posthog.com"),
("openai", "https://api.openai.com/v1"),
("anthropic", "https://api.anthropic.com"),
("look-alike domain", "https://gateway.us.posthog.com.evil.example/v1"),
],
)
def test_does_not_match_non_gateway_url(label, url):
assert is_posthog_ai_gateway_url(url) is False


@pytest.mark.parametrize(
"label,value",
[
("empty string", ""),
("none", None),
("malformed", "::::not a url"),
],
)
def test_does_not_match_empty_or_malformed(label, value):
assert is_posthog_ai_gateway_url(value) is False


def _warnings(caplog):
return [r for r in caplog.records if r.levelno == logging.WARNING]


def test_warns_with_double_counting_message(caplog):
with caplog.at_level(logging.WARNING, logger="posthog"):
warn_if_posthog_ai_gateway("https://gateway.us.posthog.com/v1")

warnings = _warnings(caplog)
assert len(warnings) == 1
message = warnings[0].getMessage()
assert "[PostHog]" in message
assert "PostHog AI Gateway" in message
assert "$ai_generation" in message
assert "double-counted and double-billed" in message
assert "https://posthog.com/docs/ai-observability" in message


def test_warns_on_every_gateway_call(caplog):
with caplog.at_level(logging.WARNING, logger="posthog"):
for _ in range(5):
warn_if_posthog_ai_gateway("https://gateway.us.posthog.com/v1")

assert len(_warnings(caplog)) == 5


@pytest.mark.parametrize(
"base_url",
["https://api.openai.com/v1", None, ""],
)
def test_does_not_warn_for_non_gateway(base_url, caplog):
with caplog.at_level(logging.WARNING, logger="posthog"):
warn_if_posthog_ai_gateway(base_url)

assert _warnings(caplog) == []


@pytest.mark.parametrize(
"label,attributes",
[
("server.address bare host", {"server.address": "gateway.us.posthog.com"}),
(
"url.full full URL",
{"url.full": "https://gateway.us.posthog.com/v1/chat/completions"},
),
],
)
def test_otel_attributes_warn_when_gateway_detected(label, attributes, caplog):
with caplog.at_level(logging.WARNING, logger="posthog"):
warn_if_posthog_ai_gateway_otel_attributes(attributes)

assert len(_warnings(caplog)) == 1
assert "PostHog AI Gateway" in _warnings(caplog)[0].getMessage()


def test_otel_attributes_warn_at_most_once_per_span(caplog):
with caplog.at_level(logging.WARNING, logger="posthog"):
warn_if_posthog_ai_gateway_otel_attributes(
{
"server.address": "gateway.us.posthog.com",
"url.full": "https://gateway.us.posthog.com/v1",
}
)

assert len(_warnings(caplog)) == 1


@pytest.mark.parametrize(
"label,attributes",
[
("none", None),
("empty", {}),
("non-gateway", {"server.address": "api.openai.com"}),
],
)
def test_otel_attributes_silent_for_non_gateway(label, attributes, caplog):
with caplog.at_level(logging.WARNING, logger="posthog"):
warn_if_posthog_ai_gateway_otel_attributes(attributes)

assert _warnings(caplog) == []
Loading