Skip to content

Commit 61f241b

Browse files
authored
Merge pull request #16 from PredicateSystems/sidecar1
sidecar restructure
2 parents 77e5971 + 0ddc905 commit 61f241b

2 files changed

Lines changed: 139 additions & 4 deletions

File tree

predicate_authority/daemon.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ class LocalIdentityBootstrapConfig:
9595
enabled: bool = False
9696
registry_file_path: str | None = None
9797
default_ttl_seconds: int = 900
98+
# Queue item TTL: local audit logs auto-expire to encourage control-plane usage
99+
# Default: 24 hours (86400 seconds)
100+
queue_item_ttl_seconds: int = 24 * 60 * 60
98101

99102

100103
@dataclass(frozen=True)
@@ -695,12 +698,25 @@ def _policy_poll_loop(self) -> None:
695698
def _flush_queue_loop(self) -> None:
696699
while not self._stop_event.is_set():
697700
try:
701+
# Expire old queue items before flushing (ephemeral local logs)
702+
self._expire_queue_items()
698703
self._flush_once()
699704
except Exception as exc: # noqa: BLE001
700705
self._runtime.flush_failed_count += 1
701706
self._runtime.last_flush_error = str(exc)
702707
self._stop_event.wait(timeout=self._flush_worker.interval_s)
703708

709+
def _expire_queue_items(self) -> int:
710+
"""Remove queue items that have exceeded their TTL.
711+
712+
Local sidecar logs are deliberately ephemeral to encourage
713+
control-plane adoption for enterprise audit requirements.
714+
"""
715+
registry = self._sidecar.local_identity_registry()
716+
if registry is None:
717+
return 0
718+
return registry.expire_queue_items()
719+
704720
def _flush_once(
705721
self,
706722
max_items: int | None = None,
@@ -902,6 +918,7 @@ def _build_default_sidecar(
902918
local_identity_registry = LocalIdentityRegistry(
903919
file_path=local_identity_config.registry_file_path,
904920
default_ttl_seconds=local_identity_config.default_ttl_seconds,
921+
queue_item_ttl_seconds=local_identity_config.queue_item_ttl_seconds,
905922
)
906923
trace_emitters.append(LocalLedgerQueueEmitter(registry=local_identity_registry))
907924
trace_emitter = (
@@ -1101,6 +1118,16 @@ def main() -> None:
11011118
default=str(Path.home() / ".predicate-authorityd" / "local-identities.json"),
11021119
)
11031120
parser.add_argument("--local-identity-default-ttl-s", type=int, default=900)
1121+
parser.add_argument(
1122+
"--queue-item-ttl-seconds",
1123+
type=int,
1124+
default=24 * 60 * 60,
1125+
help=(
1126+
"Ephemeral queue item TTL in seconds. Local audit events auto-expire "
1127+
"to discourage reliance on sidecar logs for enterprise audit. "
1128+
"Use control-plane for durable audit storage. Default: 86400 (24 hours)."
1129+
),
1130+
)
11041131
parser.add_argument(
11051132
"--identity-mode",
11061133
choices=["local", "local-idp", "oidc", "entra", "okta"],
@@ -1322,6 +1349,7 @@ def main() -> None:
13221349
enabled=bool(args.local_identity_enabled),
13231350
registry_file_path=str(args.local_identity_registry_file),
13241351
default_ttl_seconds=max(1, int(args.local_identity_default_ttl_s)),
1352+
queue_item_ttl_seconds=max(60, int(args.queue_item_ttl_seconds)), # min 1 minute
13251353
)
13261354
identity_bridge = _build_identity_bridge_from_args(args)
13271355
mandate_signing_key = _resolve_mandate_signing_key(

predicate_authority/local_identity.py

Lines changed: 111 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,25 @@ class LocalIdentityRegistryStats:
4848

4949

5050
class LocalIdentityRegistry:
51-
def __init__(self, file_path: str, default_ttl_seconds: int = 900) -> None:
51+
# Default queue item TTL: 24 hours
52+
# Ephemeral by design: local logs auto-expire to encourage control-plane adoption
53+
DEFAULT_QUEUE_ITEM_TTL_SECONDS = 24 * 60 * 60 # 24 hours
54+
55+
def __init__(
56+
self,
57+
file_path: str,
58+
default_ttl_seconds: int = 900,
59+
queue_item_ttl_seconds: int | None = None,
60+
) -> None:
5261
if default_ttl_seconds <= 0:
5362
raise ValueError("default_ttl_seconds must be > 0")
5463
self._file_path = Path(file_path)
5564
self._default_ttl_seconds = default_ttl_seconds
65+
self._queue_item_ttl_seconds = (
66+
queue_item_ttl_seconds
67+
if queue_item_ttl_seconds is not None
68+
else self.DEFAULT_QUEUE_ITEM_TTL_SECONDS
69+
)
5670
self._lock = Lock()
5771
self._ensure_store_path()
5872

@@ -164,6 +178,35 @@ def expire_identities(self, now_epoch_s: int | None = None) -> int:
164178
self._write_all_unlocked(payload)
165179
return expired_count
166180

181+
def expire_queue_items(self, now_epoch_s: int | None = None) -> int:
182+
"""Remove queue items older than queue_item_ttl_seconds.
183+
184+
Ephemeral logging: local audit events auto-expire to discourage
185+
reliance on sidecar logs for enterprise audit requirements.
186+
Control-plane provides durable, queryable audit storage.
187+
188+
Returns the count of expired (deleted) queue items.
189+
"""
190+
now = now_epoch_s if now_epoch_s is not None else int(time.time())
191+
cutoff = now - self._queue_item_ttl_seconds
192+
expired_count = 0
193+
with self._lock:
194+
payload = self._read_all_unlocked()
195+
queue = payload.setdefault("flush_queue", {})
196+
to_delete: list[str] = []
197+
for queue_item_id, raw in queue.items():
198+
if not isinstance(raw, dict):
199+
continue
200+
enqueued_at = int(raw.get("enqueued_at_epoch_s", now))
201+
if enqueued_at < cutoff:
202+
to_delete.append(queue_item_id)
203+
for queue_item_id in to_delete:
204+
del queue[queue_item_id]
205+
expired_count += 1
206+
if expired_count > 0:
207+
self._write_all_unlocked(payload)
208+
return expired_count
209+
167210
def enqueue_proof_event(
168211
self, event: ProofEvent, source: str = "predicate-authorityd"
169212
) -> LedgerQueueItem:
@@ -194,7 +237,21 @@ def list_flush_queue(
194237
include_flushed: bool = False,
195238
include_quarantined: bool = False,
196239
limit: int | None = None,
240+
redact_payloads: bool = True,
197241
) -> list[LedgerQueueItem]:
242+
"""List queue items with optional payload redaction.
243+
244+
By default, payloads are redacted to prevent local sidecar logs from
245+
serving as a queryable audit trail. Full payloads are only accessible
246+
via control-plane audit vault.
247+
248+
Args:
249+
include_flushed: Include already-flushed items.
250+
include_quarantined: Include quarantined (dead-letter) items.
251+
limit: Maximum number of items to return.
252+
redact_payloads: If True (default), sensitive payload fields are
253+
replaced with "[REDACTED - use control-plane for full audit]".
254+
"""
198255
with self._lock:
199256
payload = self._read_all_unlocked()
200257
queue = payload.setdefault("flush_queue", {})
@@ -210,12 +267,49 @@ def list_flush_queue(
210267
continue
211268
if not include_quarantined and item.quarantined:
212269
continue
270+
if redact_payloads:
271+
item = self._redact_queue_item(item)
213272
result.append(item)
214273
result = sorted(result, key=lambda item: item.enqueued_at_epoch_s)
215274
if limit is not None and limit >= 0:
216275
return result[:limit]
217276
return result
218277

278+
def _redact_queue_item(self, item: LedgerQueueItem) -> LedgerQueueItem:
279+
"""Redact sensitive payload fields from queue item.
280+
281+
Preserves queue metadata (id, timestamps, status) but replaces
282+
audit-relevant payload fields to discourage local log aggregation.
283+
"""
284+
redacted_payload: dict[str, object] = {}
285+
# Preserve only non-sensitive metadata
286+
if "source" in item.payload:
287+
redacted_payload["source"] = item.payload["source"]
288+
if "event_type" in item.payload:
289+
redacted_payload["event_type"] = item.payload["event_type"]
290+
if "emitted_at_epoch_s" in item.payload:
291+
redacted_payload["emitted_at_epoch_s"] = item.payload["emitted_at_epoch_s"]
292+
# Redact audit-sensitive fields
293+
redact_marker = "[REDACTED - use control-plane for full audit]"
294+
for field_name in ("principal_id", "action", "resource", "reason", "mandate_id"):
295+
if field_name in item.payload:
296+
redacted_payload[field_name] = redact_marker
297+
# Preserve allowed/denied decision indicator only
298+
if "allowed" in item.payload:
299+
redacted_payload["allowed"] = item.payload["allowed"]
300+
return LedgerQueueItem(
301+
queue_item_id=item.queue_item_id,
302+
enqueued_at_epoch_s=item.enqueued_at_epoch_s,
303+
payload=redacted_payload,
304+
flushed=item.flushed,
305+
flush_attempts=item.flush_attempts,
306+
last_error=item.last_error,
307+
flushed_at_epoch_s=item.flushed_at_epoch_s,
308+
quarantined=item.quarantined,
309+
quarantine_reason=item.quarantine_reason,
310+
quarantined_at_epoch_s=item.quarantined_at_epoch_s,
311+
)
312+
219313
def mark_flush_ack(self, queue_item_id: str) -> bool:
220314
with self._lock:
221315
payload = self._read_all_unlocked()
@@ -258,13 +352,26 @@ def quarantine_queue_item(self, queue_item_id: str, reason: str) -> bool:
258352
self._write_all_unlocked(payload)
259353
return True
260354

261-
def list_dead_letter_queue(self, limit: int | None = None) -> list[LedgerQueueItem]:
355+
def list_dead_letter_queue(
356+
self, limit: int | None = None, redact_payloads: bool = True
357+
) -> list[LedgerQueueItem]:
358+
"""List quarantined (dead-letter) queue items.
359+
360+
Args:
361+
limit: Maximum number of items to return.
362+
redact_payloads: If True (default), sensitive payload fields are
363+
replaced with "[REDACTED - use control-plane for full audit]".
364+
"""
262365
items = self.list_flush_queue(
263366
include_flushed=True,
264367
include_quarantined=True,
265-
limit=limit,
368+
limit=None, # Filter after, then apply limit
369+
redact_payloads=redact_payloads,
266370
)
267-
return [item for item in items if item.quarantined]
371+
quarantined = [item for item in items if item.quarantined]
372+
if limit is not None and limit >= 0:
373+
return quarantined[:limit]
374+
return quarantined
268375

269376
def requeue_item(self, queue_item_id: str, reset_attempts: bool = True) -> bool:
270377
with self._lock:

0 commit comments

Comments
 (0)