Skip to content

Commit b354aa1

Browse files
author
Delega Bot
committed
Add Python webhook verification helper
1 parent 1c2bafe commit b354aa1

3 files changed

Lines changed: 134 additions & 0 deletions

File tree

src/delega/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
DelegaRateLimitError,
1010
)
1111
from .models import Agent, Comment, Project, Task
12+
from .webhooks import verify_webhook
1213

1314
__version__ = "0.1.1"
1415

@@ -24,6 +25,7 @@
2425
"DelegaRateLimitError",
2526
"Project",
2627
"Task",
28+
"verify_webhook",
2729
]
2830

2931

src/delega/webhooks.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
"""Webhook verification helpers."""
2+
3+
from __future__ import annotations
4+
5+
import hashlib
6+
import hmac
7+
from datetime import datetime, timezone
8+
9+
10+
def _parse_timestamp(value: str) -> datetime:
11+
try:
12+
parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
13+
except ValueError as exc:
14+
raise ValueError("invalid timestamp") from exc
15+
16+
if parsed.tzinfo is None:
17+
return parsed.replace(tzinfo=timezone.utc)
18+
return parsed.astimezone(timezone.utc)
19+
20+
21+
def verify_webhook(
22+
payload: bytes,
23+
signature: str,
24+
timestamp: str,
25+
secret: str,
26+
tolerance_seconds: int = 300,
27+
) -> bool:
28+
"""Verify a Delega webhook signature.
29+
30+
Args:
31+
payload: Raw request body bytes.
32+
signature: Value of the X-Delega-Signature header.
33+
timestamp: Value of the X-Delega-Timestamp header.
34+
secret: Your webhook secret.
35+
tolerance_seconds: Max age in seconds.
36+
37+
Returns:
38+
True if the signature matches and the timestamp is within tolerance.
39+
40+
Raises:
41+
ValueError: If the signature format is invalid, the timestamp is stale,
42+
or the signature does not match.
43+
"""
44+
if not signature.startswith("sha256="):
45+
raise ValueError("bad signature format")
46+
47+
signature_hex = signature[len("sha256=") :]
48+
if len(signature_hex) != 64 or any(ch not in "0123456789abcdefABCDEF" for ch in signature_hex):
49+
raise ValueError("bad signature format")
50+
51+
received_at = _parse_timestamp(timestamp)
52+
age_seconds = abs((datetime.now(timezone.utc) - received_at).total_seconds())
53+
if age_seconds > tolerance_seconds:
54+
raise ValueError("stale timestamp")
55+
56+
expected = "sha256=" + hmac.new(
57+
secret.encode("utf-8"),
58+
timestamp.encode("utf-8") + b"." + payload,
59+
hashlib.sha256,
60+
).hexdigest()
61+
if not hmac.compare_digest(expected, signature):
62+
raise ValueError("signature mismatch")
63+
64+
return True

tests/test_webhooks.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
"""Webhook verification tests."""
2+
3+
from __future__ import annotations
4+
5+
import hashlib
6+
import hmac
7+
import unittest
8+
from datetime import datetime, timedelta, timezone
9+
10+
from delega import verify_webhook
11+
12+
13+
def _timestamp(delta: timedelta = timedelta()) -> str:
14+
return (datetime.now(timezone.utc) + delta).isoformat().replace("+00:00", "Z")
15+
16+
17+
def _signature(payload: bytes, timestamp: str, secret: str) -> str:
18+
digest = hmac.new(
19+
secret.encode("utf-8"),
20+
timestamp.encode("utf-8") + b"." + payload,
21+
hashlib.sha256,
22+
).hexdigest()
23+
return f"sha256={digest}"
24+
25+
26+
class TestVerifyWebhook(unittest.TestCase):
27+
def setUp(self) -> None:
28+
self.payload = b'{"event":"task.created","task":{"id":"abc123"}}'
29+
self.secret = "whsec_test_secret"
30+
31+
def test_verifies_valid_signature(self) -> None:
32+
timestamp = _timestamp()
33+
signature = _signature(self.payload, timestamp, self.secret)
34+
35+
self.assertTrue(
36+
verify_webhook(self.payload, signature, timestamp, self.secret)
37+
)
38+
39+
def test_rejects_bad_signature_format(self) -> None:
40+
timestamp = _timestamp()
41+
42+
with self.assertRaisesRegex(ValueError, "bad signature format"):
43+
verify_webhook(self.payload, "not-a-sha256", timestamp, self.secret)
44+
45+
def test_rejects_stale_timestamp(self) -> None:
46+
timestamp = _timestamp(timedelta(minutes=-6))
47+
signature = _signature(self.payload, timestamp, self.secret)
48+
49+
with self.assertRaisesRegex(ValueError, "stale timestamp"):
50+
verify_webhook(self.payload, signature, timestamp, self.secret)
51+
52+
def test_rejects_signature_mismatch(self) -> None:
53+
timestamp = _timestamp()
54+
signature = _signature(self.payload, timestamp, "wrong_secret")
55+
56+
with self.assertRaisesRegex(ValueError, "signature mismatch"):
57+
verify_webhook(self.payload, signature, timestamp, self.secret)
58+
59+
def test_rejects_invalid_timestamp(self) -> None:
60+
signature = _signature(self.payload, "not-a-timestamp", self.secret)
61+
62+
with self.assertRaisesRegex(ValueError, "invalid timestamp"):
63+
verify_webhook(
64+
self.payload,
65+
signature,
66+
"not-a-timestamp",
67+
self.secret,
68+
)

0 commit comments

Comments
 (0)