diff --git a/CHANGELOG.md b/CHANGELOG.md index e389360..632b8fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.6.0] - 2026-05-25 + +### Changed — ETL stampers are now pluggable (BREAKING) + +- **`bcli etl sync` no longer injects audit/metadata columns by default.** + Output is a clean record shape; any extra columns are opt-in. +- **New `bcli.etl.stampers` entry-point group.** A plugin exposes a zero-arg + callable returning a `Stamper` (`Callable[[list[dict]], list[dict]]` — a + per-page row transform). The operator opts in by name via the new + `[etl] stampers = ["..."]` config, or per-run with `bcli etl sync + --stamper NAME` (repeatable). Unknown names are skipped with a warning; + one broken plugin never aborts a sync. Mirrors the dispatch shape of + the `bcli.telemetry` / `bcli.ask` factories. +- **New `EtlConfig` (`[etl]` config section)** with a `stampers: list[str]` + field, wired into `BCConfig`. +- **`bcli_profile()` drops its built-in audit-column flag (`fivetran_compat`)** in favour of + the generic `stampers=[...]` argument (entry-point names) / `[etl] + stampers` config. The generic `audit_stamper` / `company_id_stamper` + helpers remain; `fivetran_stamper` is removed. Migration: if you relied on the previous default audit + columns, install a package that registers the matching stamper plugin + and add its name to `[etl] stampers`. + ## [0.5.0] - 2026-05-25 ### Added — Part 3 (`bcli-site/` landing page v0) diff --git a/pyproject.toml b/pyproject.toml index ad299fd..a852eae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ build-backend = "hatchling.build" # installed CLI binary (`bcli`) are unaffected — only `pip install` / # `uv tool install` use this name. 
name = "bc-cli" -version = "0.5.0" +version = "0.6.0" description = "Python SDK and CLI for Microsoft Dynamics 365 Business Central APIs" readme = "README.md" license = "Apache-2.0" diff --git a/src/bcli/config/_model.py b/src/bcli/config/_model.py index 4d94aee..2775357 100644 --- a/src/bcli/config/_model.py +++ b/src/bcli/config/_model.py @@ -304,6 +304,27 @@ class ContextConfig(BaseModel): model_config = {"extra": "allow"} +class EtlConfig(BaseModel): + """Settings for ``bcli etl`` — the dlt-based extract pipeline. + + ``stampers`` lists entry-point names registered under the + ``bcli.etl.stampers`` group that should post-process every page of + records before dlt ingests them (sync timestamps, soft-delete flags, + vendor-specific audit columns, …). Applied in the order given. + + The package ships no audit-column stampers; the list is empty by + default so output stays a clean record shape. A downstream package + registers a stamper under the ``bcli.etl.stampers`` group and the + operator opts in by name, e.g. ``stampers = ["audit"]``. Unknown + names are skipped with a warning — see + :mod:`bcli.etl._stamper_factory`. + """ + + stampers: list[str] = Field(default_factory=list) + + model_config = {"extra": "allow"} + + class BCConfig(BaseModel): """Top-level configuration.""" @@ -314,6 +335,7 @@ class BCConfig(BaseModel): extract: ExtractConfig = Field(default_factory=ExtractConfig) context: ContextConfig = Field(default_factory=ContextConfig) ask: AskConfig = Field(default_factory=AskConfig) + etl: EtlConfig = Field(default_factory=EtlConfig) model_config = {"extra": "allow"} diff --git a/src/bcli/etl/__init__.py b/src/bcli/etl/__init__.py index ba290e9..585f01f 100644 --- a/src/bcli/etl/__init__.py +++ b/src/bcli/etl/__init__.py @@ -6,18 +6,19 @@ and an explicit entity list. No bcli coupling. - :func:`bcli_profile` — bridge that reads entities from a bcli profile's - registry and reuses bcli's authenticated session. 
Defaults match Fivetran - behavior (multi-company, Fivetran audit columns). + registry and reuses bcli's authenticated session. Multi-company on by + default; audit columns are opt-in via the ``bcli.etl.stampers`` + entry-point group + ``[etl] stampers`` config (vendor-neutral by default). Example — standalone: - >>> from bcli.etl import business_central, EntityDef, fivetran_stamper + >>> from bcli.etl import business_central, EntityDef, audit_stamper >>> source = business_central( ... tenant_id="...", client_id="...", client_secret="...", ... environment="Production", ... entities=[EntityDef(name="customers")], ... multi_company=True, - ... stampers=[fivetran_stamper()], + ... stampers=[audit_stamper("bc-prod")], ... ) Example — bcli bridge: @@ -34,11 +35,11 @@ from bcli.etl._bridge import bcli_profile, load_entities_from_bcli_registry from bcli.etl._generic import EntityDef, business_central from bcli.etl._polaris import PolarisConfig, register_load_with_polaris +from bcli.etl._stamper_factory import build_stampers, discover_stamper_factories from bcli.etl._stampers import ( Stamper, audit_stamper, company_id_stamper, - fivetran_stamper, ) __all__ = [ @@ -51,9 +52,11 @@ "StaticTokenAuth", # Stampers "Stamper", - "fivetran_stamper", "audit_stamper", "company_id_stamper", + # Stamper plugin discovery + "build_stampers", + "discover_stamper_factories", # bcli bridge "bcli_profile", "load_entities_from_bcli_registry", diff --git a/src/bcli/etl/_bridge.py b/src/bcli/etl/_bridge.py index ad95ce9..6aae958 100644 --- a/src/bcli/etl/_bridge.py +++ b/src/bcli/etl/_bridge.py @@ -11,7 +11,8 @@ from bcli.etl._auth import StaticTokenAuth from bcli.etl._generic import EntityDef, business_central as _generic_business_central -from bcli.etl._stampers import Stamper, fivetran_stamper +from bcli.etl._stamper_factory import build_stampers +from bcli.etl._stampers import Stamper def load_entities_from_bcli_registry( @@ -59,24 +60,29 @@ def bcli_profile( entities: list[str] | None = 
None, full_refresh: bool = False, multi_company: bool = True, - fivetran_compat: bool = True, include_standard: bool = False, + stampers: list[str] | None = None, extra_stampers: list[Stamper] | None = None, ) -> Any: """dlt source using a bcli profile's registry + auth. - Defaults match Fivetran parity: multi-company on, Fivetran - audit columns on. Pass ``fivetran_compat=False`` for a cleaner record shape - in new downstream models. + Output is a clean record shape by default — no audit/metadata + columns. Those come from stampers registered under the + ``bcli.etl.stampers`` entry-point group and opted into per config + or via the ``stampers`` argument. Args: profile: bcli profile name (from ``~/.config/bcli/config.toml``). entities: Restrict to these entity names. Default: all custom endpoints. full_refresh: Ignore incremental cursor. - multi_company: Iterate across all companies (Fivetran behavior). - fivetran_compat: Add ``_fivetran_synced`` / ``_fivetran_deleted`` columns. + multi_company: Iterate across all companies, adding a ``company_id`` + column to every record. include_standard: Include standard v2.0 entities in addition to custom. - extra_stampers: Optional extra stampers applied after the built-ins. + stampers: Entry-point names (group ``bcli.etl.stampers``) to apply. + Overrides ``[etl] stampers`` config when provided; pass ``[]`` to + force a clean shape regardless of config. ``None`` (default) reads + the config list. + extra_stampers: Programmatic stampers applied after the named ones. Returns: A dlt source ready to pass to ``pipeline.run(...)``. @@ -102,12 +108,12 @@ def bcli_profile( ) all_entities = [e for e in all_entities if e.name in name_set] - # Build stampers list - stampers: list[Stamper] = [] - if fivetran_compat: - stampers.append(fivetran_stamper()) + # Build stampers from registered plugins. Explicit `stampers=` overrides + # config; otherwise read the opt-in `[etl] stampers` list. 
+ names = stampers if stampers is not None else list(config.etl.stampers) + resolved_stampers: list[Stamper] = build_stampers(names) if extra_stampers: - stampers.extend(extra_stampers) + resolved_stampers.extend(extra_stampers) # Wrap bcli's auth as an AuthProvider auth = StaticTokenAuth(_build_token_provider(profile)) @@ -117,6 +123,6 @@ def bcli_profile( environment=environment, entities=all_entities, multi_company=multi_company, - stampers=stampers, + stampers=resolved_stampers, full_refresh=full_refresh, ) diff --git a/src/bcli/etl/_generic.py b/src/bcli/etl/_generic.py index b816b29..2172d1d 100644 --- a/src/bcli/etl/_generic.py +++ b/src/bcli/etl/_generic.py @@ -189,7 +189,7 @@ def business_central( multi_company: If ``True``, iterate through every company returned by ``/companies`` and extract each entity per company. Adds a ``company_id`` column to every record. - stampers: Optional post-processing hooks (e.g. ``fivetran_stamper()``). + stampers: Optional post-processing hooks (e.g. ``audit_stamper()``). Defaults to an empty list. full_refresh: If ``True``, ignore the incremental cursor. """ diff --git a/src/bcli/etl/_stamper_factory.py b/src/bcli/etl/_stamper_factory.py new file mode 100644 index 0000000..95974d9 --- /dev/null +++ b/src/bcli/etl/_stamper_factory.py @@ -0,0 +1,102 @@ +"""Pluggable stamper discovery for the BC ETL source. + +OSS ships the *mechanism*; downstream packages register vendor-specific +audit columns. A third-party package exposes a zero-arg callable that +returns a :data:`~bcli.etl._stampers.Stamper` and advertises it under +the ``bcli.etl.stampers`` entry-point group:: + + [project.entry-points."bcli.etl.stampers"] + audit = "my_pkg.etl:audit_stamper" + +The user opts in by name via ``[etl] stampers = ["audit"]`` in +``~/.config/bcli/config.toml``. :func:`build_stampers` resolves that +name list to concrete stampers, applied in the order given. 
+ +Mirrors the dispatch shape of :mod:`bcli.telemetry._factory` and +:mod:`bcli.ask._providers`: an unknown name or a failing factory logs a +warning and is skipped — one broken plugin never aborts a sync. + +This module is part of the generic layer and must not import from bcli.* outside bcli.etl. +""" + +from __future__ import annotations + +import logging +from importlib.metadata import entry_points +from typing import Callable + +from bcli.etl._stampers import Stamper + +logger = logging.getLogger("bcli.etl") + +ENTRYPOINT_GROUP = "bcli.etl.stampers" + +# A factory is a zero-arg callable returning a Stamper. +StamperFactory = Callable[[], Stamper] + + +def discover_stamper_factories() -> dict[str, StamperFactory]: + """Return ``{name: factory}`` for every registered ``bcli.etl.stampers``. + + A factory that fails to load logs a warning and is skipped. + """ + out: dict[str, StamperFactory] = {} + for ep in _iter_entrypoints(): + try: + factory = ep.load() + except Exception as exc: # noqa: BLE001 + logger.warning( + "bcli.etl.stampers entry-point %r failed to load: %s", + ep.name, exc, + ) + continue + if not callable(factory): + logger.warning( + "bcli.etl.stampers entry-point %r is not callable; skipping.", + ep.name, + ) + continue + out[ep.name] = factory + return out + + +def build_stampers(names: list[str]) -> list[Stamper]: + """Resolve a list of entry-point names to concrete stampers, in order. + + Unknown names and factories that raise are logged and skipped so a + single misconfigured plugin can't abort the whole sync. 
+ """ + if not names: + return [] + available = discover_stamper_factories() + out: list[Stamper] = [] + for name in names: + factory = available.get(name) + if factory is None: + logger.warning( + "ETL stamper %r is not registered (available: %s); skipping.", + name, sorted(available) or "none", + ) + continue + try: + out.append(factory()) + except Exception as exc: # noqa: BLE001 + logger.warning( + "ETL stamper %r factory raised %s; skipping.", name, exc, + ) + return out + + +def _iter_entrypoints(): + try: + yield from entry_points(group=ENTRYPOINT_GROUP) + except Exception: # pragma: no cover — defensive + return + + +__all__ = [ + "ENTRYPOINT_GROUP", + "StamperFactory", + "build_stampers", + "discover_stamper_factories", +] diff --git a/src/bcli/etl/_stampers.py b/src/bcli/etl/_stampers.py index 8add643..bb7973b 100644 --- a/src/bcli/etl/_stampers.py +++ b/src/bcli/etl/_stampers.py @@ -2,7 +2,12 @@ Stampers are post-processing functions applied to each page of records before dlt ingests them. They add metadata columns (sync timestamps, -source identifiers, soft-delete flags) for downstream compatibility. +source identifiers, etc.) for downstream compatibility. + +This package ships only vendor-neutral stampers. Vendor-specific audit +conventions live in downstream packages and register through the +``bcli.etl.stampers`` entry-point group — see +:mod:`bcli.etl._stamper_factory`. This module is part of the generic layer and must not import from bcli.*. """ @@ -15,33 +20,8 @@ Stamper = Callable[[list[dict[str, Any]]], list[dict[str, Any]]] -def fivetran_stamper() -> Stamper: - """Add Fivetran-compatible audit columns to every record. - - Adds: - - ``_fivetran_synced``: ISO-8601 UTC timestamp of when the record was synced. - - ``_fivetran_deleted``: always ``False`` (soft-delete flag; BC doesn't - expose deletions, so downstream models should filter on this anyway). - - Use this when migrating from or coexisting with Fivetran. 
Downstream - dbt models that reference these columns keep working unchanged. - """ - - def _stamp(page: list[dict[str, Any]]) -> list[dict[str, Any]]: - synced_at = datetime.now(timezone.utc).isoformat() - return [ - {**record, "_fivetran_synced": synced_at, "_fivetran_deleted": False} - for record in page - ] - - return _stamp - - def audit_stamper(source_name: str) -> Stamper: - """Add a generic audit trail (`_synced_at`, `_source`) to every record. - - Use this for new pipelines not tied to Fivetran conventions. - """ + """Add a generic audit trail (`_synced_at`, `_source`) to every record.""" def _stamp(page: list[dict[str, Any]]) -> list[dict[str, Any]]: synced_at = datetime.now(timezone.utc).isoformat() diff --git a/src/bcli_cli/commands/etl_cmd.py b/src/bcli_cli/commands/etl_cmd.py index 2a99898..decf7a0 100644 --- a/src/bcli_cli/commands/etl_cmd.py +++ b/src/bcli_cli/commands/etl_cmd.py @@ -68,6 +68,7 @@ def sync( full_refresh: bool = typer.Option(False, "--full-refresh", help="Ignore cursor, reload everything"), include_standard: bool = typer.Option(False, "--include-standard", help="Also sync standard v2.0 entities"), file_format: str = typer.Option("jsonl", "--file-format", help="Filesystem loader file format: jsonl or parquet"), + stamper: Optional[list[str]] = typer.Option(None, "--stamper", help="ETL stamper plugin name to apply (repeatable). Overrides the [etl] stampers config. Plugins register under the bcli.etl.stampers entry-point group."), polaris_uri: Optional[str] = typer.Option(None, "--polaris-uri", envvar="BCLI_POLARIS_URI", help="Polaris REST catalog URI. 
Enables post-sync Iceberg registration."), polaris_warehouse: Optional[str] = typer.Option(None, "--polaris-warehouse", envvar="BCLI_POLARIS_WAREHOUSE", help="Polaris catalog (warehouse) name"), polaris_credential: Optional[str] = typer.Option(None, "--polaris-credential", envvar="BCLI_POLARIS_CREDENTIAL", help="Polaris OAuth credential in 'client_id:client_secret' form"), @@ -76,7 +77,10 @@ def sync( """Extract Business Central data and load to a destination via dlt. By default syncs all custom API endpoints from the registry for the active - profile. Standard v2.0 entities are skipped (typically handled by Fivetran). + profile; pass --include-standard to also sync standard v2.0 entities. + Output is vendor-neutral by default — audit columns (sync timestamp, + soft-delete flags) are opt-in via ETL stamper plugins (--stamper NAME or + the [etl] stampers config). \b Examples: @@ -130,6 +134,7 @@ def sync( entities=entity_list, full_refresh=full_refresh, include_standard=include_standard, + stampers=list(stamper) if stamper else None, ) if destination == "filesystem": diff --git a/tests/test_etl/test_generic.py b/tests/test_etl/test_generic.py index ec66cc8..fa033b2 100644 --- a/tests/test_etl/test_generic.py +++ b/tests/test_etl/test_generic.py @@ -167,8 +167,9 @@ def test_builds_with_credentials(self): class TestGenericHasNoBcliCoupling: """The generic layer must be importable with only bcli.etl modules loaded. - If any of _generic.py, _client.py, _auth.py, _stampers.py imports from - bcli.registry, bcli.client, bcli.config, or bcli.auth, this test fails. + If any of _generic.py, _client.py, _auth.py, _stampers.py, + _stamper_factory.py imports from bcli.registry, bcli.client, + bcli.config, or bcli.auth, this test fails. 
""" def test_generic_does_not_import_bcli_sdk(self): @@ -181,7 +182,7 @@ def test_generic_does_not_import_bcli_sdk(self): ).parent forbidden_prefixes = ("bcli.registry", "bcli.client", "bcli.config", "bcli.auth", "bcli.errors") - for module_name in ("_generic.py", "_client.py", "_auth.py", "_stampers.py"): + for module_name in ("_generic.py", "_client.py", "_auth.py", "_stampers.py", "_stamper_factory.py"): path = etl_dir / module_name tree = ast.parse(path.read_text()) for node in ast.walk(tree): diff --git a/tests/test_etl/test_stamper_factory.py b/tests/test_etl/test_stamper_factory.py new file mode 100644 index 0000000..bf3070a --- /dev/null +++ b/tests/test_etl/test_stamper_factory.py @@ -0,0 +1,83 @@ +"""ETL stamper plugin discovery + resolution (bcli.etl.stampers group).""" + +from __future__ import annotations + +import pytest + +pytest.importorskip("dlt") + +from bcli.etl import _stamper_factory as mod +from bcli.etl._stamper_factory import build_stampers, discover_stamper_factories + + +def _fake_audit_factory(): + def _stamp(page): + return [{**rec, "_synced": "ts", "_deleted": False} for rec in page] + + return _stamp + + +def _broken_factory(): + raise RuntimeError("boom") + + +def _patch_factories(monkeypatch, mapping): + """Force discover_stamper_factories() to return our test mapping.""" + monkeypatch.setattr(mod, "discover_stamper_factories", lambda: dict(mapping)) + + +def test_empty_names_is_noop(monkeypatch): + _patch_factories(monkeypatch, {}) + assert build_stampers([]) == [] + + +def test_resolves_named_stamper_in_order(monkeypatch): + def _a(): + return lambda page: [{**r, "a": 1} for r in page] + + def _b(): + return lambda page: [{**r, "b": 2} for r in page] + + _patch_factories(monkeypatch, {"a": _a, "b": _b}) + stampers = build_stampers(["b", "a"]) + assert len(stampers) == 2 + # Applied in the requested order: b then a. 
+ out = stampers[0]([{"id": "1"}]) + assert out[0]["b"] == 2 + + +def test_audit_style_plugin(monkeypatch): + _patch_factories(monkeypatch, {"audit": _fake_audit_factory}) + stampers = build_stampers(["audit"]) + out = stampers[0]([{"id": "1"}]) + assert out[0]["_synced"] == "ts" + assert out[0]["_deleted"] is False + + +def test_unknown_name_skipped_with_warning(monkeypatch, caplog): + _patch_factories(monkeypatch, {"known": _fake_audit_factory}) + with caplog.at_level("WARNING"): + stampers = build_stampers(["nope"]) + assert stampers == [] + assert any("not registered" in r.message for r in caplog.records) + + +def test_failing_factory_skipped(monkeypatch, caplog): + _patch_factories(monkeypatch, {"broken": _broken_factory}) + with caplog.at_level("WARNING"): + stampers = build_stampers(["broken"]) + assert stampers == [] + assert any("raised" in r.message for r in caplog.records) + + +def test_discover_skips_non_callable(monkeypatch): + """A non-callable entry point is dropped, not returned.""" + + class _FakeEP: + name = "bad" + + def load(self): + return "not-callable" + + monkeypatch.setattr(mod, "_iter_entrypoints", lambda: iter([_FakeEP()])) + assert discover_stamper_factories() == {} diff --git a/tests/test_etl/test_stampers.py b/tests/test_etl/test_stampers.py index d0f521c..69a1942 100644 --- a/tests/test_etl/test_stampers.py +++ b/tests/test_etl/test_stampers.py @@ -12,37 +12,9 @@ apply_stampers, audit_stamper, company_id_stamper, - fivetran_stamper, ) -class TestFivetranStamper: - def test_adds_both_fields(self): - stamp = fivetran_stamper() - out = stamp([{"id": "1", "name": "Acme"}, {"id": "2", "name": "Globex"}]) - for record in out: - assert "_fivetran_synced" in record - assert record["_fivetran_deleted"] is False - - def test_preserves_existing_fields(self): - stamp = fivetran_stamper() - out = stamp([{"id": "1", "name": "Acme"}]) - assert out[0]["id"] == "1" - assert out[0]["name"] == "Acme" - - def test_does_not_mutate_input(self): - stamp = 
fivetran_stamper() - src = [{"id": "1"}] - stamp(src) - assert "_fivetran_synced" not in src[0] - - def test_synced_is_iso_timestamp(self): - stamp = fivetran_stamper() - out = stamp([{"id": "1"}]) - # Parseable as ISO timestamp - datetime.fromisoformat(out[0]["_fivetran_synced"]) - - class TestAuditStamper: def test_adds_synced_at_and_source(self): stamp = audit_stamper("test-source") @@ -50,6 +22,17 @@ def test_adds_synced_at_and_source(self): assert "_synced_at" in out[0] assert out[0]["_source"] == "test-source" + def test_synced_at_is_iso_timestamp(self): + stamp = audit_stamper("test-source") + out = stamp([{"id": "1"}]) + datetime.fromisoformat(out[0]["_synced_at"]) + + def test_does_not_mutate_input(self): + stamp = audit_stamper("test-source") + src = [{"id": "1"}] + stamp(src) + assert "_synced_at" not in src[0] + class TestCompanyIdStamper: def test_injects_company_id(self): diff --git a/uv.lock b/uv.lock index e780914..5de2b3d 100644 --- a/uv.lock +++ b/uv.lock @@ -321,7 +321,7 @@ wheels = [ [[package]] name = "bc-cli" -version = "0.5.0" +version = "0.6.0" source = { editable = "." } dependencies = [ { name = "httpx" },