diff --git a/AGENTS.md b/AGENTS.md index 0fa41ec..1781ed7 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -161,6 +161,67 @@ user, don't loop. --- +## Dry-run before writes + +Before any `post` / `patch` / `delete` / `attach upload`, run with `--dry-run` +and `-f json` to get a structured preview the user can sanity-check: + +```bash +bcli --dry-run -f json post customers --data '{"displayName": "Test"}' +``` + +The response is a JSON envelope: + +```json +{ + "dry_run": true, + "method": "POST", + "endpoint": "customers", + "resolved_url": "https://.../api/v2.0/companies()/customers", + "profile": "dev", + "environment": "Sandbox", + "company_id": "", + "body": {"displayName": "Test"} +} +``` + +`PATCH` and `DELETE` envelopes also include `record_id`. The shape is stable — +field names won't change. Use it to: + +* Show the user the resolved URL + body before they approve a real write. +* Catch typos in the endpoint name before any HTTP call goes out. +* Verify the right environment / company is targeted. + +## Caution levels for write endpoints + +Every endpoint exposes a `caution` level (`low` / `medium` / `high`) via +`bcli endpoint info` and the `list_endpoints` MCP tool. Endpoints whose name +contains a mutation verb (`post`, `release`, `cancel`, `void`, `reverse`, +`apply`, `unapply`) are flagged `high` automatically. Treat `high` as: "do +not write without explicit user confirmation, even if the user previously +authorised a similar action." Examples: + +* `customers` → `low` (CRUD on a master-data record) +* `salesInvoicePost` → `high` (irreversibly posts an invoice) +* `customerLedgerEntryApply` → `high` (modifies posted ledger state) + +## Audit log location + +When the user has `[audit] enabled = true` in `~/.config/bcli/config.toml`, +every write you trigger appends a JSON line to +`~/.config/bcli/audit/.jsonl` (or whatever the user configured). +Each entry includes `outcome` (`completed` / `failed` / `dry_run`), +`correlation_id` (BC's `x-ms-correlation-request-id`), `latency_ms`, and the +redacted request body. Useful for: + +* Showing the user "here's what just happened" after a multi-step task. +* Grepping for the BC correlation ID when debugging a 500 the user reported. +* Reconciling intent (`dry_run` entries) against actual writes. + +You don't need to enable the log yourself — it's the user's choice. If they +ask "did that POST go through?" the audit log is the canonical answer when it's +on, and the CLI exit code is the answer when it's off. + ## When you have an MCP server If the user has mounted `bcli-mcp` (see [`docs/mcp-server.md`](docs/mcp-server.md)), diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ffb6d5..3417fcb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,36 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.2.0] — 2026-05-06 + +### Added + +- **Structured `--dry-run` output** — write commands (`post`, `patch`, + `delete`, `attach upload`) now emit a stable JSON envelope on stdout + when `--format json` / `ndjson` / `raw` is selected. Includes + `dry_run`, `method`, `endpoint`, `resolved_url`, `profile`, + `environment`, `company_id`, `body`, and `record_id` (when applicable). + Agents can parse the envelope before deciding whether to proceed. The + human format keeps the same yellow rich panel on stderr but is now + augmented with the resolved URL and profile context. See + `docs/write-operations.md`. +- **Opt-in audit log** — new `[audit]` config section persists every + write to a per-profile JSONL file. Each entry captures the resolved + URL, response status, BC `correlation_id`, latency, redacted request + body, and outcome (`completed` / `failed` / `dry_run`). Bounded disk + usage via single-backup rotation. SDK (`AsyncBCClient`) does NOT + auto-emit; this is a CLI-layer ergonomic on top of BC permission sets. + See `docs/configuration.md#audit-log`. +- **Endpoint `caution` flag** — `EndpointMetadata` now carries a + `caution: low | medium | high` level. Importers populate it + automatically from a verb-name heuristic (entities containing `post`, + `release`, `cancel`, `void`, `reverse`, `apply`, `unapply` are flagged + `high`). Surfaced in `bcli endpoint info` and the `list_endpoints` MCP + tool so agents can require explicit user confirmation before mutating + posted/closed records. +- New `AGENTS.md` recipes for dry-run-first writes, caution-level + interpretation, and audit-log location. + ## [0.1.5] — 2026-05-05 ### Added diff --git a/docs/configuration.md b/docs/configuration.md index c41470f..ff52d3f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -127,6 +127,69 @@ api_version = "v1.0" Imported endpoint registries are preferred; route defaults are only an escape hatch for ad-hoc access. +## Audit Log + +Optional, opt-in audit trail for write operations. When enabled, every CLI write +(POST / PATCH / DELETE / attach upload) appends one JSONL line to a per-profile +file. Captures the resolved URL, response status, BC correlation ID, latency, +and outcome — enough to reconstruct what happened (and what was attempted). + +Add an `[audit]` block to `~/.config/bcli/config.toml`: + +```toml +[audit] +enabled = true +backend = "jsonl" +path = "~/.config/bcli/audit/{profile}.jsonl" # default; {profile} interpolated +max_size_mb = 50 +include_reads = false +redact_keys = ["password", "secret", "token", "key", "apiKey", "authorization"] +``` + +| Setting | Default | What it does | +|---------|---------|--------------| +| `enabled` | `false` | Master switch. When false, the audit code path is a no-op. | +| `backend` | `"jsonl"` | `"jsonl"` (file) or `"null"` (drop). | +| `path` | `~/.config/bcli/audit/{profile}.jsonl` | File location. `{profile}` is interpolated to the active profile name. | +| `max_size_mb` | `50` | Rotation threshold. When the file exceeds this, the existing content moves to `.1` and a fresh file is started. Only one backup is kept. | +| `include_reads` | `false` | Reserved for a future release; currently writes only. | +| `redact_keys` | `["password", "secret", ...]` | Substring-matched (case-insensitive) on request-body field names. Matched values are replaced with `***REDACTED***` before write. | + +Each entry is one JSON object with the following keys: + +```json +{ + "ts": "2026-05-06T10:00:00Z", + "profile": "production", + "environment": "Production", + "company_id": "", + "method": "POST", + "endpoint": "customers", + "resolved_url": "https://api.businesscentral.dynamics.com/.../customers", + "record_id": null, + "request_body": {"displayName": "Test"}, + "status": 201, + "correlation_id": "abc-...", + "latency_ms": 312, + "cli_version": "0.2.0", + "caller": "cli", + "outcome": "completed", + "error": null +} +``` + +`outcome` is one of: + +- `completed` — the write succeeded. +- `failed` — the write raised; `status`, `correlation_id`, and `error` capture + what BC said. +- `dry_run` — the user passed `--dry-run`; no HTTP call fired but the intent + is recorded. + +The SDK (`AsyncBCClient`) does NOT auto-emit. Audit is a CLI-layer ergonomic; +programmatic SDK users get unfiltered access by design and can wire their own +logging. + ## File Locations | File | Purpose | @@ -135,4 +198,5 @@ hatch for ad-hoc access. | `~/.config/bcli/tokens.json` | Cached auth tokens | | `~/.config/bcli/registries/*.json` | Imported custom API registries | | `~/.config/bcli/queries/*.yaml` | Saved queries | +| `~/.config/bcli/audit/*.jsonl` | Per-profile audit log (when `[audit] enabled = true`) | | `.bcli.toml` | Project-level config override | diff --git a/docs/write-operations.md b/docs/write-operations.md index ab50f66..2d57256 100644 --- a/docs/write-operations.md +++ b/docs/write-operations.md @@ -53,14 +53,52 @@ bcli delete customers "a1b2c3d4-..." --etag 'W/"ABC123"' ## Dry Run -Preview write operations without executing: +Preview write operations without executing. Works on `post`, `patch`, `delete`, +and `attach upload`. The output adapts to `--format`: + +**Human format (default):** rich panel on stderr with the resolved URL, profile +context, and the request body. ```bash bcli --dry-run post customers --data '{"displayName": "Test"}' -# --dry-run: would POST to customers -# {"displayName": "Test"} +# --dry-run: would POST customers +# URL: https://api.businesscentral.dynamics.com/.../api/v2.0/companies()/customers +# Profile: dev +# Env: Sandbox +# Company: +# { +# "displayName": "Test" +# } +``` + +**Machine format (`-f json` / `-f ndjson` / `-f raw`):** a single JSON envelope +on stdout that an agent can parse before deciding whether to proceed: + +```bash +bcli --dry-run -f json post customers --data '{"displayName": "Test"}' ``` +```json +{ + "dry_run": true, + "method": "POST", + "endpoint": "customers", + "resolved_url": "https://api.businesscentral.dynamics.com/.../customers", + "profile": "dev", + "environment": "Sandbox", + "company_id": "", + "body": {"displayName": "Test"} +} +``` + +`PATCH` and `DELETE` envelopes also include `record_id`. `attach upload` adds +`file_path`, `byte_size`, `parent_type`, and `parent_id`. The envelope shape is +stable — agents can rely on the field names. + +When the audit log is enabled (see [Audit Log](configuration.md#audit-log)), each +dry-run is recorded with `outcome: "dry_run"` so the paper trail captures intent +even when no HTTP call fires. + ## Custom API Routes For custom endpoints not in the registry: diff --git a/pyproject.toml b/pyproject.toml index 6043799..ae3cf28 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ build-backend = "hatchling.build" # installed CLI binary (`bcli`) are unaffected — only `pip install` / # `uv tool install` use this name. name = "bc-cli" -version = "0.1.5" +version = "0.2.0" description = "Python SDK and CLI for Microsoft Dynamics 365 Business Central APIs" readme = "README.md" license = "Apache-2.0" diff --git a/src/bcli/audit/__init__.py b/src/bcli/audit/__init__.py new file mode 100644 index 0000000..8090655 --- /dev/null +++ b/src/bcli/audit/__init__.py @@ -0,0 +1,27 @@ +"""Opt-in audit log for bcli write operations. + +>>> from bcli.audit import get_audit_sink, AuditEntry +>>> sink = get_audit_sink(config.audit, profile_name="dev") # NullSink if disabled +>>> sink.emit(AuditEntry(...)) +""" + +from __future__ import annotations + +from bcli.audit._factory import get_audit_sink +from bcli.audit._protocol import ( + AuditEntry, + AuditSink, + JSONLAuditSink, + NullAuditSink, +) +from bcli.audit._redact import REDACTED, redact + +__all__ = [ + "AuditEntry", + "AuditSink", + "JSONLAuditSink", + "NullAuditSink", + "REDACTED", + "get_audit_sink", + "redact", +] diff --git a/src/bcli/audit/_factory.py b/src/bcli/audit/_factory.py new file mode 100644 index 0000000..afdd933 --- /dev/null +++ b/src/bcli/audit/_factory.py @@ -0,0 +1,74 @@ +"""Build an :class:`AuditSink` from an :class:`AuditConfig`. + +Returns :class:`NullAuditSink` when audit is disabled or the chosen +backend cannot be loaded — callers can ``emit()`` unconditionally. +""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import TYPE_CHECKING + +from bcli.audit._protocol import AuditSink, JSONLAuditSink, NullAuditSink + +if TYPE_CHECKING: + from bcli.config._model import AuditConfig + +logger = logging.getLogger("bcli.audit") + + +def get_audit_sink( + config: "AuditConfig | None", + *, + profile_name: str, +) -> AuditSink: + """Build an audit sink for the given profile. + + ``profile_name`` is interpolated into ``config.path`` (template token + ``{profile}``) so a single global config produces one file per + profile automatically. + """ + if config is None or not config.enabled: + return NullAuditSink() + + backend = (config.backend or "jsonl").strip().lower() + if backend == "null": + return NullAuditSink() + + if backend == "jsonl": + try: + path = _resolve_path(config.path, profile_name=profile_name) + except Exception as exc: # noqa: BLE001 + logger.warning( + "audit path resolution failed (%s); falling back to NullAuditSink", exc + ) + return NullAuditSink() + return JSONLAuditSink( + path=path, + max_size_bytes=int(config.max_size_mb) * 1024 * 1024, + ) + + logger.warning( + "unknown audit backend '%s'; falling back to NullAuditSink. " + "Built-in choices: 'jsonl', 'null'.", + config.backend, + ) + return NullAuditSink() + + +def _resolve_path( + template: str | None, + *, + profile_name: str, +) -> Path: + """Expand the configured path, falling back to the documented default + when the user didn't set one.""" + if template: + expanded = template.format(profile=profile_name) + else: + # Default: ~/.config/bcli/audit/{profile}.jsonl + expanded = str( + Path.home() / ".config" / "bcli" / "audit" / f"{profile_name}.jsonl" + ) + return Path(expanded).expanduser() diff --git a/src/bcli/audit/_protocol.py b/src/bcli/audit/_protocol.py new file mode 100644 index 0000000..2404223 --- /dev/null +++ b/src/bcli/audit/_protocol.py @@ -0,0 +1,126 @@ +"""Audit-sink protocol + built-in NullAuditSink and JSONLAuditSink. + +The audit log is opt-in (off by default). When enabled, every CLI write +appends one JSONL line to a per-profile file. The file rotates once it +crosses ``max_size_bytes`` — the previous content moves to ``.1`` +and a fresh file is started. Only one backup is kept; users who need +indefinite retention should ship the file to their own log store. + +A sink MUST never crash the CLI. Path resolution failure, write failure, +disk full, permissions error — all swallowed. +""" + +from __future__ import annotations + +import json +import logging +from dataclasses import asdict, dataclass +from pathlib import Path +from typing import Any, Protocol, runtime_checkable + +logger = logging.getLogger("bcli.audit") + + +@dataclass(frozen=True) +class AuditEntry: + """One row in the audit log. + + Field semantics: + + * ``ts`` — ISO 8601 UTC timestamp of the request. + * ``profile`` — active profile name. + * ``environment`` — BC environment (e.g. ``Production``). + * ``company_id`` — BC company id; ``None`` for company-less calls. + * ``method`` — HTTP method (POST / PATCH / DELETE / UPLOAD). + * ``endpoint`` — entity-set name passed to ``bcli``. + * ``resolved_url`` — full URL that was (or would have been) hit. + * ``record_id`` — for PATCH / DELETE; ``None`` otherwise. + * ``request_body`` — redacted body that was sent; ``None`` for DELETE. + * ``status`` — HTTP status code; ``None`` if the call never + fired (dry-run, pre-call failure). + * ``correlation_id`` — BC ``x-ms-correlation-request-id`` header; useful + when grepping BC server-side logs. + * ``latency_ms`` — round-trip latency; ``None`` if no call fired. + * ``cli_version`` — bcli version string. + * ``caller`` — ``cli`` | ``mcp`` | ``sdk``; who initiated. + * ``outcome`` — ``completed`` | ``failed`` | ``dry_run``. + * ``error`` — exception message; ``None`` on success. + """ + + ts: str + profile: str + environment: str + company_id: str | None + method: str + endpoint: str + resolved_url: str | None + record_id: str | None + request_body: Any | None + status: int | None + correlation_id: str | None + latency_ms: int | None + cli_version: str + caller: str + outcome: str + error: str | None = None + + +@runtime_checkable +class AuditSink(Protocol): + """Structural type for audit sinks.""" + + is_active: bool + + def emit(self, entry: AuditEntry) -> None: ... + + +class NullAuditSink: + """Zero-overhead sink. Returned when audit is disabled or misconfigured.""" + + is_active: bool = False + + def emit(self, entry: AuditEntry) -> None: # noqa: ARG002 + return None + + +class JSONLAuditSink: + """Append-only JSONL file sink with single-backup rotation. + + Rotation strategy: when the file exceeds ``max_size_bytes`` BEFORE a + write, the existing file is moved to ``.1`` (overwriting any + prior backup) and a fresh file takes its place. This bounds disk usage + at ``2 * max_size_bytes`` regardless of how many writes happen. + """ + + is_active: bool = True + + def __init__(self, path: Path, max_size_bytes: int = 50 * 1024 * 1024) -> None: + self.path = path + self.max_size_bytes = max_size_bytes + + def emit(self, entry: AuditEntry) -> None: + try: + self.path.parent.mkdir(parents=True, exist_ok=True) + self._rotate_if_needed() + line = json.dumps(asdict(entry), default=str) + with self.path.open("a", encoding="utf-8") as fh: + fh.write(line + "\n") + except Exception as exc: # noqa: BLE001 — sink must never raise + logger.debug("audit emit failed: %s", exc) + + def _rotate_if_needed(self) -> None: + if not self.path.is_file(): + return + try: + size = self.path.stat().st_size + except OSError: + return + if size < self.max_size_bytes: + return + backup = self.path.with_suffix(self.path.suffix + ".1") + try: + if backup.exists(): + backup.unlink() + self.path.rename(backup) + except OSError as exc: + logger.debug("audit rotate failed: %s", exc) diff --git a/src/bcli/audit/_redact.py b/src/bcli/audit/_redact.py new file mode 100644 index 0000000..3ebc954 --- /dev/null +++ b/src/bcli/audit/_redact.py @@ -0,0 +1,60 @@ +"""Key-based redaction for audit log payloads. + +Walks dicts and lists, replacing values whose key contains any of the +configured tokens (case-insensitive substring match) with the sentinel +``REDACTED``. Non-dict bodies pass through unchanged. Never mutates the +input — returns a deep-copied structure. +""" + +from __future__ import annotations + +from collections.abc import Iterable +from typing import Any + +REDACTED = "***REDACTED***" + + +def redact(value: Any, keys: Iterable[str]) -> Any: + """Return a copy of ``value`` with sensitive values replaced. + + A field is redacted if any token in ``keys`` appears (case-insensitive) + anywhere in the field name. So ``("token",)`` matches ``token``, + ``apiToken``, ``session_token``, ``Token`` — all of them. This is + deliberately wide: false positives in an audit log are cheap, false + negatives leak credentials. + """ + needles = tuple(k.lower() for k in keys if k) + if not needles: + return _deep_copy(value) + return _walk(value, needles) + + +def _walk(value: Any, needles: tuple[str, ...]) -> Any: + if isinstance(value, dict): + out: dict[str, Any] = {} + for k, v in value.items(): + if isinstance(k, str) and _matches(k, needles): + out[k] = REDACTED + else: + out[k] = _walk(v, needles) + return out + if isinstance(value, list): + return [_walk(item, needles) for item in value] + if isinstance(value, tuple): + return tuple(_walk(item, needles) for item in value) + return value + + +def _matches(key: str, needles: tuple[str, ...]) -> bool: + lowered = key.lower() + return any(n in lowered for n in needles) + + +def _deep_copy(value: Any) -> Any: + if isinstance(value, dict): + return {k: _deep_copy(v) for k, v in value.items()} + if isinstance(value, list): + return [_deep_copy(v) for v in value] + if isinstance(value, tuple): + return tuple(_deep_copy(v) for v in value) + return value diff --git a/src/bcli/config/_model.py b/src/bcli/config/_model.py index 5851792..751fc39 100644 --- a/src/bcli/config/_model.py +++ b/src/bcli/config/_model.py @@ -154,12 +154,54 @@ def is_active(self) -> bool: return self.enabled and self.backend.strip().lower() not in ("", "null") +class AuditConfig(BaseModel): + """Optional audit log for write operations. + + When ``enabled = true`` every CLI write (POST / PATCH / DELETE / attach + upload) appends one JSONL line to a per-profile log file. Captures + request shape, resolved URL, response status, BC correlation id, + latency, and outcome — sufficient for forensic review and for agent- + driven workflows where you want a paper trail of what was done. + + Defaults are off and conservative: zero overhead when disabled, no + capture of read traffic, and request bodies are key-redacted before + write. + + Built-in backends: + + * ``"null"`` — drop everything (effectively disabled). + * ``"jsonl"`` — append one JSON object per line to ``path`` (default). + + The path supports a ``{profile}`` placeholder so a single global + config produces a per-profile log file automatically. + """ + + enabled: bool = False + backend: str = "jsonl" + path: str | None = None + max_size_mb: int = Field(default=50, ge=1) + include_reads: bool = False + redact_keys: list[str] = Field( + default_factory=lambda: [ + "password", + "secret", + "token", + "key", + "apiKey", + "authorization", + ] + ) + + model_config = {"extra": "allow"} + + class BCConfig(BaseModel): """Top-level configuration.""" defaults: BCDefaults = Field(default_factory=BCDefaults) profiles: dict[str, BCProfile] = Field(default_factory=dict) telemetry: TelemetryConfig = Field(default_factory=TelemetryConfig) + audit: AuditConfig = Field(default_factory=AuditConfig) model_config = {"extra": "allow"} diff --git a/src/bcli/registry/_importers.py b/src/bcli/registry/_importers.py index 5cf970f..879274a 100644 --- a/src/bcli/registry/_importers.py +++ b/src/bcli/registry/_importers.py @@ -8,7 +8,53 @@ from pathlib import Path from bcli.config._defaults import REGISTRIES_DIR -from bcli.registry._schema import EndpointMetadata +from bcli.registry._schema import CautionLevel, EndpointMetadata + +# Verbs in BC entity-set names that mutate posted/closed records. +# Hits any of these (case-insensitive, camelCase or all-caps token) → +# ``caution: high``. Generic BC terms only — no domain-specific vocabulary. +_DANGEROUS_VERBS = frozenset({ + "post", + "release", + "cancel", + "void", + "reverse", + "apply", + "unapply", +}) + +# Splits a name into tokens by camelCase boundaries: +# "salesInvoicePost" -> ["sales", "Invoice", "Post"] +# "PaymentReverse" -> ["Payment", "Reverse"] +# "salesinvoiceCANCEL"-> ["salesinvoice", "CANCEL"] +# Falls back to a single all-caps chunk when there are no boundaries +# (e.g. ``SALESINVOICEPOST``); we substring-check those separately. +_CAMEL_SPLIT = re.compile(r"[A-Z]+(?=[A-Z][a-z])|[A-Z]?[a-z]+|[A-Z]+") + + +def _infer_caution(entity_set_name: str) -> CautionLevel: + """Heuristic ``caution`` level from an entity-set name. + + Returns ``"high"`` if the name contains any of ``_DANGEROUS_VERBS`` as a + discrete token (camelCase boundary aware) or as a prefix/suffix of an + all-uppercase string. Returns ``"low"`` otherwise. Never returns + ``"medium"`` — that level is reserved for explicit setting by importers + or curators who know the endpoint's actual semantics. + """ + if not entity_set_name: + return "low" + + if entity_set_name.isupper(): + lowered = entity_set_name.lower() + for verb in _DANGEROUS_VERBS: + if lowered.startswith(verb) or lowered.endswith(verb): + return "high" + return "low" + + for token in _CAMEL_SPLIT.findall(entity_set_name): + if token.lower() in _DANGEROUS_VERBS: + return "high" + return "low" def import_from_postman(postman_file: Path) -> list[EndpointMetadata]: @@ -71,6 +117,7 @@ def _extract_from_item(item: dict, parent_desc: str = "") -> None: source_table=source_table, supports=[method], key_field="systemId", + caution=_infer_caution(entity_set_name), ) else: existing = endpoints[key] @@ -149,6 +196,8 @@ def import_from_json(json_file: Path) -> list[EndpointMetadata]: # bcli format if "endpoints" in raw: for entry in raw["endpoints"]: + # Backfill caution from heuristic when the source file omits it. + entry.setdefault("caution", _infer_caution(entry.get("entity_set_name", ""))) endpoints.append(EndpointMetadata.model_validate(entry)) return endpoints @@ -158,8 +207,9 @@ def import_from_json(json_file: Path) -> list[EndpointMetadata]: continue for entry in items: api_group = entry.get("api_group", group_name) + entity_set_name = entry.get("entity_set_name", "") meta = EndpointMetadata( - entity_set_name=entry.get("entity_set_name", ""), + entity_set_name=entity_set_name, entity_name=entry.get("entity_name", ""), api_publisher=entry.get("api_publisher", ""), api_group=api_group, @@ -171,6 +221,7 @@ def import_from_json(json_file: Path) -> list[EndpointMetadata]: key_field=entry.get("odata_key_fields", "systemId"), editable=entry.get("editable", "false").lower() == "true", supports=["GET"] if entry.get("data_access_intent") == "ReadOnly" else ["GET", "POST", "PATCH", "DELETE"], + caution=entry.get("caution") or _infer_caution(entity_set_name), ) if meta.entity_set_name: endpoints.append(meta) @@ -269,6 +320,7 @@ def _parse_metadata_xml( supports=["GET"], # Conservative default — metadata doesn't always tell us key_field="systemId", field_names=fields_by_type.get(entity_type, []), + caution=_infer_caution(entity_set_name), )) return sorted(endpoints, key=lambda e: e.entity_set_name) diff --git a/src/bcli/registry/_schema.py b/src/bcli/registry/_schema.py index e5b5e32..c76553c 100644 --- a/src/bcli/registry/_schema.py +++ b/src/bcli/registry/_schema.py @@ -2,8 +2,12 @@ from __future__ import annotations +from typing import Literal + from pydantic import BaseModel, Field +CautionLevel = Literal["low", "medium", "high"] + class EndpointMetadata(BaseModel): """Metadata for a single API endpoint.""" @@ -23,6 +27,13 @@ class EndpointMetadata(BaseModel): # Domain classification: "standard", "finance", "technical" domain: str = "standard" + # Caution level for agent driving — "low" (plain CRUD), "medium" + # (writes possible but reversible), "high" (mutates posted/closed + # records; agents should require explicit user approval). Defaults to + # ``low``; importers may set explicitly or use ``_infer_caution()`` to + # derive from the entity-set name. + caution: CautionLevel = "low" + # Optional metadata from imports source_table: str = "" page_number: str = "" diff --git a/src/bcli_cli/_audit_wrap.py b/src/bcli_cli/_audit_wrap.py new file mode 100644 index 0000000..60b7df2 --- /dev/null +++ b/src/bcli_cli/_audit_wrap.py @@ -0,0 +1,153 @@ +"""CLI-side audit-log integration. + +Two entry points used by the write commands: + +* :func:`audited_write` wraps the actual write coroutine, emitting one + ``completed`` or ``failed`` audit entry per call. +* :func:`emit_dry_run_audit` is called by ``render_dry_run`` to record + the ``dry_run`` outcome before short-circuiting. + +Both fast-path when audit is disabled (no sink construction, no work +beyond a property read) so the audit feature has zero overhead until +the user opts in. + +The SDK (``AsyncBCClient``) intentionally does NOT auto-emit — programmatic +users get unfiltered access. Audit is a CLI-layer ergonomic, layered on +top of BC's own permission set (the actual security boundary). +""" + +from __future__ import annotations + +import time +from datetime import datetime, timezone +from typing import Any, Awaitable, TypeVar + +from bcli._version import __version__ +from bcli.audit import AuditEntry, get_audit_sink, redact +from bcli_cli._state import state + +T = TypeVar("T") + +_CALLER = "cli" + + +def _profile_audit_sink(): + return get_audit_sink(state.config.audit, profile_name=state.active_profile_name) + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + + +def _redact_body(body: Any | None) -> Any | None: + if body is None: + return None + return redact(body, state.config.audit.redact_keys) + + +async def audited_write( + coro: Awaitable[T], + *, + method: str, + endpoint: str, + body: Any | None = None, + record_id: str | None = None, + resolved_url: str | None = None, +) -> T: + """Run a write coroutine, emitting an audit entry on completion or failure. + + Re-raises any exception from the wrapped coroutine after recording it, + so callers see the same surface they would without auditing. + """ + sink = _profile_audit_sink() + if not sink.is_active: + return await coro + + profile = state.profile + redacted = _redact_body(body) + start = time.perf_counter() + + try: + result = await coro + except Exception as exc: + latency_ms = int((time.perf_counter() - start) * 1000) + sink.emit( + AuditEntry( + ts=_now_iso(), + profile=state.active_profile_name, + environment=profile.environment, + company_id=profile.company_id, + method=method.upper(), + endpoint=endpoint, + resolved_url=resolved_url, + record_id=record_id, + request_body=redacted, + status=getattr(exc, "status_code", None), + correlation_id=getattr(exc, "correlation_id", None), + latency_ms=latency_ms, + cli_version=__version__, + caller=_CALLER, + outcome="failed", + error=str(exc), + ) + ) + raise + + latency_ms = int((time.perf_counter() - start) * 1000) + sink.emit( + AuditEntry( + ts=_now_iso(), + profile=state.active_profile_name, + environment=profile.environment, + company_id=profile.company_id, + method=method.upper(), + endpoint=endpoint, + resolved_url=resolved_url, + record_id=record_id, + request_body=redacted, + status=200, # transport returns body on success; status not surfaced + correlation_id=None, + latency_ms=latency_ms, + cli_version=__version__, + caller=_CALLER, + outcome="completed", + error=None, + ) + ) + return result + + +def emit_dry_run_audit( + method: str, + endpoint: str, + *, + body: Any | None = None, + record_id: str | None = None, + resolved_url: str | None = None, +) -> None: + """Record a ``dry_run`` audit entry. Called from ``render_dry_run``.""" + sink = _profile_audit_sink() + if not sink.is_active: + return + + profile = state.profile + sink.emit( + AuditEntry( + ts=_now_iso(), + profile=state.active_profile_name, + environment=profile.environment, + company_id=profile.company_id, + method=method.upper(), + endpoint=endpoint, + resolved_url=resolved_url, + record_id=record_id, + request_body=_redact_body(body), + status=None, + correlation_id=None, + latency_ms=None, + cli_version=__version__, + caller=_CALLER, + outcome="dry_run", + error=None, + ) + ) diff --git a/src/bcli_cli/_dry_run.py b/src/bcli_cli/_dry_run.py new file mode 100644 index 0000000..86b6de4 --- /dev/null +++ b/src/bcli_cli/_dry_run.py @@ -0,0 +1,124 @@ +"""Shared ``--dry-run`` renderer for write commands. + +One helper, one shape, every write command. Used by ``bcli post``, +``patch``, ``delete``, ``attach upload``, ``attach test``, and +``batch run`` so an agent driving any of those gets the same machine- +readable preview without each command rolling its own format. + +Output format follows the active CLI ``--format``: + +* ``json`` — pretty-printed JSON envelope on stdout. +* ``ndjson`` — single-line JSON envelope on stdout (pipeable). +* ``raw`` — single-line JSON envelope on stdout. +* anything human (``table`` / ``markdown`` / ``csv`` / unset) — yellow + warning + key-value summary on stderr; the request body, if any, also + printed on stderr so the human view is one block on stderr and stdout + stays clean. + +The helper always raises ``typer.Exit(0)``; dry-run is a clean short- +circuit, not an error. +""" + +from __future__ import annotations + +import json +from typing import Any + +import typer +from rich.console import Console + +from bcli_cli._state import state +from bcli_cli._url_resolve import try_resolve_url + +_console = Console(stderr=True) + +# Formats that mean "an agent or pipe is consuming us; emit JSON on stdout" +_MACHINE_FORMATS = frozenset({"json", "ndjson", "raw"}) + + +def render_dry_run( + method: str, + endpoint: str, + *, + body: Any | None = None, + record_id: str | None = None, + publisher: str | None = None, + group: str | None = None, + version: str | None = None, + force_standard: bool = False, + extra: dict[str, Any] | None = None, +) -> None: + """Print a structured dry-run preview and ``typer.Exit(0)``. + + ``force_standard=True`` mirrors ``attach upload --standard``: the + resolved URL skips the custom registry and points at the standard + ``/api/v2.0/`` route, matching what the actual write would hit. + """ + profile = state.profile + profile_name = state.active_profile_name + + resolved_url = try_resolve_url( + endpoint, + record_id=record_id, + publisher=publisher, + group=group, + version=version, + force_standard=force_standard, + ) + + payload: dict[str, Any] = { + "dry_run": True, + "method": method.upper(), + "endpoint": endpoint, + "resolved_url": resolved_url, + "profile": profile_name, + "environment": profile.environment, + "company_id": profile.company_id, + } + if record_id is not None: + payload["record_id"] = record_id + if body is not None: + payload["body"] = body + if extra: + payload.update(extra) + + if state.format in _MACHINE_FORMATS: + indent = 2 if state.format == "json" else None + print(json.dumps(payload, indent=indent, default=str)) + else: + _render_human(payload) + + # Record the dry-run in the audit log when the user has it enabled, + # so a paper trail captures every "would have done X" intent in + # addition to actual writes. + from bcli_cli._audit_wrap import emit_dry_run_audit + emit_dry_run_audit( + method, + endpoint, + body=body, + record_id=record_id, + resolved_url=resolved_url, + ) + + raise typer.Exit() + + +def _render_human(payload: dict[str, Any]) -> None: + method = payload["method"] + endpoint = payload["endpoint"] + record_id = payload.get("record_id") + target = f"{endpoint}({record_id})" if record_id else endpoint + + _console.print(f"[yellow]--dry-run: would {method} {target}[/yellow]") + if payload.get("resolved_url"): + _console.print(f"[dim] URL: {payload['resolved_url']}[/dim]") + _console.print(f"[dim] Profile: {payload['profile']}[/dim]") + _console.print(f"[dim] Env: {payload['environment']}[/dim]") + if payload.get("company_id"): + _console.print(f"[dim] Company: {payload['company_id']}[/dim]") + for key in ("file_path", "byte_size", "parent_type"): + if key in payload: + _console.print(f"[dim] {key.replace('_', ' ').title():12}{payload[key]}[/dim]") + body = payload.get("body") + if body is not None: + _console.print(json.dumps(body, indent=2, default=str)) diff --git a/src/bcli_cli/_url_resolve.py b/src/bcli_cli/_url_resolve.py new file mode 100644 index 0000000..48ba6e2 --- /dev/null +++ b/src/bcli_cli/_url_resolve.py @@ -0,0 +1,57 @@ +"""Best-effort URL resolution shared by dry-run previews and audit logging. + +Both the dry-run renderer and the audit-log wrapper need to record the +fully-qualified URL that ``bcli`` is about to hit (or did hit). Two +constraints make a separate helper worthwhile: + +* The resolver must NEVER raise. A broken URL build path shouldn't crash + the user's actual command — the preview / audit just records ``None`` + and gets out of the way. +* The ``--standard`` escape hatch on ``attach upload`` bypasses the + registry. Code that resolves the URL for previewing or auditing has + to mirror that bypass; otherwise the preview shows the registry's + custom route while the actual upload silently uses ``/api/v2.0/``. +""" + +from __future__ import annotations + +from bcli_cli._state import state + + +def try_resolve_url( + endpoint: str, + *, + record_id: str | None = None, + publisher: str | None = None, + group: str | None = None, + version: str | None = None, + force_standard: bool = False, +) -> str | None: + """Resolve ``endpoint`` to a full URL using the active profile. + + Returns ``None`` on any failure (registry miss with + ``disable_standard_api``, missing company id, malformed profile, + etc.). Callers should treat ``None`` as "preview the rest, the user + will see the gap and correct". + """ + try: + if force_standard: + from bcli._url import build_url + + profile = state.profile + return build_url( + environment=profile.environment, + company_id=profile.company_id or "", + entity_set_name=endpoint, + record_id=record_id, + ) + client = state.make_async_client() + return client._resolve_url( + endpoint, + record_id=record_id, + publisher=publisher, + group=group, + version=version, + ) + except Exception: + return None diff --git a/src/bcli_cli/commands/attach_cmd.py b/src/bcli_cli/commands/attach_cmd.py index 0999a26..8d0b372 100644 --- a/src/bcli_cli/commands/attach_cmd.py +++ b/src/bcli_cli/commands/attach_cmd.py @@ -61,6 +61,7 @@ def upload_command( take priority. Force a specific route with ``--publisher/--group/--version``. """ output_format = format or state.format + state.format = output_format # propagate subcommand -f to dry-run + audit if output_format in ("json", "csv", "ndjson", "raw"): state.quiet = True @@ -70,14 +71,30 @@ def upload_command( confirm_write_or_exit("UPLOAD", "documentAttachments", yes=yes) if state.dry_run: - console.print( - f"[yellow]--dry-run: would upload {file_path} ({file_path.stat().st_size} bytes) " - f"as attachment on {parent_type}({parent_id})[/yellow]" + from bcli_cli._dry_run import render_dry_run + render_dry_run( + "UPLOAD", "documentAttachments", + publisher=publisher, group=group, version=version, + force_standard=standard, + extra={ + "file_path": str(file_path), + "byte_size": file_path.stat().st_size, + "parent_type": parent_type, + "parent_id": parent_id, + "file_name": file_name or file_path.name, + "force_standard": standard, + }, ) - raise typer.Exit() try: - result = asyncio.run( + from bcli_cli._audit_wrap import audited_write + from bcli_cli._url_resolve import try_resolve_url + resolved_url = try_resolve_url( + "documentAttachments", + publisher=publisher, group=group, version=version, + force_standard=standard, + ) + result = asyncio.run(audited_write( _execute_attach( file_path=file_path, parent_type=parent_type, @@ -88,8 +105,17 @@ def upload_command( group=group, version=version, force_standard=standard, - ) - ) + ), + method="UPLOAD", + endpoint="documentAttachments", + body={ + "parent_type": parent_type, + "parent_id": parent_id, + "file_name": file_name or file_path.name, + "byte_size": file_path.stat().st_size, + }, + resolved_url=resolved_url, + )) format_output([result] if result else [], output_format) except Exception as e: console.print(f"[red]Error:[/red] {e}") @@ -116,17 +142,26 @@ def test_command( upload in one shot. """ output_format = format or state.format + state.format = output_format # propagate subcommand -f to dry-run + audit if output_format in ("json", "csv", "ndjson", "raw"): state.quiet = True print_context_banner() if state.dry_run: - console.print( - f"[yellow]--dry-run: would create draft purchaseInvoice for vendor {vendor_id}, " - f"then attach {file_path}[/yellow]" + from bcli_cli._dry_run import render_dry_run + render_dry_run( + "TEST_ATTACH", "purchaseInvoices+documentAttachments", + publisher=publisher, group=group, version=version, + force_standard=standard, + extra={ + "vendor_id": vendor_id, + "invoice_date": invoice_date, + "file_path": str(file_path), + "byte_size": file_path.stat().st_size, + "force_standard": standard, + }, ) - raise typer.Exit() try: result = asyncio.run( diff --git a/src/bcli_cli/commands/delete_cmd.py b/src/bcli_cli/commands/delete_cmd.py index efb1274..1997fa3 100644 --- a/src/bcli_cli/commands/delete_cmd.py +++ b/src/bcli_cli/commands/delete_cmd.py @@ -25,8 +25,10 @@ def delete_command( yes: bool = typer.Option(False, "--yes", "-y", help="Skip the read-only-profile warning prompt"), ) -> None: """DELETE a record.""" - if format and format in ("json", "csv", "ndjson", "raw"): - state.quiet = True + if format: + state.format = format # propagate subcommand -f to dry-run + audit + if format in ("json", "csv", "ndjson", "raw"): + state.quiet = True print_context_banner() @@ -34,19 +36,41 @@ def delete_command( confirm_write_or_exit("DELETE", endpoint, yes=yes) if state.dry_run: - console.print(f"[yellow]--dry-run: would DELETE {endpoint}({record_id})[/yellow]") - raise typer.Exit() + from bcli_cli._dry_run import render_dry_run + render_dry_run( + "DELETE", endpoint, record_id=record_id, + publisher=publisher, group=group, version=version, + extra={"etag": etag}, + ) try: - asyncio.run( - _execute_delete(endpoint, record_id, etag=etag, publisher=publisher, group=group, version=version) - ) + asyncio.run(_audited_delete( + endpoint, record_id, + etag=etag, publisher=publisher, group=group, version=version, + )) console.print(f"[green]✓[/green] Deleted {endpoint}({record_id})") except Exception as e: console.print(f"[red]Error:[/red] {e}") raise typer.Exit(1) +async def _audited_delete(endpoint, record_id, **kwargs): + from bcli_cli._audit_wrap import audited_write + from bcli_cli._url_resolve import try_resolve_url + resolved_url = try_resolve_url( + endpoint, + record_id=record_id, + publisher=kwargs.get("publisher"), + group=kwargs.get("group"), + version=kwargs.get("version"), + ) + return await audited_write( + _execute_delete(endpoint, record_id, **kwargs), + method="DELETE", endpoint=endpoint, record_id=record_id, + resolved_url=resolved_url, + ) + + async def _execute_delete(endpoint, record_id, **kwargs): async with state.make_async_client() as client: return await client.delete(endpoint, record_id, **kwargs) diff --git a/src/bcli_cli/commands/endpoint_cmd.py b/src/bcli_cli/commands/endpoint_cmd.py index 1708948..955a0d4 100644 --- a/src/bcli_cli/commands/endpoint_cmd.py +++ b/src/bcli_cli/commands/endpoint_cmd.py @@ -30,6 +30,7 @@ def _endpoint_to_dict(ep) -> dict: "group": ep.api_group, "version": ep.api_version, "description": ep.description or "", + "caution": ep.caution, } @@ -154,6 +155,8 @@ def endpoint_info( console.print(f" Operations: {', '.join(ep.supports)}") console.print(f" Key field: {ep.key_field}") console.print(f" Category: {ep.category}") + caution_color = {"low": "green", "medium": "yellow", "high": "red"}[ep.caution] + console.print(f" Caution: [{caution_color}]{ep.caution}[/{caution_color}]") console.print(f" Custom: {'Yes' if ep.is_custom else 'No (standard v2.0)'}") if ep.description: console.print(f" Description: {ep.description}") diff --git a/src/bcli_cli/commands/patch_cmd.py b/src/bcli_cli/commands/patch_cmd.py index 9dcfa23..fa0bb6d 100644 --- a/src/bcli_cli/commands/patch_cmd.py +++ b/src/bcli_cli/commands/patch_cmd.py @@ -29,6 +29,7 @@ def patch_command( ) -> None: """PATCH (update) an existing record.""" output_format = format or state.format + state.format = output_format # propagate subcommand -f to dry-run + audit if output_format in ("json", "csv", "ndjson", "raw"): state.quiet = True @@ -40,20 +41,41 @@ def patch_command( body = _parse_data(data) if state.dry_run: - console.print(f"[yellow]--dry-run: would PATCH {endpoint}({record_id})[/yellow]") - console.print(json.dumps(body, indent=2)) - raise typer.Exit() + from bcli_cli._dry_run import render_dry_run + render_dry_run( + "PATCH", endpoint, body=body, record_id=record_id, + publisher=publisher, group=group, version=version, + extra={"etag": etag}, + ) try: - result = asyncio.run( - _execute_patch(endpoint, record_id, body, etag=etag, publisher=publisher, group=group, version=version) - ) + result = asyncio.run(_audited_patch( + endpoint, record_id, body, + etag=etag, publisher=publisher, group=group, version=version, + )) format_output([result] if result else [], output_format) except Exception as e: console.print(f"[red]Error:[/red] {e}") raise typer.Exit(1) +async def _audited_patch(endpoint, record_id, body, **kwargs): + from bcli_cli._audit_wrap import audited_write + from bcli_cli._url_resolve import try_resolve_url + resolved_url = try_resolve_url( + endpoint, + record_id=record_id, + publisher=kwargs.get("publisher"), + group=kwargs.get("group"), + version=kwargs.get("version"), + ) + return await audited_write( + _execute_patch(endpoint, record_id, body, **kwargs), + method="PATCH", endpoint=endpoint, body=body, record_id=record_id, + resolved_url=resolved_url, + ) + + async def _execute_patch(endpoint, record_id, body, **kwargs): async with state.make_async_client() as client: return await client.patch(endpoint, record_id, body, **kwargs) diff --git a/src/bcli_cli/commands/post_cmd.py b/src/bcli_cli/commands/post_cmd.py index a66805b..01881dd 100644 --- a/src/bcli_cli/commands/post_cmd.py +++ b/src/bcli_cli/commands/post_cmd.py @@ -27,6 +27,7 @@ def post_command( ) -> None: """POST (create) a new record.""" output_format = format or state.format + state.format = output_format # propagate subcommand -f to dry-run + audit if output_format in ("json", "csv", "ndjson", "raw"): state.quiet = True @@ -38,20 +39,36 @@ def post_command( body = _parse_data(data) if state.dry_run: - console.print(f"[yellow]--dry-run: would POST to {endpoint}[/yellow]") - console.print(json.dumps(body, indent=2)) - raise typer.Exit() + from bcli_cli._dry_run import render_dry_run + render_dry_run( + "POST", endpoint, body=body, + publisher=publisher, group=group, version=version, + ) try: - result = asyncio.run( - _execute_post(endpoint, body, publisher=publisher, group=group, version=version) - ) + result = asyncio.run(_audited_post(endpoint, body, publisher=publisher, group=group, version=version)) format_output([result] if result else [], output_format) except Exception as e: console.print(f"[red]Error:[/red] {e}") raise typer.Exit(1) +async def _audited_post(endpoint, body, **kwargs): + from bcli_cli._audit_wrap import audited_write + from bcli_cli._url_resolve import try_resolve_url + resolved_url = try_resolve_url( + endpoint, + publisher=kwargs.get("publisher"), + group=kwargs.get("group"), + version=kwargs.get("version"), + ) + return await audited_write( + _execute_post(endpoint, body, **kwargs), + method="POST", endpoint=endpoint, body=body, + resolved_url=resolved_url, + ) + + async def _execute_post(endpoint, body, **kwargs): async with state.make_async_client() as client: return await client.post(endpoint, body, **kwargs) diff --git a/tests/test_audit/__init__.py b/tests/test_audit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_audit/test_factory.py b/tests/test_audit/test_factory.py new file mode 100644 index 0000000..19016cf --- /dev/null +++ b/tests/test_audit/test_factory.py @@ -0,0 +1,71 @@ +"""Tests for ``get_audit_sink`` — the factory that builds the audit sink +from an ``AuditConfig``. + +Disabled config returns ``NullAuditSink``. Enabled config with a path +returns ``JSONLAuditSink``. Path supports ``{profile}`` interpolation so +the same global config produces a per-profile log automatically. +""" + +from __future__ import annotations + +from bcli.audit._factory import get_audit_sink +from bcli.audit._protocol import JSONLAuditSink, NullAuditSink +from bcli.config._model import AuditConfig + + +def test_disabled_returns_null_sink() -> None: + cfg = AuditConfig(enabled=False) + sink = get_audit_sink(cfg, profile_name="dev") + assert isinstance(sink, NullAuditSink) + + +def test_enabled_default_returns_jsonl_sink(tmp_path, monkeypatch) -> None: + """Default backend is jsonl; with no explicit path it falls back to + the documented default under XDG config dir.""" + monkeypatch.setenv("HOME", str(tmp_path)) + cfg = AuditConfig(enabled=True) + sink = get_audit_sink(cfg, profile_name="dev") + assert isinstance(sink, JSONLAuditSink) + + +def test_path_template_substitutes_profile_name(tmp_path) -> None: + cfg = AuditConfig( + enabled=True, + path=str(tmp_path / "audit-{profile}.jsonl"), + ) + sink = get_audit_sink(cfg, profile_name="prod") + assert isinstance(sink, JSONLAuditSink) + assert sink.path == tmp_path / "audit-prod.jsonl" + + +def test_explicit_path_without_template_used_as_is(tmp_path) -> None: + target = tmp_path / "shared-audit.jsonl" + cfg = AuditConfig(enabled=True, path=str(target)) + sink = get_audit_sink(cfg, profile_name="dev") + assert isinstance(sink, JSONLAuditSink) + assert sink.path == target + + +def test_unknown_backend_falls_back_to_null(tmp_path) -> None: + """A misspelled backend value must NOT crash the CLI — it falls back + to the NullAuditSink and logs a warning.""" + cfg = AuditConfig(enabled=True, backend="not-a-real-backend") + sink = get_audit_sink(cfg, profile_name="dev") + assert isinstance(sink, NullAuditSink) + + +def test_max_size_propagates_to_sink(tmp_path) -> None: + cfg = AuditConfig( + enabled=True, + path=str(tmp_path / "a.jsonl"), + max_size_mb=5, + ) + sink = get_audit_sink(cfg, profile_name="dev") + assert isinstance(sink, JSONLAuditSink) + assert sink.max_size_bytes == 5 * 1024 * 1024 + + +def test_none_config_returns_null_sink() -> None: + """Profiles without an [audit] section get a no-op sink.""" + sink = get_audit_sink(None, profile_name="dev") + assert isinstance(sink, NullAuditSink) diff --git a/tests/test_audit/test_redact.py b/tests/test_audit/test_redact.py new file mode 100644 index 0000000..8ba345b --- /dev/null +++ b/tests/test_audit/test_redact.py @@ -0,0 +1,87 @@ +"""Tests for the audit-log redaction helper. + +Redaction runs over every request body before it lands in the JSONL audit +file. The match is case-insensitive on key name; the value is replaced +wholesale with the sentinel ``REDACTED``. Nested dicts and lists are +walked recursively. Non-dict bodies (strings, lists of primitives) pass +through unchanged. +""" + +from __future__ import annotations + +from bcli.audit._redact import REDACTED, redact + +DEFAULT_KEYS = ("password", "secret", "token", "key", "apiKey", "authorization") + + +def test_redact_replaces_top_level_key() -> None: + body = {"username": "alice", "password": "hunter2"} + out = redact(body, DEFAULT_KEYS) + assert out == {"username": "alice", "password": REDACTED} + + +def test_redact_is_case_insensitive() -> None: + body = {"Password": "x", "API_KEY": "y", "Token": "z"} + out = redact(body, DEFAULT_KEYS) + assert out["Password"] == REDACTED + assert out["Token"] == REDACTED + # API_KEY contains 'key' which is in defaults — should redact. + assert out["API_KEY"] == REDACTED + + +def test_redact_walks_nested_dicts() -> None: + body = { + "outer": { + "user": "alice", + "credentials": {"token": "abc123"}, + }, + } + out = redact(body, DEFAULT_KEYS) + assert out["outer"]["user"] == "alice" + assert out["outer"]["credentials"]["token"] == REDACTED + + +def test_redact_walks_lists_of_dicts() -> None: + body = {"items": [{"name": "a", "secret": "x"}, {"name": "b"}]} + out = redact(body, DEFAULT_KEYS) + assert out["items"][0]["secret"] == REDACTED + assert out["items"][0]["name"] == "a" + assert out["items"][1] == {"name": "b"} + + +def test_redact_does_not_mutate_input() -> None: + body = {"password": "hunter2"} + redact(body, DEFAULT_KEYS) + assert body == {"password": "hunter2"} + + +def test_redact_passes_through_non_dict_values() -> None: + assert redact("just a string", DEFAULT_KEYS) == "just a string" + assert redact([1, 2, 3], DEFAULT_KEYS) == [1, 2, 3] + assert redact(None, DEFAULT_KEYS) is None + + +def test_redact_handles_empty_keys_list() -> None: + body = {"password": "x"} + out = redact(body, ()) + assert out == {"password": "x"} + + +def test_redact_custom_keys_extend_defaults() -> None: + body = {"creditCard": "4111111111111111", "username": "alice"} + out = redact(body, ("creditcard",)) + assert out["creditCard"] == REDACTED + assert out["username"] == "alice" + + +def test_redact_partial_match_substring() -> None: + """A key that *contains* a redact term as a substring redacts. + + Justified: 'apiKey', 'api_key', 'apiToken', 'sessionToken' should all + redact even though they aren't exact matches for 'key' or 'token'. + """ + body = {"sessionToken": "x", "apiSecret": "y", "noMatch": "z"} + out = redact(body, ("token", "secret")) + assert out["sessionToken"] == REDACTED + assert out["apiSecret"] == REDACTED + assert out["noMatch"] == "z" diff --git a/tests/test_audit/test_sink.py b/tests/test_audit/test_sink.py new file mode 100644 index 0000000..46edeb1 --- /dev/null +++ b/tests/test_audit/test_sink.py @@ -0,0 +1,134 @@ +"""Tests for the JSONL audit-log sink. + +The sink appends one JSON object per write operation to a file the user +points at via ``[audit] path``. Rotation is size-based: when the file +exceeds ``max_size_mb`` the previous content moves to ``.1`` and a +fresh file is started. Only one backup is kept (the audit log is for +recent forensic context, not historical archival — that's the user's +log-shipping job). +""" + +from __future__ import annotations + +import json +from pathlib import Path + +from bcli.audit._protocol import ( + AuditEntry, + JSONLAuditSink, + NullAuditSink, +) + + +def _entry(**overrides) -> AuditEntry: + base = { + "ts": "2026-05-06T10:00:00Z", + "profile": "dev", + "environment": "Sandbox", + "company_id": "c-123", + "method": "POST", + "endpoint": "customers", + "resolved_url": "https://example.test/api/v2.0/companies(c-123)/customers", + "record_id": None, + "request_body": {"displayName": "Test"}, + "status": 201, + "correlation_id": "abc-cor", + "latency_ms": 312, + "cli_version": "0.2.0", + "caller": "cli", + "outcome": "completed", + "error": None, + } + base.update(overrides) + return AuditEntry(**base) + + +class TestNullAuditSink: + def test_is_active_false(self) -> None: + sink = NullAuditSink() + assert sink.is_active is False + + def test_emit_is_a_noop(self) -> None: + sink = NullAuditSink() + sink.emit(_entry()) # must not raise + + +class TestJSONLAuditSink: + def test_creates_parent_dir(self, tmp_path: Path) -> None: + target = tmp_path / "nested" / "audit.jsonl" + sink = JSONLAuditSink(path=target) + sink.emit(_entry()) + assert target.is_file() + + def test_emit_writes_one_jsonl_line(self, tmp_path: Path) -> None: + target = tmp_path / "audit.jsonl" + sink = JSONLAuditSink(path=target) + sink.emit(_entry(method="POST", endpoint="customers")) + sink.emit(_entry(method="DELETE", endpoint="items", record_id="x")) + + lines = target.read_text().splitlines() + assert len(lines) == 2 + first = json.loads(lines[0]) + assert first["method"] == "POST" + assert first["endpoint"] == "customers" + second = json.loads(lines[1]) + assert second["method"] == "DELETE" + assert second["record_id"] == "x" + + def test_jsonl_payload_round_trips_through_json(self, tmp_path: Path) -> None: + """Every emitted line must be parseable JSON with stable shape.""" + target = tmp_path / "audit.jsonl" + sink = JSONLAuditSink(path=target) + sink.emit(_entry()) + line = target.read_text().splitlines()[0] + payload = json.loads(line) + + # Every documented field present + for key in ( + "ts", "profile", "environment", "company_id", "method", + "endpoint", "resolved_url", "request_body", "status", + "correlation_id", "latency_ms", "cli_version", "caller", + "outcome", + ): + assert key in payload + + def test_rotation_keeps_one_backup(self, tmp_path: Path) -> None: + target = tmp_path / "audit.jsonl" + sink = JSONLAuditSink(path=target, max_size_bytes=200) + + sink.emit(_entry()) + size_one_entry = target.stat().st_size + + # Now write many more — should rotate repeatedly, but disk usage + # must stay bounded (one current + one backup, max). + for _ in range(100): + sink.emit(_entry()) + + backup = target.with_suffix(".jsonl.1") + assert target.is_file() + assert backup.is_file() + + # Bounded: current + backup never exceeds 2 entries' worth of + # slack on top of the threshold. Constant in the number of writes. + total = target.stat().st_size + backup.stat().st_size + assert total <= 2 * size_one_entry + 200, ( + f"unbounded growth: {total} bytes after 101 writes" + ) + + # Each file individually contains valid JSONL. + for f in (target, backup): + for line in f.read_text().splitlines(): + json.loads(line) + + def test_is_active_true(self, tmp_path: Path) -> None: + sink = JSONLAuditSink(path=tmp_path / "audit.jsonl") + assert sink.is_active is True + + def test_emit_never_raises_on_unwritable_path(self, tmp_path: Path) -> None: + """The audit sink must never crash the CLI. If the path is bad, + the emit should swallow it.""" + # /dev/null/foo is guaranteed to fail mkdir on POSIX. + target = Path("/dev/null/cant-write-here.jsonl") + sink = JSONLAuditSink(path=target) + # Must not raise: + sink.emit(_entry()) diff --git a/tests/test_cli/test_audit_wrap.py b/tests/test_cli/test_audit_wrap.py new file mode 100644 index 0000000..cb70ab1 --- /dev/null +++ b/tests/test_cli/test_audit_wrap.py @@ -0,0 +1,288 @@ +"""Tests for the CLI-side audit wrapper. + +The wrapper sits around every write command's actual API call. When the +audit sink is active, it records one entry per call: + +* ``completed`` on success +* ``failed`` on exception (captures status_code + correlation_id from BC errors) +* ``dry_run`` for ``--dry-run`` short-circuits + +Bodies pass through ``redact`` first so the log never contains plaintext +secrets. +""" + +from __future__ import annotations + +import asyncio +import json +from pathlib import Path + +import pytest + +from bcli.config._model import AuditConfig, BCConfig, BCDefaults, BCProfile +from bcli.errors import ValidationError +from bcli_cli._audit_wrap import audited_write, emit_dry_run_audit +from bcli_cli._state import state + + +@pytest.fixture +def audit_state(tmp_path: Path): + """State with audit enabled, writing JSONL to a tmp_path file.""" + audit_path = tmp_path / "audit-{profile}.jsonl" + cfg = BCConfig( + defaults=BCDefaults(profile="dev"), + profiles={ + "dev": BCProfile( + tenant_id="t1", + environment="Sandbox", + company_id="c-123", + ), + }, + audit=AuditConfig(enabled=True, path=str(audit_path)), + ) + state._config = cfg + state._registry = None + state.profile_name = None + state.format = "table" + + yield tmp_path / "audit-dev.jsonl" + + state._config = None + state._registry = None + + +@pytest.fixture +def audit_disabled(): + """State with audit explicitly disabled — wrapper must be a no-op.""" + cfg = BCConfig( + defaults=BCDefaults(profile="dev"), + profiles={"dev": BCProfile(tenant_id="t1", environment="Sandbox", company_id="c-1")}, + audit=AuditConfig(enabled=False), + ) + state._config = cfg + state._registry = None + state.profile_name = None + yield + state._config = None + state._registry = None + + +def _read_entries(path: Path) -> list[dict]: + return [json.loads(line) for line in path.read_text().splitlines()] + + +class TestAuditedWrite: + def test_completed_entry_emitted_on_success(self, audit_state: Path): + async def _ok(): + return {"id": "abc"} + + result = asyncio.run( + audited_write( + _ok(), + method="POST", + endpoint="customers", + body={"displayName": "Test"}, + resolved_url="https://example.test/customers", + ) + ) + assert result == {"id": "abc"} + entries = _read_entries(audit_state) + assert len(entries) == 1 + e = entries[0] + assert e["method"] == "POST" + assert e["endpoint"] == "customers" + assert e["outcome"] == "completed" + assert e["status"] == 200 + assert e["error"] is None + assert e["resolved_url"] == "https://example.test/customers" + assert e["latency_ms"] is not None + + def test_failed_entry_captures_bc_error_metadata(self, audit_state: Path): + async def _boom(): + raise ValidationError( + "field 'name' invalid", + status_code=400, + correlation_id="xyz-corr-id", + ) + + with pytest.raises(ValidationError): + asyncio.run( + audited_write( + _boom(), + method="POST", + endpoint="customers", + body={"name": ""}, + ) + ) + entries = _read_entries(audit_state) + assert len(entries) == 1 + e = entries[0] + assert e["outcome"] == "failed" + assert e["status"] == 400 + assert e["correlation_id"] == "xyz-corr-id" + assert "field 'name' invalid" in e["error"] + + def test_request_body_is_redacted(self, audit_state: Path): + async def _ok(): + return {"id": "abc"} + + asyncio.run( + audited_write( + _ok(), + method="POST", + endpoint="users", + body={"username": "alice", "password": "hunter2", "apiKey": "sk_xxx"}, + ) + ) + entry = _read_entries(audit_state)[0] + assert entry["request_body"]["username"] == "alice" + assert entry["request_body"]["password"] != "hunter2" + assert entry["request_body"]["apiKey"] != "sk_xxx" + + def test_no_emit_when_audit_disabled(self, audit_disabled, tmp_path: Path): + async def _ok(): + return {} + + asyncio.run( + audited_write( + _ok(), + method="POST", + endpoint="customers", + body={"x": 1}, + ) + ) + # Nothing in any audit dir. + for f in tmp_path.glob("**/*.jsonl"): + assert False, f"Expected no audit file, found {f}" + + def test_record_id_passed_through_for_patch(self, audit_state: Path): + async def _ok(): + return {} + + asyncio.run( + audited_write( + _ok(), + method="PATCH", + endpoint="customers", + body={"phone": "+1"}, + record_id="abc-123", + ) + ) + entry = _read_entries(audit_state)[0] + assert entry["method"] == "PATCH" + assert entry["record_id"] == "abc-123" + + def test_resolved_url_recorded_when_passed(self, audit_state: Path): + """The audit-log doc promises ``resolved_url`` is captured for every + write. Make sure the wrapper stores what callers pass instead of + silently dropping it to ``null``.""" + async def _ok(): + return {} + + url = "https://api.example.test/v2.0/companies(c-123)/customers" + asyncio.run( + audited_write( + _ok(), + method="POST", + endpoint="customers", + body={"x": 1}, + resolved_url=url, + ) + ) + entry = _read_entries(audit_state)[0] + assert entry["resolved_url"] == url + + def test_resolved_url_recorded_on_failure_too(self, audit_state: Path): + async def _boom(): + raise ValidationError("nope", status_code=400) + + url = "https://api.example.test/v2.0/companies(c-123)/customers" + with pytest.raises(ValidationError): + asyncio.run( + audited_write( + _boom(), + method="POST", + endpoint="customers", + body={"x": 1}, + resolved_url=url, + ) + ) + entry = _read_entries(audit_state)[0] + assert entry["outcome"] == "failed" + assert entry["resolved_url"] == url + + +class TestCommandLevelAuditing: + """The command-side wrappers (_audited_post / _audited_patch / + _audited_delete) must thread resolved_url through to the audit entry. + A null URL there breaks the documented audit contract.""" + + def test_audited_post_records_resolved_url(self, audit_state: Path, monkeypatch): + from bcli_cli.commands import post_cmd + + captured_url = "https://api.example.test/api/v2.0/companies(c-123)/customers" + + async def _fake_execute(endpoint, body, **kwargs): + return {"id": "abc"} + + class _StubClient: + def _resolve_url(self, entity, **_): + return captured_url + + monkeypatch.setattr(post_cmd, "_execute_post", _fake_execute) + monkeypatch.setattr(state, "make_async_client", lambda **_: _StubClient()) + + asyncio.run(post_cmd._audited_post("customers", {"x": 1})) + + entry = _read_entries(audit_state)[0] + assert entry["resolved_url"] == captured_url + assert entry["outcome"] == "completed" + + def test_audited_delete_records_resolved_url(self, audit_state: Path, monkeypatch): + from bcli_cli.commands import delete_cmd + + captured_url = ( + "https://api.example.test/api/v2.0/companies(c-123)/items(rec-1)" + ) + + async def _fake_execute(endpoint, record_id, **kwargs): + return {} + + class _StubClient: + def _resolve_url(self, entity, **_): + return captured_url + + monkeypatch.setattr(delete_cmd, "_execute_delete", _fake_execute) + monkeypatch.setattr(state, "make_async_client", lambda **_: _StubClient()) + + asyncio.run(delete_cmd._audited_delete("items", "rec-1")) + + entry = _read_entries(audit_state)[0] + assert entry["resolved_url"] == captured_url + assert entry["record_id"] == "rec-1" + + +class TestDryRunAudit: + def test_dry_run_entry_has_outcome_dry_run(self, audit_state: Path): + emit_dry_run_audit( + "POST", + "customers", + body={"displayName": "T"}, + resolved_url="https://example.test/customers", + ) + entries = _read_entries(audit_state) + assert len(entries) == 1 + e = entries[0] + assert e["outcome"] == "dry_run" + assert e["status"] is None + assert e["latency_ms"] is None + + def test_dry_run_no_emit_when_disabled(self, audit_disabled, tmp_path: Path): + emit_dry_run_audit("POST", "x", body={"a": 1}) + for f in tmp_path.glob("**/*.jsonl"): + assert False, f"Expected no audit file, found {f}" + + def test_dry_run_redacts_body(self, audit_state: Path): + emit_dry_run_audit("POST", "users", body={"password": "secret"}) + entry = _read_entries(audit_state)[0] + assert entry["request_body"]["password"] != "secret" diff --git a/tests/test_cli/test_dry_run.py b/tests/test_cli/test_dry_run.py new file mode 100644 index 0000000..903733d --- /dev/null +++ b/tests/test_cli/test_dry_run.py @@ -0,0 +1,227 @@ +"""Tests for the structured dry-run renderer. + +The renderer is the single source of truth for what ``--dry-run`` emits +across every write command (post / patch / delete / attach / batch). When +the active output format is machine-readable (json / ndjson / raw) the +renderer prints a JSON object on stdout that agents can parse before +deciding whether to proceed; when the format is human-facing it prints a +rich panel on stderr instead. Either way it ``typer.Exit()``s with code 0 +— dry-run is an explicit user request, not an error. +""" + +from __future__ import annotations + +import json +from contextlib import suppress + +import pytest +import typer + +from bcli.config._model import BCConfig, BCDefaults, BCProfile +from bcli_cli._dry_run import render_dry_run +from bcli_cli._state import state + + +@pytest.fixture +def configured_state(monkeypatch): + """Set up a minimal CLIState with one writable profile. + + Cleans up afterwards so tests don't bleed into each other. + """ + cfg = BCConfig( + defaults=BCDefaults(profile="dev"), + profiles={ + "dev": BCProfile( + tenant_id="t1", + environment="Sandbox", + company_id="c-123", + disable_writes=False, + ), + }, + ) + state._config = cfg + state._registry = None + state.profile_name = None + state.format = "table" + + # Default: URL resolution returns a deterministic stub. Individual + # tests can override by re-monkeypatching make_async_client. + class _StubClient: + def _resolve_url(self, entity, **_): + return f"https://api.example.test/v2.0/companies(c-123)/{entity}" + + monkeypatch.setattr(state, "make_async_client", lambda **_: _StubClient()) + + yield + state._config = None + state._registry = None + state.format = "table" + + +def _capture_json_payload(capsys) -> dict: + """Run helper and parse the JSON it printed to stdout.""" + out = capsys.readouterr().out + return json.loads(out) + + +class TestStructuredOutput: + def test_json_format_emits_dry_run_envelope(self, configured_state, capsys): + state.format = "json" + with pytest.raises(typer.Exit) as excinfo: + render_dry_run("POST", "customers", body={"displayName": "Test"}) + assert excinfo.value.exit_code == 0 + + payload = _capture_json_payload(capsys) + assert payload["dry_run"] is True + assert payload["method"] == "POST" + assert payload["endpoint"] == "customers" + assert payload["body"] == {"displayName": "Test"} + assert payload["profile"] == "dev" + assert payload["environment"] == "Sandbox" + assert payload["company_id"] == "c-123" + assert payload["resolved_url"].endswith("/customers") + + def test_patch_includes_record_id(self, configured_state, capsys): + state.format = "json" + with suppress(typer.Exit): + render_dry_run( + "PATCH", + "customers", + body={"phoneNumber": "+1"}, + record_id="abc-123", + ) + payload = _capture_json_payload(capsys) + assert payload["method"] == "PATCH" + assert payload["record_id"] == "abc-123" + assert payload["body"] == {"phoneNumber": "+1"} + + def test_delete_omits_body(self, configured_state, capsys): + state.format = "json" + with suppress(typer.Exit): + render_dry_run("DELETE", "customers", record_id="abc-123") + payload = _capture_json_payload(capsys) + assert payload["method"] == "DELETE" + assert payload["record_id"] == "abc-123" + assert "body" not in payload + + def test_extra_fields_merge_into_payload(self, configured_state, capsys): + """Helper accepts arbitrary extra fields — used by attach/batch.""" + state.format = "json" + with suppress(typer.Exit): + render_dry_run( + "UPLOAD", + "documentAttachments", + extra={"file_path": "/tmp/foo.pdf", "byte_size": 1234}, + ) + payload = _capture_json_payload(capsys) + assert payload["file_path"] == "/tmp/foo.pdf" + assert payload["byte_size"] == 1234 + + def test_method_normalised_to_upper_case(self, configured_state, capsys): + state.format = "json" + with suppress(typer.Exit): + render_dry_run("post", "items") + payload = _capture_json_payload(capsys) + assert payload["method"] == "POST" + + def test_ndjson_format_emits_single_line(self, configured_state, capsys): + state.format = "ndjson" + with suppress(typer.Exit): + render_dry_run("POST", "items", body={"x": 1}) + out = capsys.readouterr().out.strip() + assert "\n" not in out + assert json.loads(out)["dry_run"] is True + + +class TestHumanOutput: + def test_table_format_writes_yellow_warning_to_stderr( + self, configured_state, capsys + ): + state.format = "table" + with suppress(typer.Exit): + render_dry_run("POST", "items", body={"x": 1}) + captured = capsys.readouterr() + assert "POST items" in captured.err + assert "dry-run" in captured.err.lower() + + def test_human_format_includes_resolved_url(self, configured_state, capsys): + state.format = "table" + with suppress(typer.Exit): + render_dry_run("POST", "items", body={"x": 1}) + captured = capsys.readouterr() + # URL should appear somewhere in the human output + assert "api.example.test" in captured.err + + def test_human_format_renders_body_on_stdout(self, configured_state, capsys): + """The body itself is JSON to stdout so users can pipe it; the + framing chrome stays on stderr.""" + state.format = "table" + with suppress(typer.Exit): + render_dry_run("POST", "items", body={"alpha": "beta"}) + captured = capsys.readouterr() + assert "alpha" in captured.err or "alpha" in captured.out + + +class TestForceStandardResolution: + """When ``force_standard=True`` (e.g. ``bcli attach upload --standard``), + the dry-run preview URL must reflect the standard /api/v2.0/ route, not + whatever the registry might say. Otherwise the preview misleads users + about exactly the escape-hatch case the flag exists for.""" + + def test_force_standard_uses_v2_route(self, configured_state, capsys, monkeypatch): + # Stub make_async_client so the client's _resolve_url would lie if + # called. force_standard=True must bypass it entirely. + class _LyingClient: + def _resolve_url(self, *_a, **_kw): + return "https://example.test/CUSTOM/PATH/should-not-be-used" + + monkeypatch.setattr(state, "make_async_client", lambda **_: _LyingClient()) + state.format = "json" + with pytest.raises(typer.Exit): + render_dry_run( + "UPLOAD", "documentAttachments", + force_standard=True, + extra={"file_path": "/tmp/x"}, + ) + payload = _capture_json_payload(capsys) + assert payload["resolved_url"] is not None + assert "/api/v2.0/" in payload["resolved_url"] + assert "CUSTOM/PATH" not in payload["resolved_url"] + + +class TestResolutionFailure: + def test_failed_url_resolution_does_not_break_dry_run( + self, configured_state, capsys, monkeypatch + ): + """If the registry can't resolve the entity (e.g. typo, missing + custom registry entry), dry-run still emits — with resolved_url=None + — so the user sees what they asked for and can correct.""" + + def _failing_client(**_): + class _C: + def _resolve_url(self, *_a, **_kw): + raise RuntimeError("registry boom") + + return _C() + + monkeypatch.setattr(state, "make_async_client", _failing_client) + state.format = "json" + with suppress(typer.Exit): + render_dry_run("POST", "totallyMadeUpEntity", body={"x": 1}) + payload = _capture_json_payload(capsys) + assert payload["resolved_url"] is None + assert payload["endpoint"] == "totallyMadeUpEntity" + + +class TestExitBehaviour: + def test_always_exits_clean(self, configured_state): + state.format = "json" + with pytest.raises(typer.Exit) as excinfo: + render_dry_run("POST", "items", body={"x": 1}) + assert excinfo.value.exit_code == 0 + + def test_exits_clean_for_human_format_too(self, configured_state): + state.format = "table" + with pytest.raises(typer.Exit) as excinfo: + render_dry_run("DELETE", "items", record_id="x") + assert excinfo.value.exit_code == 0 diff --git a/tests/test_registry/test_caution.py b/tests/test_registry/test_caution.py new file mode 100644 index 0000000..6f4c201 --- /dev/null +++ b/tests/test_registry/test_caution.py @@ -0,0 +1,99 @@ +"""Tests for the EndpointMetadata.caution flag and verb-name heuristic. + +The flag tells agents (and humans driving in autocomplete-y ways) which +endpoints are likely to mutate posted/closed records and should be treated +with extra care. Default is ``low``. The heuristic infers ``high`` from +endpoint names containing common BC mutation verbs (post, release, cancel, +void, reverse, apply, unapply). Importers can also set ``caution`` directly +in JSON, in which case the explicit value wins over the heuristic. +""" + +from __future__ import annotations + +import pytest + +from bcli.registry._importers import _infer_caution +from bcli.registry._schema import EndpointMetadata + + +def test_caution_defaults_to_low() -> None: + meta = EndpointMetadata(entity_set_name="customers") + assert meta.caution == "low" + + +def test_caution_round_trips_through_schema() -> None: + meta = EndpointMetadata(entity_set_name="salesInvoicePost", caution="high") + payload = meta.model_dump() + assert payload["caution"] == "high" + restored = EndpointMetadata.model_validate(payload) + assert restored.caution == "high" + + +def test_caution_accepts_low_medium_high() -> None: + for level in ("low", "medium", "high"): + meta = EndpointMetadata(entity_set_name="x", caution=level) + assert meta.caution == level + + +def test_caution_rejects_unknown_levels() -> None: + with pytest.raises(ValueError): + EndpointMetadata(entity_set_name="x", caution="extreme") + + +@pytest.mark.parametrize( + "name", + [ + "salesInvoicePost", + "purchaseInvoicePost", + "salesOrderRelease", + "salesOrderCancel", + "journalVoid", + "paymentReverse", + "customerLedgerEntryApply", + "customerLedgerEntryUnapply", + # case-insensitive: still high + "SALESINVOICEPOST", + "salesinvoiceCANCEL", + ], +) +def test_infer_caution_high_for_mutation_verbs(name: str) -> None: + assert _infer_caution(name) == "high" + + +@pytest.mark.parametrize( + "name", + [ + "customers", + "items", + "vendors", + "salesInvoices", + "purchaseInvoices", + "companies", + "currencies", + ], +) +def test_infer_caution_low_for_plain_entities(name: str) -> None: + assert _infer_caution(name) == "low" + + +@pytest.mark.parametrize( + "name", + [ + # "post" inside another word boundary should NOT trip — "postal" + # is not a mutation verb. + "postalCodes", + # "applied" embedded in a noun-y entity name shouldn't escalate + # without a clear verb structure. + "appliedFilterMetadata", + ], +) +def test_infer_caution_does_not_escalate_for_lookalike_words(name: str) -> None: + assert _infer_caution(name) == "low" + + +def test_caution_explicit_overrides_inferred() -> None: + """Importers may set caution explicitly; the heuristic shouldn't overwrite.""" + # Even if the name would heuristically be "high", an explicit "medium" + # in the source should be preserved through schema validation. + meta = EndpointMetadata(entity_set_name="salesInvoicePost", caution="medium") + assert meta.caution == "medium" diff --git a/uv.lock b/uv.lock index d5f2f41..f62778e 100644 --- a/uv.lock +++ b/uv.lock @@ -302,7 +302,7 @@ wheels = [ [[package]] name = "bc-cli" -version = "0.1.5" +version = "0.2.0" source = { editable = "." } dependencies = [ { name = "httpx" },