Skip to content
Merged
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- Sliding-window rate limiting in `DefaultPolicyEngine` per `(principal_id, capability_id)` pair (#39).
Default limits by safety class: 60 READ / 10 WRITE / 2 DESTRUCTIVE per 60s window.
Service-role principals get 10× limits. Configurable via constructor.

## [0.3.0] - 2026-03-09

### Added
Expand Down
6 changes: 5 additions & 1 deletion docs/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ Consider an agent that obtains a token for `billing.list_invoices` then passes i
- The `AGENT_KERNEL_SECRET` must be kept secret. Rotate it if compromised.
- The default `InMemoryDriver` has no persistence — suitable for testing only.
- PII redaction is heuristic (regex-based). It is not a substitute for proper data governance.
- There is no rate limiting or quota enforcement in v0.1.
- Rate limiting is enforced per `(principal_id, capability_id)` pair using a sliding window.
Default limits: 60 READ / 10 WRITE / 2 DESTRUCTIVE invocations per 60-second window.
Principals with the `"service"` role receive 10× the default limits. Limits are
configurable via `DefaultPolicyEngine(rate_limits=...)`. There is no distributed or
persistent rate-limit state — limits reset on process restart.
97 changes: 97 additions & 0 deletions src/agent_kernel/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
from __future__ import annotations

import logging
import time
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Protocol

from .enums import SafetyClass, SensitivityTag
Expand All @@ -18,6 +22,63 @@
_MAX_ROWS_USER = 50
_MAX_ROWS_SERVICE = 500

# Default rate limits per safety class: (invocations, window_seconds).
_DEFAULT_RATE_LIMITS: dict[SafetyClass, tuple[int, float]] = {
SafetyClass.READ: (60, 60.0),
SafetyClass.WRITE: (10, 60.0),
SafetyClass.DESTRUCTIVE: (2, 60.0),
}

# Service role multiplier for rate limits.
_SERVICE_RATE_MULTIPLIER = 10


@dataclass(slots=True)
class _RateEntry:
"""Timestamps for a single rate-limit key."""

timestamps: list[float]


class RateLimiter:
"""Sliding-window rate limiter using monotonic clock.

Args:
clock: Callable returning the current time in seconds.
Defaults to :func:`time.monotonic`.
"""

def __init__(self, clock: Callable[[], float] | None = None) -> None:
self._clock = clock or time.monotonic
self._windows: dict[str, _RateEntry] = defaultdict(lambda: _RateEntry(timestamps=[]))

def check(self, key: str, limit: int, window_seconds: float) -> bool:
"""Return ``True`` if the next invocation would be within the limit.

Prunes expired timestamps as a side-effect.

Args:
key: Rate-limit key (e.g. ``"principal:capability"``).
limit: Maximum allowed invocations per window.
window_seconds: Sliding window duration in seconds.

Returns:
``True`` if under limit, ``False`` if limit would be exceeded.
"""
now = self._clock()
cutoff = now - window_seconds
entry = self._windows[key]
entry.timestamps = [t for t in entry.timestamps if t > cutoff]
Comment thread
dgenio marked this conversation as resolved.
return len(entry.timestamps) < limit
Comment thread
dgenio marked this conversation as resolved.

def record(self, key: str) -> None:
"""Record an invocation for *key*.

Args:
key: Rate-limit key.
"""
self._windows[key].timestamps.append(self._clock())


class PolicyEngine(Protocol):
"""Interface for a policy engine."""
Expand Down Expand Up @@ -61,8 +122,28 @@ class DefaultPolicyEngine:
``"secrets_reader"`` and a justification of at least 15 characters.
6. **max_rows** — 50 for regular users; 500 for principals with the
``"service"`` role.
7. **Rate limiting** — sliding-window rate limit per
``(principal_id, capability_id)`` pair, with defaults by safety class.
Principals with the ``"service"`` role get 10× the default limits.
"""

def __init__(
self,
*,
rate_limits: dict[SafetyClass, tuple[int, float]] | None = None,
clock: Callable[[], float] | None = None,
) -> None:
"""Initialise the policy engine.

Args:
rate_limits: Override default rate limits per safety class.
Each value is ``(max_invocations, window_seconds)``.
clock: Monotonic clock callable for rate-limiter.
Defaults to :func:`time.monotonic`.
"""
self._rate_limits = rate_limits if rate_limits is not None else dict(_DEFAULT_RATE_LIMITS)
Comment thread
dgenio marked this conversation as resolved.
Outdated
self._limiter = RateLimiter(clock=clock)

@staticmethod
def _deny(reason: str, *, principal_id: str, capability_id: str) -> PolicyDenied:
"""Log a policy denial at WARNING and return the exception to raise."""
Expand Down Expand Up @@ -197,6 +278,22 @@ def evaluate(
else:
constraints["max_rows"] = max_rows

# ── Rate limiting ─────────────────────────────────────────────────

rate_key = f"{pid}:{cid}"
if capability.safety_class in self._rate_limits:
limit, window = self._rate_limits[capability.safety_class]
if "service" in roles:
limit *= _SERVICE_RATE_MULTIPLIER
if not self._limiter.check(rate_key, limit, window):
raise self._deny(
f"Rate limit exceeded: {limit} {capability.safety_class.value} "
f"invocations per {window}s for principal '{pid}'",
principal_id=pid,
capability_id=cid,
)
self._limiter.record(rate_key)

reason = "Request approved by DefaultPolicyEngine."
logger.info(
"policy_allowed",
Expand Down
141 changes: 141 additions & 0 deletions tests/test_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
SensitivityTag,
)
from agent_kernel.models import CapabilityRequest
from agent_kernel.policy import RateLimiter


def _req(cap_id: str, **constraints: object) -> CapabilityRequest:
Expand Down Expand Up @@ -292,3 +293,143 @@ def test_max_rows_negative_clamped_to_zero() -> None:
_req("cap.r", max_rows=-10), _cap("cap.r", SafetyClass.READ), p, justification=""
)
assert dec.constraints["max_rows"] == 0


# ── Rate limiting ─────────────────────────────────────────────────────────────────


def _make_clock(start: float = 0.0) -> tuple[list[float], callable]:
"""Return a controllable clock: (time_ref, clock_fn).

Advance time by mutating ``time_ref[0]``.
"""
time_ref = [start]
return time_ref, lambda: time_ref[0]
Comment thread
dgenio marked this conversation as resolved.
Outdated


def test_rate_limiter_under_limit() -> None:
"""Requests within the limit are allowed."""
t, clock = _make_clock()
Comment thread
dgenio marked this conversation as resolved.
Outdated
limiter = RateLimiter(clock=clock)
for _ in range(5):
assert limiter.check("k", 5, 60.0) is True
limiter.record("k")
# 6th should be denied
assert limiter.check("k", 5, 60.0) is False


def test_rate_limiter_window_expires() -> None:
"""Old entries expire and free up capacity."""
t, clock = _make_clock(0.0)
limiter = RateLimiter(clock=clock)
# Fill window
for _ in range(5):
limiter.check("k", 5, 60.0)
limiter.record("k")
assert limiter.check("k", 5, 60.0) is False
# Advance past window
t[0] = 61.0
assert limiter.check("k", 5, 60.0) is True


def test_read_rate_limit_exceeded() -> None:
"""61st READ invocation in 60s raises PolicyDenied."""
t, clock = _make_clock()
eng = DefaultPolicyEngine(clock=clock)
Comment thread
dgenio marked this conversation as resolved.
Outdated
p = Principal(principal_id="u1")
cap = _cap("cap.r", SafetyClass.READ)
for _ in range(60):
eng.evaluate(_req("cap.r"), cap, p, justification="")
with pytest.raises(PolicyDenied, match="Rate limit exceeded"):
eng.evaluate(_req("cap.r"), cap, p, justification="")


def test_write_rate_limit_exceeded() -> None:
"""11th WRITE invocation in 60s raises PolicyDenied."""
t, clock = _make_clock()
eng = DefaultPolicyEngine(clock=clock)
p = Principal(principal_id="u1", roles=["writer"])
Comment thread
dgenio marked this conversation as resolved.
Outdated
cap = _cap("cap.w", SafetyClass.WRITE)
just = "this is a long enough justification string"
for _ in range(10):
eng.evaluate(_req("cap.w"), cap, p, justification=just)
with pytest.raises(PolicyDenied, match="Rate limit exceeded"):
eng.evaluate(_req("cap.w"), cap, p, justification=just)


def test_destructive_rate_limit_exceeded() -> None:
"""3rd DESTRUCTIVE invocation in 60s raises PolicyDenied."""
t, clock = _make_clock()
eng = DefaultPolicyEngine(clock=clock)
p = Principal(principal_id="u1", roles=["admin"])
Comment thread
dgenio marked this conversation as resolved.
Outdated
cap = _cap("cap.d", SafetyClass.DESTRUCTIVE)
just = "long enough justification"
for _ in range(2):
eng.evaluate(_req("cap.d"), cap, p, justification=just)
with pytest.raises(PolicyDenied, match="Rate limit exceeded"):
eng.evaluate(_req("cap.d"), cap, p, justification=just)


def test_rate_limit_per_principal_capability_pair() -> None:
"""Rate limits are scoped to (principal_id, capability_id), not global."""
t, clock = _make_clock()
eng = DefaultPolicyEngine(clock=clock)
p1 = Principal(principal_id="u1")
Comment thread
dgenio marked this conversation as resolved.
Outdated
p2 = Principal(principal_id="u2")
cap = _cap("cap.r", SafetyClass.READ)
# Exhaust u1's limit
for _ in range(60):
eng.evaluate(_req("cap.r"), cap, p1, justification="")
with pytest.raises(PolicyDenied, match="Rate limit exceeded"):
eng.evaluate(_req("cap.r"), cap, p1, justification="")
# u2 is unaffected
eng.evaluate(_req("cap.r"), cap, p2, justification="")


def test_service_role_gets_10x_limit() -> None:
"""Principals with 'service' role get 10x the default rate limits."""
t, clock = _make_clock()
eng = DefaultPolicyEngine(clock=clock)
p = Principal(principal_id="svc1", roles=["service"])
Comment thread
dgenio marked this conversation as resolved.
Outdated
cap = _cap("cap.r", SafetyClass.READ)
# Default READ is 60; service gets 600
for _ in range(600):
eng.evaluate(_req("cap.r"), cap, p, justification="")
with pytest.raises(PolicyDenied, match="Rate limit exceeded"):
eng.evaluate(_req("cap.r"), cap, p, justification="")


def test_rate_limit_configurable() -> None:
"""Rate limits are configurable via constructor."""
t, clock = _make_clock()
eng = DefaultPolicyEngine(
rate_limits={SafetyClass.READ: (3, 10.0)},
Comment thread
dgenio marked this conversation as resolved.
Outdated
clock=clock,
)
p = Principal(principal_id="u1")
cap = _cap("cap.r", SafetyClass.READ)
for _ in range(3):
eng.evaluate(_req("cap.r"), cap, p, justification="")
with pytest.raises(PolicyDenied, match="Rate limit exceeded"):
eng.evaluate(_req("cap.r"), cap, p, justification="")


def test_rate_limit_window_slides() -> None:
"""Old entries expire, allowing new invocations after the window slides."""
t, clock = _make_clock(0.0)
eng = DefaultPolicyEngine(
rate_limits={SafetyClass.READ: (2, 10.0)},
clock=clock,
)
p = Principal(principal_id="u1")
cap = _cap("cap.r", SafetyClass.READ)
# Use both
eng.evaluate(_req("cap.r"), cap, p, justification="")
t[0] = 5.0
eng.evaluate(_req("cap.r"), cap, p, justification="")
# Blocked
with pytest.raises(PolicyDenied, match="Rate limit exceeded"):
eng.evaluate(_req("cap.r"), cap, p, justification="")
# Advance past first entry's window
t[0] = 11.0
eng.evaluate(_req("cap.r"), cap, p, justification="") # should succeed
Loading