From 33a4d49ee8ecef451fac57e915e0623e217106ce Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Wed, 10 Jun 2026 18:21:35 +0100 Subject: [PATCH 1/2] feat(exceptions): opt-in Ed25519 signing of $exception events Backend services can configure an Ed25519 private key; the SDK then signs every captured $exception over a canonical, length-prefixed projection of its $exception_list and attaches $exception_signature / _key_id / _version. PostHog error tracking verifies this against the project's registered public key to prove the exception genuinely came from the customer's backend (not forged through the public ingest key). Signing happens after before_send so it covers the final content. Crypto is an optional [exception-signing] extra to keep the base SDK lean. --- .sampo/changesets/exception-signing.md | 5 + posthog/__init__.py | 9 ++ posthog/client.py | 22 +++ posthog/exception_signing.py | 135 ++++++++++++++++ posthog/test/test_exception_signing.py | 215 +++++++++++++++++++++++++ pyproject.toml | 3 + 6 files changed, 389 insertions(+) create mode 100644 .sampo/changesets/exception-signing.md create mode 100644 posthog/exception_signing.py create mode 100644 posthog/test/test_exception_signing.py diff --git a/.sampo/changesets/exception-signing.md b/.sampo/changesets/exception-signing.md new file mode 100644 index 00000000..38422d13 --- /dev/null +++ b/.sampo/changesets/exception-signing.md @@ -0,0 +1,5 @@ +--- +pypi/posthog: minor +--- + +feat(exceptions): add opt-in Ed25519 signing of `$exception` events. Set `enable_exception_signing=True` and provide an Ed25519 private key in `exception_signing_private_key`, then register the matching public key in your PostHog project. The SDK signs each captured exception over a canonical projection of its `$exception_list`, so error-tracking ingestion can verify it genuinely came from your backend (rather than being forged through the public ingest key) and mark it verified. Backend use only — never ship a private key in a browser/mobile app. Requires the new `[exception-signing]` extra (`pip install posthoganalytics[exception-signing]`). diff --git a/posthog/__init__.py b/posthog/__init__.py index 892a68c6..5af64188 100644 --- a/posthog/__init__.py +++ b/posthog/__init__.py @@ -338,6 +338,13 @@ def get_tags() -> Dict[str, Any]: code_variables_ignore_patterns = DEFAULT_CODE_VARIABLES_IGNORE_PATTERNS in_app_modules = None # type: Optional[list[str]] +# Opt-in Ed25519 signing of $exception events. Set enable_exception_signing=True and provide an +# Ed25519 private key (PEM) in exception_signing_private_key; register the matching public key in +# your PostHog project so ingestion can stamp a trusted $exception_verified flag. Backend use only +# (never ship a private key in a browser/mobile app). Requires the [exception-signing] extra. +enable_exception_signing = False # type: bool +exception_signing_private_key = None # type: Optional[str] + # NOTE - this and following functions take unpacked kwargs because we needed to make # it impossible to write `posthog.capture(distinct-id, event-name)` - basically, to enforce @@ -1103,6 +1110,8 @@ def setup() -> Client: code_variables_mask_patterns=code_variables_mask_patterns, code_variables_ignore_patterns=code_variables_ignore_patterns, in_app_modules=in_app_modules, + enable_exception_signing=enable_exception_signing, + exception_signing_private_key=exception_signing_private_key, ) # Always set in case user changes it. Preserve Client's auto-disabled state diff --git a/posthog/client.py b/posthog/client.py index 6caa0c3c..565ce76b 100644 --- a/posthog/client.py +++ b/posthog/client.py @@ -213,6 +213,8 @@ def __init__( code_variables_mask_patterns=None, code_variables_ignore_patterns=None, in_app_modules: list[str] | None = None, + enable_exception_signing=False, + exception_signing_private_key=None, _dedicated_ai_endpoint=False, ): """ @@ -376,6 +378,17 @@ def __init__( self._set_before_send(before_send) + # Opt-in Ed25519 signing of $exception events (parse the key once here). + self.enable_exception_signing = enable_exception_signing + self._exception_signer = None + if enable_exception_signing and exception_signing_private_key: + from posthog.exception_signing import make_signer + + try: + self._exception_signer = make_signer(exception_signing_private_key) + except Exception as e: + self.log.error("Failed to initialise exception signing: %s", e) + if self.enable_exception_autocapture: self.exception_capture = ExceptionCapture(self) @@ -1362,6 +1375,15 @@ def _enqueue(self, msg, disable_geoip): self.log.exception(f"Error in before_send callback: {e}") # Continue with the original message if callback fails + # Sign $exception events last, so the signature covers the final content actually sent + # (after any before_send mutation) and a before_send callback can't strip it. + if self._exception_signer is not None and msg.get("event") == "$exception": + try: + self._exception_signer.sign_event(msg) + except Exception as e: + self.log.exception(f"Error signing exception event: {e}") + # Leave the event unsigned rather than dropping it. + self.log.debug("queueing: %s", msg) # if send is False, return msg as if it was successfully queued diff --git a/posthog/exception_signing.py b/posthog/exception_signing.py new file mode 100644 index 00000000..75737dda --- /dev/null +++ b/posthog/exception_signing.py @@ -0,0 +1,135 @@ +"""Opt-in Ed25519 signing of ``$exception`` events. + +When a backend service configures an Ed25519 private key, the SDK signs every captured +``$exception`` event over a canonical projection of its ``$exception_list`` and attaches the +signature as event properties. PostHog's error-tracking ingestion (cymbal) re-derives the same +projection, verifies it against the project's registered *public* key, and stamps a trusted +``$exception_verified`` flag — proving the exception genuinely came from your backend rather +than being forged through the public ingest key. + +The canonical projection is a deliberately small, byte-stable subset of each exception +(type, message, and each frame's function/filename/lineno/module). It excludes everything +cymbal mutates during ingestion (in-app flags, absolute paths, source context, the injected +exception id) and anything float-valued, so the bytes the SDK signs match the bytes cymbal +verifies. The encoding is explicit length-prefixed binary rather than JSON, to avoid +cross-language canonicalisation pitfalls (key order, non-ASCII escaping, number formatting). + +Requires the optional ``cryptography`` dependency: ``pip install posthoganalytics[exception-signing]``. +""" + +import base64 +import hashlib +import struct +from typing import Any, Optional + +CANONICAL_MAGIC = b"PHEXC1\n" + +SIGNATURE_PROPERTY = "$exception_signature" +KEY_ID_PROPERTY = "$exception_signature_key_id" +VERSION_PROPERTY = "$exception_signature_version" +SIGNATURE_VERSION = 1 + + +def _lp(value: Any) -> bytes: + """Length-prefixed UTF-8 encoding: u32 big-endian length + bytes. None/missing -> empty.""" + if value is None: + data = b"" + else: + data = str(value).encode("utf-8") + return struct.pack(">I", len(data)) + data + + +def build_canonical(exception_list: Any) -> bytes: + """Deterministic, length-prefixed encoding of the signable projection of ``$exception_list``. + + Both the SDK (here) and cymbal must produce identical bytes for the same input, so this + reads only stable string/int fields and never floats. + """ + out = bytearray(CANONICAL_MAGIC) + exceptions = exception_list if isinstance(exception_list, list) else [] + out += struct.pack(">I", len(exceptions)) + for exc in exceptions: + exc = exc if isinstance(exc, dict) else {} + out += _lp(exc.get("type")) + out += _lp(exc.get("value")) + stacktrace = exc.get("stacktrace") + frames = stacktrace.get("frames") if isinstance(stacktrace, dict) else None + frames = frames if isinstance(frames, list) else [] + out += struct.pack(">I", len(frames)) + for frame in frames: + frame = frame if isinstance(frame, dict) else {} + out += _lp(frame.get("function")) + out += _lp(frame.get("filename")) + lineno = frame.get("lineno") + out += _lp(lineno if lineno is None else str(lineno)) + out += _lp(frame.get("module")) + return bytes(out) + + +def derive_key_id(public_key_raw: bytes) -> str: + """Stable short fingerprint of a raw 32-byte Ed25519 public key. + + Computed identically by the SDK (from the configured private key) and by PostHog (from the + registered public key), so a signature's key id resolves to the right stored key. + """ + digest = hashlib.sha256(public_key_raw).digest() + return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")[:16] + + +class ExceptionSigner: + """Holds a parsed Ed25519 private key and signs ``$exception`` events. + + Constructed once at client init from a PEM private key. Raises a clear error if the optional + ``cryptography`` dependency is missing or the key isn't Ed25519. + """ + + def __init__(self, private_key_pem: str): + try: + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric.ed25519 import ( + Ed25519PrivateKey, + ) + except ImportError as e: # pragma: no cover - exercised via install extras + raise ImportError( + "Exception signing requires the optional 'cryptography' dependency. " + "Install it with: pip install posthoganalytics[exception-signing]" + ) from e + + key = serialization.load_pem_private_key(private_key_pem.encode("utf-8"), password=None) + if not isinstance(key, Ed25519PrivateKey): + raise ValueError("exception_signing_private_key must be an Ed25519 private key (PEM)") + + from cryptography.hazmat.primitives.serialization import ( + Encoding, + PublicFormat, + ) + + self._key = key + public_raw = key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) + self.key_id = derive_key_id(public_raw) + + def sign(self, canonical: bytes) -> str: + return base64.b64encode(self._key.sign(canonical)).decode("ascii") + + def sign_event(self, event: dict) -> dict: + """Attach signature properties to a ``$exception`` event in place; returns it. + + Non-exception events pass through untouched. + """ + if event.get("event") != "$exception": + return event + properties = event.get("properties") + if not isinstance(properties, dict): + return event + canonical = build_canonical(properties.get("$exception_list")) + properties[SIGNATURE_PROPERTY] = self.sign(canonical) + properties[KEY_ID_PROPERTY] = self.key_id + properties[VERSION_PROPERTY] = SIGNATURE_VERSION + return event + + +def make_signer(private_key_pem: Optional[str]) -> Optional[ExceptionSigner]: + """Build a signer from a PEM key, or None when no key is configured.""" + if not private_key_pem: + return None + return ExceptionSigner(private_key_pem) diff --git a/posthog/test/test_exception_signing.py b/posthog/test/test_exception_signing.py new file mode 100644 index 00000000..3fa7e5c0 --- /dev/null +++ b/posthog/test/test_exception_signing.py @@ -0,0 +1,215 @@ +import base64 +import unittest +from unittest import mock + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + +from posthog.client import Client +from posthog.exception_signing import ( + KEY_ID_PROPERTY, + SIGNATURE_PROPERTY, + VERSION_PROPERTY, + ExceptionSigner, + build_canonical, + derive_key_id, +) +from posthog.test.test_utils import FAKE_TEST_API_KEY + +# --- Cross-language parity vector ------------------------------------------------------------- +# A fixed Ed25519 keypair (seed = bytes(range(32))) signing a fixed $exception_list. cymbal's +# Rust implementation MUST reproduce CANONICAL_HEX and verify SIGNATURE_B64 under PUBKEY_RAW_B64. +# If either side's canonical encoding or signing drifts, these assertions fail. +SEED = bytes(range(32)) +PUBKEY_RAW_B64 = "A6EHv/POEL4dcN0Y50vAmWfk1jCbpQ1fHdyGZBJVMbg=" +KEY_ID = "Vkdap1RjR0wChd9d" +CANONICAL_HEX = ( + "5048455843310a0000000100000009485454504572726f720000001e34303120436c69656e74204572726f72" + "3a20556e617574686f72697a65640000000200000007726571756573740000001272657175657374732f6d6f" + "64656c732e707900000004313032310000000f72657175657374732e6d6f64656c730000000b73796e635f73" + "747269706500000036706f7374686f672f74656d706f72616c2f646174615f696d706f7274732f736f757263" + "65732f7374726970652f736f757263652e707900000002343200000033706f7374686f672e74656d706f7261" + "6c2e646174615f696d706f7274732e736f75726365732e7374726970652e736f75726365" +) +SIGNATURE_B64 = ( + "Fyh19k2cC1k9M8cJr54TNH91MDdd67oaUnydyKm7E+QCPN3mK+h3N9Yp5nkM7xYtngD8km7ljqVXARGDmnfzAQ==" +) + +PARITY_EXCEPTION_LIST = [ + { + "type": "HTTPError", + "value": "401 Client Error: Unauthorized", + "stacktrace": { + "frames": [ + { + "function": "request", + "filename": "requests/models.py", + "lineno": 1021, + "module": "requests.models", + "in_app": False, + }, + { + "function": "sync_stripe", + "filename": "posthog/temporal/data_imports/sources/stripe/source.py", + "lineno": 42, + "module": "posthog.temporal.data_imports.sources.stripe.source", + "in_app": True, + }, + ] + }, + } +] + + +def _private_key_pem(seed=SEED): + sk = Ed25519PrivateKey.from_private_bytes(seed) + return sk.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ).decode() + + +class TestCanonical(unittest.TestCase): + def test_canonical_matches_parity_vector(self): + self.assertEqual(build_canonical(PARITY_EXCEPTION_LIST).hex(), CANONICAL_HEX) + + def test_canonical_is_deterministic(self): + self.assertEqual( + build_canonical(PARITY_EXCEPTION_LIST), build_canonical(PARITY_EXCEPTION_LIST) + ) + + def test_excluded_fields_do_not_affect_canonical(self): + # in_app / abs_path / context are excluded — changing them must not change the bytes. + mutated = [ + { + **PARITY_EXCEPTION_LIST[0], + "stacktrace": { + "frames": [ + {**f, "in_app": not f["in_app"], "abs_path": "/tmp/x", "context_line": "y"} + for f in PARITY_EXCEPTION_LIST[0]["stacktrace"]["frames"] + ] + }, + } + ] + self.assertEqual(build_canonical(mutated), build_canonical(PARITY_EXCEPTION_LIST)) + + def test_changing_message_changes_canonical(self): + mutated = [{**PARITY_EXCEPTION_LIST[0], "value": "different"}] + self.assertNotEqual(build_canonical(mutated), build_canonical(PARITY_EXCEPTION_LIST)) + + def test_tolerates_missing_and_malformed(self): + self.assertTrue(build_canonical([]).startswith(b"PHEXC1\n")) + self.assertTrue(build_canonical(None).startswith(b"PHEXC1\n")) + self.assertTrue(build_canonical([{}]).startswith(b"PHEXC1\n")) + + +class TestSigning(unittest.TestCase): + def test_key_id_matches_parity_vector(self): + self.assertEqual(derive_key_id(base64.b64decode(PUBKEY_RAW_B64)), KEY_ID) + + def test_signer_key_id_derives_from_private_key(self): + signer = ExceptionSigner(_private_key_pem()) + self.assertEqual(signer.key_id, KEY_ID) + + def test_signature_matches_parity_vector(self): + signer = ExceptionSigner(_private_key_pem()) + self.assertEqual(signer.sign(build_canonical(PARITY_EXCEPTION_LIST)), SIGNATURE_B64) + + def test_signature_verifies_with_public_key(self): + signer = ExceptionSigner(_private_key_pem()) + canonical = build_canonical(PARITY_EXCEPTION_LIST) + sig = base64.b64decode(signer.sign(canonical)) + public_key = Ed25519PrivateKey.from_private_bytes(SEED).public_key() + public_key.verify(sig, canonical) # raises if invalid + + def test_rejects_non_ed25519_key(self): + from cryptography.hazmat.primitives.asymmetric import rsa + + rsa_pem = ( + rsa.generate_private_key(public_exponent=65537, key_size=2048) + .private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ) + .decode() + ) + with self.assertRaises(ValueError): + ExceptionSigner(rsa_pem) + + def test_sign_event_attaches_props_only_for_exceptions(self): + signer = ExceptionSigner(_private_key_pem()) + event = {"event": "$exception", "properties": {"$exception_list": PARITY_EXCEPTION_LIST}} + signer.sign_event(event) + self.assertEqual(event["properties"][SIGNATURE_PROPERTY], SIGNATURE_B64) + self.assertEqual(event["properties"][KEY_ID_PROPERTY], KEY_ID) + self.assertEqual(event["properties"][VERSION_PROPERTY], 1) + + other = {"event": "$pageview", "properties": {}} + signer.sign_event(other) + self.assertNotIn(SIGNATURE_PROPERTY, other["properties"]) + + +class TestClientIntegration(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.client_post_patcher = mock.patch("posthog.client.batch_post") + cls.consumer_post_patcher = mock.patch("posthog.consumer.batch_post") + cls.client_post_patcher.start() + cls.consumer_post_patcher.start() + + @classmethod + def tearDownClass(cls): + cls.client_post_patcher.stop() + cls.consumer_post_patcher.stop() + + def _enqueue_and_read(self, client): + # _enqueue signs the post-clean() message that actually gets queued, so read it + # back off the queue rather than the input dict. + msg = { + "event": "$exception", + "properties": {"$exception_list": PARITY_EXCEPTION_LIST}, + "timestamp": None, + } + client._enqueue(msg, disable_geoip=True) + return client.queue.get_nowait() + + def test_client_signs_exception_events(self): + client = Client( + FAKE_TEST_API_KEY, + enable_exception_signing=True, + exception_signing_private_key=_private_key_pem(), + ) + self.assertIsNotNone(client._exception_signer) + queued = self._enqueue_and_read(client) + self.assertEqual(queued["properties"][SIGNATURE_PROPERTY], SIGNATURE_B64) + self.assertEqual(queued["properties"][KEY_ID_PROPERTY], KEY_ID) + self.assertEqual(queued["properties"][VERSION_PROPERTY], 1) + + def test_signing_happens_after_before_send(self): + # A user's before_send runs before signing, so the signature covers the final content + # and the callback can't strip it. + def before_send(event): + if event.get("event") == "$exception": + event["properties"][SIGNATURE_PROPERTY] = "attacker-controlled" + return event + + client = Client( + FAKE_TEST_API_KEY, + before_send=before_send, + enable_exception_signing=True, + exception_signing_private_key=_private_key_pem(), + ) + queued = self._enqueue_and_read(client) + self.assertEqual(queued["properties"][SIGNATURE_PROPERTY], SIGNATURE_B64) + + def test_client_without_signing_adds_no_props(self): + client = Client(FAKE_TEST_API_KEY) + self.assertIsNone(client._exception_signer) + queued = self._enqueue_and_read(client) + self.assertNotIn(SIGNATURE_PROPERTY, queued["properties"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/pyproject.toml b/pyproject.toml index 9c30229c..8c2f77ef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,9 @@ otel = [ "opentelemetry-sdk>=1.20.0", "opentelemetry-exporter-otlp-proto-http>=1.20.0", ] +# Required to sign $exception events (enable_exception_signing). Optional so the base SDK +# keeps a minimal dependency footprint. +exception-signing = ["cryptography>=41.0.0"] dev = [ "django-stubs", "lxml", From 9ab4d193efc4a9268a4aa8919e8ad8015b7508d6 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Wed, 10 Jun 2026 18:58:10 +0100 Subject: [PATCH 2/2] fix(exceptions): warn on signing misconfig; ruff format; split tests - Log a warning when enable_exception_signing=True but no key is provided, instead of silently sending unsigned (review). - Apply ruff format (CI code-quality). - Parametrise/split the two multi-case tests so each case fails independently. --- posthog/client.py | 20 +++++++---- posthog/exception_signing.py | 8 +++-- posthog/test/test_exception_signing.py | 50 +++++++++++++++++++------- 3 files changed, 57 insertions(+), 21 deletions(-) diff --git a/posthog/client.py b/posthog/client.py index 565ce76b..02610f14 100644 --- a/posthog/client.py +++ b/posthog/client.py @@ -381,13 +381,21 @@ def __init__( # Opt-in Ed25519 signing of $exception events (parse the key once here). self.enable_exception_signing = enable_exception_signing self._exception_signer = None - if enable_exception_signing and exception_signing_private_key: - from posthog.exception_signing import make_signer + if enable_exception_signing: + if not exception_signing_private_key: + # Opted into a security feature but gave us nothing to sign with — warn loudly + # rather than silently sending every $exception unsigned. + self.log.warning( + "enable_exception_signing is True but no exception_signing_private_key was " + "provided; $exception events will be sent UNSIGNED." + ) + else: + from posthog.exception_signing import make_signer - try: - self._exception_signer = make_signer(exception_signing_private_key) - except Exception as e: - self.log.error("Failed to initialise exception signing: %s", e) + try: + self._exception_signer = make_signer(exception_signing_private_key) + except Exception as e: + self.log.error("Failed to initialise exception signing: %s", e) if self.enable_exception_autocapture: self.exception_capture = ExceptionCapture(self) diff --git a/posthog/exception_signing.py b/posthog/exception_signing.py index 75737dda..3b6bd886 100644 --- a/posthog/exception_signing.py +++ b/posthog/exception_signing.py @@ -95,9 +95,13 @@ def __init__(self, private_key_pem: str): "Install it with: pip install posthoganalytics[exception-signing]" ) from e - key = serialization.load_pem_private_key(private_key_pem.encode("utf-8"), password=None) + key = serialization.load_pem_private_key( + private_key_pem.encode("utf-8"), password=None + ) if not isinstance(key, Ed25519PrivateKey): - raise ValueError("exception_signing_private_key must be an Ed25519 private key (PEM)") + raise ValueError( + "exception_signing_private_key must be an Ed25519 private key (PEM)" + ) from cryptography.hazmat.primitives.serialization import ( Encoding, diff --git a/posthog/test/test_exception_signing.py b/posthog/test/test_exception_signing.py index 3fa7e5c0..2cb8b5e4 100644 --- a/posthog/test/test_exception_signing.py +++ b/posthog/test/test_exception_signing.py @@ -31,9 +31,7 @@ "65732f7374726970652f736f757263652e707900000002343200000033706f7374686f672e74656d706f7261" "6c2e646174615f696d706f7274732e736f75726365732e7374726970652e736f75726365" ) -SIGNATURE_B64 = ( - "Fyh19k2cC1k9M8cJr54TNH91MDdd67oaUnydyKm7E+QCPN3mK+h3N9Yp5nkM7xYtngD8km7ljqVXARGDmnfzAQ==" -) +SIGNATURE_B64 = "Fyh19k2cC1k9M8cJr54TNH91MDdd67oaUnydyKm7E+QCPN3mK+h3N9Yp5nkM7xYtngD8km7ljqVXARGDmnfzAQ==" PARITY_EXCEPTION_LIST = [ { @@ -76,7 +74,8 @@ def test_canonical_matches_parity_vector(self): def test_canonical_is_deterministic(self): self.assertEqual( - build_canonical(PARITY_EXCEPTION_LIST), build_canonical(PARITY_EXCEPTION_LIST) + build_canonical(PARITY_EXCEPTION_LIST), + build_canonical(PARITY_EXCEPTION_LIST), ) def test_excluded_fields_do_not_affect_canonical(self): @@ -86,22 +85,31 @@ def test_excluded_fields_do_not_affect_canonical(self): **PARITY_EXCEPTION_LIST[0], "stacktrace": { "frames": [ - {**f, "in_app": not f["in_app"], "abs_path": "/tmp/x", "context_line": "y"} + { + **f, + "in_app": not f["in_app"], + "abs_path": "/tmp/x", + "context_line": "y", + } for f in PARITY_EXCEPTION_LIST[0]["stacktrace"]["frames"] ] }, } ] - self.assertEqual(build_canonical(mutated), build_canonical(PARITY_EXCEPTION_LIST)) + self.assertEqual( + build_canonical(mutated), build_canonical(PARITY_EXCEPTION_LIST) + ) def test_changing_message_changes_canonical(self): mutated = [{**PARITY_EXCEPTION_LIST[0], "value": "different"}] - self.assertNotEqual(build_canonical(mutated), build_canonical(PARITY_EXCEPTION_LIST)) + self.assertNotEqual( + build_canonical(mutated), build_canonical(PARITY_EXCEPTION_LIST) + ) def test_tolerates_missing_and_malformed(self): - self.assertTrue(build_canonical([]).startswith(b"PHEXC1\n")) - self.assertTrue(build_canonical(None).startswith(b"PHEXC1\n")) - self.assertTrue(build_canonical([{}]).startswith(b"PHEXC1\n")) + for case in ([], None, [{}], [{"stacktrace": {}}], [{"value": "x"}]): + with self.subTest(case=case): + self.assertTrue(build_canonical(case).startswith(b"PHEXC1\n")) class TestSigning(unittest.TestCase): @@ -114,7 +122,9 @@ def test_signer_key_id_derives_from_private_key(self): def test_signature_matches_parity_vector(self): signer = ExceptionSigner(_private_key_pem()) - self.assertEqual(signer.sign(build_canonical(PARITY_EXCEPTION_LIST)), SIGNATURE_B64) + self.assertEqual( + signer.sign(build_canonical(PARITY_EXCEPTION_LIST)), SIGNATURE_B64 + ) def test_signature_verifies_with_public_key(self): signer = ExceptionSigner(_private_key_pem()) @@ -138,14 +148,19 @@ def test_rejects_non_ed25519_key(self): with self.assertRaises(ValueError): ExceptionSigner(rsa_pem) - def test_sign_event_attaches_props_only_for_exceptions(self): + def test_sign_event_attaches_props_for_exceptions(self): signer = ExceptionSigner(_private_key_pem()) - event = {"event": "$exception", "properties": {"$exception_list": PARITY_EXCEPTION_LIST}} + event = { + "event": "$exception", + "properties": {"$exception_list": PARITY_EXCEPTION_LIST}, + } signer.sign_event(event) self.assertEqual(event["properties"][SIGNATURE_PROPERTY], SIGNATURE_B64) self.assertEqual(event["properties"][KEY_ID_PROPERTY], KEY_ID) self.assertEqual(event["properties"][VERSION_PROPERTY], 1) + def test_sign_event_passes_through_non_exceptions(self): + signer = ExceptionSigner(_private_key_pem()) other = {"event": "$pageview", "properties": {}} signer.sign_event(other) self.assertNotIn(SIGNATURE_PROPERTY, other["properties"]) @@ -210,6 +225,15 @@ def test_client_without_signing_adds_no_props(self): queued = self._enqueue_and_read(client) self.assertNotIn(SIGNATURE_PROPERTY, queued["properties"]) + def test_enabled_without_key_warns_and_does_not_sign(self): + with mock.patch.object(Client, "log") as log: + client = Client(FAKE_TEST_API_KEY, enable_exception_signing=True) + self.assertIsNone(client._exception_signer) + self.assertTrue( + any("UNSIGNED" in str(c.args) for c in log.warning.call_args_list), + "expected a warning that events will be sent unsigned", + ) + if __name__ == "__main__": unittest.main()