Skip to content

Commit e541a02

Browse files
authored
Merge pull request #10 from PredicateSystems/rust
Phase 3 hardening
2 parents d28ab01 + 78ce18a commit e541a02

13 files changed

Lines changed: 1029 additions & 27 deletions

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,12 @@ predicate-authorityd \
172172

173173
`predicate-authority` provides an ops-focused CLI for sidecar/runtime workflows.
174174

175+
### Ops docs quick links
176+
177+
- Sidecar operations guide: `docs/authorityd-operations.md`
178+
- User manual (sync/integrity/operator behaviors): `docs/predicate-authority-user-manual.md`
179+
- Control-plane production hardening runbook: `../predicate-authority-control-plane/docs/production-hardening-runbook.md`
180+
175181
### Sidecar health and status
176182

177183
```bash

docs/authorityd-operations.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,40 @@ PYTHONPATH=. predicate-authorityd \
6363
--control-plane-fail-open
6464
```
6565

66+
### Optional: enable long-poll policy/revocation sync from control-plane
67+
68+
Use this when running `cloud_connected` mode and you want active policy/revocation
69+
updates pushed through long-poll sync instead of waiting for file-based policy polling.
70+
71+
```bash
72+
export CONTROL_PLANE_URL="http://127.0.0.1:8080"
73+
export CONTROL_PLANE_TENANT_ID="dev-tenant"
74+
export CONTROL_PLANE_PROJECT_ID="dev-project"
75+
export CONTROL_PLANE_AUTH_TOKEN="<bearer-token>"
76+
77+
PYTHONPATH=. predicate-authorityd \
78+
--host 127.0.0.1 \
79+
--port 8787 \
80+
--mode cloud_connected \
81+
--policy-file examples/authorityd/policy.json \
82+
--control-plane-enabled \
83+
--control-plane-sync-enabled \
84+
--control-plane-sync-project-id "$CONTROL_PLANE_PROJECT_ID" \
85+
--control-plane-sync-environment "prod" \
86+
--control-plane-sync-wait-timeout-s 15 \
87+
--control-plane-sync-poll-interval-ms 200
88+
```
89+
90+
Quick checks:
91+
92+
```bash
93+
# daemon sync health counters
94+
curl -s http://127.0.0.1:8787/status | jq '.control_plane_sync_poll_count, .control_plane_sync_update_count, .control_plane_sync_error_count, .control_plane_last_sync_error'
95+
96+
# daemon metrics includes control-plane sync counters
97+
curl -s http://127.0.0.1:8787/metrics | rg "predicate_authority_control_plane_sync_total"
98+
```
99+
66100
### Signing key safety note (required until mandate `v2` claims)
67101

68102
Until mandate `v2` introduces explicit `iss`/`aud` claims and asymmetric signing defaults,

docs/predicate-authority-user-manual.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,46 @@ curl -s -X POST http://127.0.0.1:8787/ledger/requeue \
441441

442442
---
443443

444+
## Control-plane sync and integrity quick checks
445+
446+
If you run `predicate-authorityd` with control-plane enabled, you can also enable
447+
long-poll sync to pull policy/revocation updates continuously:
448+
449+
```bash
450+
predicate-authorityd \
451+
--mode cloud_connected \
452+
--policy-file examples/authorityd/policy.json \
453+
--control-plane-enabled \
454+
--control-plane-sync-enabled \
455+
--control-plane-sync-project-id "dev-project" \
456+
--control-plane-sync-environment "prod"
457+
```
458+
459+
Check sync counters from daemon:
460+
461+
```bash
462+
curl -s http://127.0.0.1:8787/status | jq '.control_plane_sync_poll_count, .control_plane_sync_update_count, .control_plane_sync_error_count'
463+
```
464+
465+
From control-plane, verify tamper-evident audit integrity endpoints:
466+
467+
```bash
468+
curl -s "http://127.0.0.1:8080/v1/audit/integrity/root?tenant_id=tenant-a" \
469+
-H "Authorization: Bearer $TOKEN" | jq
470+
471+
curl -s "http://127.0.0.1:8080/v1/audit/integrity/proof/<event_id>?tenant_id=tenant-a" \
472+
-H "Authorization: Bearer $TOKEN" | jq
473+
```
474+
475+
Operational notes:
476+
477+
- control-plane may return `503 store_circuit_open:<operation>` during upstream DB distress,
478+
- if Kafka streaming is enabled in fail-closed mode, event-stream outages can return
479+
`503 event_stream_unavailable:<topic>`,
480+
- in fail-open mode, core authority decisions continue even if stream publish fails.
481+
482+
---
483+
444484
## `sdk-python` integration example (boundary adapter flow)
445485

446486
If your web agent uses `sdk-python`, build shared contract evidence before

predicate_authority/control_plane.py

Lines changed: 145 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from collections.abc import Mapping
88
from dataclasses import asdict, dataclass
99
from datetime import datetime, timezone
10-
from urllib.parse import urlsplit
10+
from urllib.parse import urlencode, urlsplit
1111

1212
from predicate_contracts import ProofEvent
1313

@@ -22,6 +22,11 @@ class ControlPlaneClientConfig:
2222
max_retries: int = 2
2323
backoff_initial_s: float = 0.2
2424
fail_open: bool = True
25+
sync_enabled: bool = False
26+
sync_wait_timeout_s: float = 15.0
27+
sync_poll_interval_ms: int = 200
28+
sync_project_id: str | None = None
29+
sync_environment: str | None = None
2530

2631

2732
@dataclass(frozen=True)
@@ -80,6 +85,89 @@ def authority_check(tenant_id: str, project_id: str, credits: int = 1) -> UsageC
8085
)
8186

8287

88+
@dataclass(frozen=True)
89+
class RemoteRevocation:
90+
revocation_id: str
91+
type: str
92+
principal_id: str | None = None
93+
intent_hash: str | None = None
94+
tags: tuple[str, ...] = ()
95+
reason: str | None = None
96+
created_at: str = ""
97+
98+
99+
@dataclass(frozen=True)
100+
class AuthoritySyncSnapshot:
101+
changed: bool
102+
sync_token: str
103+
tenant_id: str
104+
project_id: str | None = None
105+
environment: str | None = None
106+
policy_id: str | None = None
107+
policy_revision: int | None = None
108+
policy_document: dict[str, object] | None = None
109+
revocations: tuple[RemoteRevocation, ...] = ()
110+
111+
@staticmethod
112+
def from_payload(payload: Mapping[str, object]) -> AuthoritySyncSnapshot:
113+
revocations_payload = payload.get("revocations")
114+
parsed_revocations: list[RemoteRevocation] = []
115+
if isinstance(revocations_payload, list):
116+
for item in revocations_payload:
117+
if not isinstance(item, Mapping):
118+
continue
119+
raw_tags = item.get("tags")
120+
tags: tuple[str, ...] = ()
121+
if isinstance(raw_tags, list):
122+
tags = tuple(str(tag) for tag in raw_tags if isinstance(tag, str))
123+
parsed_revocations.append(
124+
RemoteRevocation(
125+
revocation_id=str(item.get("revocation_id", "")),
126+
type=str(item.get("type", "")),
127+
principal_id=(
128+
str(item["principal_id"])
129+
if isinstance(item.get("principal_id"), str)
130+
else None
131+
),
132+
intent_hash=(
133+
str(item["intent_hash"])
134+
if isinstance(item.get("intent_hash"), str)
135+
else None
136+
),
137+
tags=tags,
138+
reason=str(item["reason"]) if isinstance(item.get("reason"), str) else None,
139+
created_at=str(item.get("created_at", "")),
140+
)
141+
)
142+
policy_document = payload.get("policy_document")
143+
raw_policy_revision = payload.get("policy_revision")
144+
policy_revision: int | None = None
145+
if isinstance(raw_policy_revision, int):
146+
policy_revision = raw_policy_revision
147+
elif isinstance(raw_policy_revision, str) and raw_policy_revision.strip() != "":
148+
try:
149+
policy_revision = int(raw_policy_revision)
150+
except ValueError:
151+
policy_revision = None
152+
return AuthoritySyncSnapshot(
153+
changed=bool(payload.get("changed", False)),
154+
sync_token=str(payload.get("sync_token", "")),
155+
tenant_id=str(payload.get("tenant_id", "")),
156+
project_id=(
157+
str(payload["project_id"]) if isinstance(payload.get("project_id"), str) else None
158+
),
159+
environment=(
160+
str(payload["environment"]) if isinstance(payload.get("environment"), str) else None
161+
),
162+
policy_id=(
163+
str(payload["policy_id"]) if isinstance(payload.get("policy_id"), str) else None
164+
),
165+
policy_revision=policy_revision,
166+
policy_document=(dict(policy_document) if isinstance(policy_document, dict) else None),
167+
revocations=tuple(parsed_revocations),
168+
)
169+
170+
83171
class ControlPlaneClient:
84172
def __init__(self, config: ControlPlaneClientConfig) -> None:
85173
self.config = config
@@ -100,6 +188,29 @@ def send_usage_records(self, records: tuple[UsageCreditRecord, ...]) -> bool:
100188
def send_audit_payload(self, payload: Mapping[str, object]) -> bool:
101189
return self._post_json("/v1/audit/events:batch", payload)
102190

191+
def poll_authority_updates(
192+
self,
193+
current_token: str | None,
194+
wait_timeout_s: float = 15.0,
195+
poll_interval_ms: int = 200,
196+
project_id: str | None = None,
197+
environment: str | None = None,
198+
) -> AuthoritySyncSnapshot:
199+
query: dict[str, str | float | int] = {
200+
"tenant_id": self.config.tenant_id,
201+
"wait_timeout_s": max(0.0, float(wait_timeout_s)),
202+
"poll_interval_ms": max(50, int(poll_interval_ms)),
203+
}
204+
if current_token is not None and current_token.strip() != "":
205+
query["current_token"] = current_token
206+
if project_id is not None and project_id.strip() != "":
207+
query["project_id"] = project_id
208+
if environment is not None and environment.strip() != "":
209+
query["environment"] = environment
210+
path = "/v1/sync/authority-updates?" + urlencode(query)
211+
payload = self._get_json(path)
212+
return AuthoritySyncSnapshot.from_payload(payload)
213+
103214
def _post_json(self, path: str, payload: Mapping[str, object]) -> bool:
104215
attempts = self.config.max_retries + 1
105216
for attempt in range(attempts):
@@ -115,6 +226,20 @@ def _post_json(self, path: str, payload: Mapping[str, object]) -> bool:
115226
time.sleep(self.config.backoff_initial_s * (2**attempt))
116227
return False
117228

229+
def _get_json(self, path: str) -> Mapping[str, object]:
230+
attempts = self.config.max_retries + 1
231+
for attempt in range(attempts):
232+
try:
233+
return self._get_json_once(path)
234+
except Exception as exc:
235+
is_last_attempt = attempt == attempts - 1
236+
if is_last_attempt:
237+
if self.config.fail_open:
238+
return {}
239+
raise RuntimeError(f"control-plane request failed: {path}") from exc
240+
time.sleep(self.config.backoff_initial_s * (2**attempt))
241+
return {}
242+
118243
def _post_json_once(self, path: str, payload: Mapping[str, object]) -> None:
119244
target_path = path if path.startswith("/") else f"/{path}"
120245
connection = self._new_connection()
@@ -131,6 +256,25 @@ def _post_json_once(self, path: str, payload: Mapping[str, object]) -> None:
131256
if response.status >= 400:
132257
raise RuntimeError(f"HTTP {response.status}: {content}")
133258

259+
def _get_json_once(self, path: str) -> Mapping[str, object]:
260+
target_path = path if path.startswith("/") else f"/{path}"
261+
connection = self._new_connection()
262+
headers: dict[str, str] = {}
263+
if self.config.auth_token:
264+
headers["Authorization"] = f"Bearer {self.config.auth_token}"
265+
try:
266+
connection.request("GET", target_path, headers=headers)
267+
response = connection.getresponse()
268+
content = response.read().decode("utf-8")
269+
finally:
270+
connection.close()
271+
if response.status >= 400:
272+
raise RuntimeError(f"HTTP {response.status}: {content}")
273+
loaded = json.loads(content) if content.strip() != "" else {}
274+
if not isinstance(loaded, dict):
275+
raise RuntimeError("Expected object JSON payload from control-plane GET response.")
276+
return loaded
277+
134278
def _new_connection(self) -> http.client.HTTPConnection:
135279
if self._base.scheme == "https":
136280
return http.client.HTTPSConnection(self._base.netloc, timeout=self.config.timeout_s)

0 commit comments

Comments
 (0)