Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.6.0] - 2026-05-25

### Changed — ETL stampers are now pluggable (BREAKING)

- **`bcli etl sync` no longer injects audit/metadata columns by default.**
Output is a clean record shape; any extra columns are opt-in.
- **New `bcli.etl.stampers` entry-point group.** A plugin exposes a zero-arg
callable returning a `Stamper` (`Callable[[list[dict]], list[dict]]` — a
per-page row transform). The operator opts in by name via the new
`[etl] stampers = ["..."]` config, or per-run with `bcli etl sync
--stamper NAME` (repeatable). Unknown names are skipped with a warning;
one broken plugin never aborts a sync. Mirrors the dispatch shape of
the `bcli.telemetry` / `bcli.ask` factories.
- **New `EtlConfig` (`[etl]` config section)** with a `stampers: list[str]`
field, wired into `BCConfig`.
- **`bcli_profile()` drops its built-in audit-column flag** in favour of
the generic `stampers=[...]` argument (entry-point names) / `[etl]
stampers` config. The generic `audit_stamper` / `company_id_stamper`
helpers remain. Migration: if you relied on the previous default audit
columns, install a package that registers the matching stamper plugin
and add its name to `[etl] stampers`.

## [0.5.0] - 2026-05-25

### Added — Part 3 (`bcli-site/` landing page v0)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ build-backend = "hatchling.build"
# installed CLI binary (`bcli`) are unaffected — only `pip install` /
# `uv tool install` use this name.
name = "bc-cli"
version = "0.5.0"
version = "0.6.0"
description = "Python SDK and CLI for Microsoft Dynamics 365 Business Central APIs"
readme = "README.md"
license = "Apache-2.0"
Expand Down
22 changes: 22 additions & 0 deletions src/bcli/config/_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,27 @@ class ContextConfig(BaseModel):
model_config = {"extra": "allow"}


class EtlConfig(BaseModel):
    """Configuration for the ``[etl]`` section used by ``bcli etl``.

    The single declared field, ``stampers``, holds entry-point names from
    the ``bcli.etl.stampers`` group. Every named stamper post-processes
    each page of records before dlt ingests it (sync timestamps,
    soft-delete flags, vendor-specific audit columns, …), and the names
    are applied in the order they are listed.

    No audit-column stampers ship with the package, so the list is empty
    by default and output keeps a clean record shape. A downstream
    package registers a stamper under ``bcli.etl.stampers`` and the
    operator opts in by name, e.g. ``stampers = ["audit"]``. Names that
    are not registered are skipped with a warning — see
    :mod:`bcli.etl._stamper_factory`.
    """

    # Same permissive setting as the sibling config models: keys that are
    # not declared on this model are allowed.
    model_config = {"extra": "allow"}

    # Entry-point names to apply, in order. A fresh empty list per
    # instance means "no stampers".
    stampers: list[str] = Field(default_factory=list)


class BCConfig(BaseModel):
"""Top-level configuration."""

Expand All @@ -314,6 +335,7 @@ class BCConfig(BaseModel):
extract: ExtractConfig = Field(default_factory=ExtractConfig)
context: ContextConfig = Field(default_factory=ContextConfig)
ask: AskConfig = Field(default_factory=AskConfig)
etl: EtlConfig = Field(default_factory=EtlConfig)

model_config = {"extra": "allow"}

Expand Down
15 changes: 9 additions & 6 deletions src/bcli/etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@
and an explicit entity list. No bcli coupling.

- :func:`bcli_profile` — bridge that reads entities from a bcli profile's
registry and reuses bcli's authenticated session. Defaults match Fivetran
behavior (multi-company, Fivetran audit columns).
registry and reuses bcli's authenticated session. Multi-company on by
default; audit columns are opt-in via the ``bcli.etl.stampers``
entry-point group + ``[etl] stampers`` config (vendor-neutral by default).

Example — standalone:

>>> from bcli.etl import business_central, EntityDef, fivetran_stamper
>>> from bcli.etl import business_central, EntityDef, audit_stamper
>>> source = business_central(
... tenant_id="...", client_id="...", client_secret="...",
... environment="Production",
... entities=[EntityDef(name="customers")],
... multi_company=True,
... stampers=[fivetran_stamper()],
... stampers=[audit_stamper("bc-prod")],
... )

Example — bcli bridge:
Expand All @@ -34,11 +35,11 @@
from bcli.etl._bridge import bcli_profile, load_entities_from_bcli_registry
from bcli.etl._generic import EntityDef, business_central
from bcli.etl._polaris import PolarisConfig, register_load_with_polaris
from bcli.etl._stamper_factory import build_stampers, discover_stamper_factories
from bcli.etl._stampers import (
Stamper,
audit_stamper,
company_id_stamper,
fivetran_stamper,
)

__all__ = [
Expand All @@ -51,9 +52,11 @@
"StaticTokenAuth",
# Stampers
"Stamper",
"fivetran_stamper",
"audit_stamper",
"company_id_stamper",
# Stamper plugin discovery
"build_stampers",
"discover_stamper_factories",
# bcli bridge
"bcli_profile",
"load_entities_from_bcli_registry",
Expand Down
34 changes: 20 additions & 14 deletions src/bcli/etl/_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

from bcli.etl._auth import StaticTokenAuth
from bcli.etl._generic import EntityDef, business_central as _generic_business_central
from bcli.etl._stampers import Stamper, fivetran_stamper
from bcli.etl._stamper_factory import build_stampers
from bcli.etl._stampers import Stamper


def load_entities_from_bcli_registry(
Expand Down Expand Up @@ -59,24 +60,29 @@ def bcli_profile(
entities: list[str] | None = None,
full_refresh: bool = False,
multi_company: bool = True,
fivetran_compat: bool = True,
include_standard: bool = False,
stampers: list[str] | None = None,
extra_stampers: list[Stamper] | None = None,
) -> Any:
"""dlt source using a bcli profile's registry + auth.

Defaults match Fivetran parity: multi-company on, Fivetran
audit columns on. Pass ``fivetran_compat=False`` for a cleaner record shape
in new downstream models.
Output is a clean record shape by default — no audit/metadata
columns. Those come from stampers registered under the
``bcli.etl.stampers`` entry-point group and opted into per config
or via the ``stampers`` argument.

Args:
profile: bcli profile name (from ``~/.config/bcli/config.toml``).
entities: Restrict to these entity names. Default: all custom endpoints.
full_refresh: Ignore incremental cursor.
multi_company: Iterate across all companies (Fivetran behavior).
fivetran_compat: Add ``_fivetran_synced`` / ``_fivetran_deleted`` columns.
multi_company: Iterate across all companies, adding a ``company_id``
column to every record.
include_standard: Include standard v2.0 entities in addition to custom.
extra_stampers: Optional extra stampers applied after the built-ins.
stampers: Entry-point names (group ``bcli.etl.stampers``) to apply.
Overrides ``[etl] stampers`` config when provided; pass ``[]`` to
force a clean shape regardless of config. ``None`` (default) reads
the config list.
extra_stampers: Programmatic stampers applied after the named ones.

Returns:
A dlt source ready to pass to ``pipeline.run(...)``.
Expand All @@ -102,12 +108,12 @@ def bcli_profile(
)
all_entities = [e for e in all_entities if e.name in name_set]

# Build stampers list
stampers: list[Stamper] = []
if fivetran_compat:
stampers.append(fivetran_stamper())
# Build stampers from registered plugins. Explicit `stampers=` overrides
# config; otherwise read the opt-in `[etl] stampers` list.
names = stampers if stampers is not None else list(config.etl.stampers)
resolved_stampers: list[Stamper] = build_stampers(names)
if extra_stampers:
stampers.extend(extra_stampers)
resolved_stampers.extend(extra_stampers)

# Wrap bcli's auth as an AuthProvider
auth = StaticTokenAuth(_build_token_provider(profile))
Expand All @@ -117,6 +123,6 @@ def bcli_profile(
environment=environment,
entities=all_entities,
multi_company=multi_company,
stampers=stampers,
stampers=resolved_stampers,
full_refresh=full_refresh,
)
2 changes: 1 addition & 1 deletion src/bcli/etl/_generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def business_central(
multi_company: If ``True``, iterate through every company returned by
``/companies`` and extract each entity per company. Adds a
``company_id`` column to every record.
stampers: Optional post-processing hooks (e.g. ``fivetran_stamper()``).
stampers: Optional post-processing hooks (e.g. ``audit_stamper()``).
Defaults to an empty list.
full_refresh: If ``True``, ignore the incremental cursor.
"""
Expand Down
102 changes: 102 additions & 0 deletions src/bcli/etl/_stamper_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""Pluggable stamper discovery for the BC ETL source.

OSS ships the *mechanism*; downstream packages register vendor-specific
audit columns. A third-party package exposes a zero-arg callable that
returns a :data:`~bcli.etl._stampers.Stamper` and advertises it under
the ``bcli.etl.stampers`` entry-point group::

[project.entry-points."bcli.etl.stampers"]
audit = "my_pkg.etl:audit_stamper"

The user opts in by name via ``[etl] stampers = ["audit"]`` in
``~/.config/bcli/config.toml``. :func:`build_stampers` resolves that
name list to concrete stampers, applied in the order given.

Mirrors the dispatch shape of :mod:`bcli.telemetry._factory` and
:mod:`bcli.ask._providers`: an unknown name or a failing factory logs a
warning and is skipped — one broken plugin never aborts a sync.

This module is part of the generic layer and must not import from bcli.*
outside the ``bcli.etl`` package (its only bcli import is
``bcli.etl._stampers``).
"""

from __future__ import annotations

import logging
from importlib.metadata import entry_points
from typing import Callable

from bcli.etl._stampers import Stamper

# Logger shared by the helpers below; it is named "bcli.etl" explicitly
# rather than derived from ``__name__``.
logger = logging.getLogger("bcli.etl")

# Entry-point group that stamper plugins are looked up under.
ENTRYPOINT_GROUP = "bcli.etl.stampers"

# A factory is a zero-arg callable returning a Stamper.
StamperFactory = Callable[[], Stamper]


def discover_stamper_factories() -> dict[str, StamperFactory]:
    """Load every ``bcli.etl.stampers`` entry point into a name → factory map.

    An entry point whose ``load()`` raises, or whose loaded object is not
    callable, is reported with a warning and left out of the result.

    Returns:
        Mapping of entry-point name to the loaded factory callable.
    """
    factories: dict[str, StamperFactory] = {}
    for entry_point in _iter_entrypoints():
        try:
            loaded = entry_point.load()
        except Exception as err:  # noqa: BLE001
            # Importing a plugin can fail in arbitrary ways; keep going.
            logger.warning(
                "bcli.etl.stampers entry-point %r failed to load: %s",
                entry_point.name, err,
            )
        else:
            if callable(loaded):
                factories[entry_point.name] = loaded
            else:
                logger.warning(
                    "bcli.etl.stampers entry-point %r is not callable; skipping.",
                    entry_point.name,
                )
    return factories


def build_stampers(names: list[str]) -> list[Stamper]:
    """Resolve a list of entry-point names to concrete stampers, in order.

    Unknown names, factories that raise, and factories that return a
    non-callable are logged and skipped so a single misconfigured plugin
    can't abort the whole sync.

    Args:
        names: Entry-point names from the ``bcli.etl.stampers`` group, in
            the order the stampers should be applied. An empty list
            returns immediately without scanning entry points.

    Returns:
        The stampers produced by the resolvable factories, preserving the
        relative order of ``names``.
    """
    if not names:
        return []
    available = discover_stamper_factories()
    out: list[Stamper] = []
    for name in names:
        factory = available.get(name)
        if factory is None:
            logger.warning(
                "ETL stamper %r is not registered (available: %s); skipping.",
                name, sorted(available) or "none",
            )
            continue
        try:
            stamper = factory()
        except Exception as exc:  # noqa: BLE001
            logger.warning(
                "ETL stamper %r factory raised %s; skipping.", name, exc,
            )
            continue
        # A Stamper is a callable page transform. A factory that returns
        # None (or anything else non-callable) cannot be applied to a
        # page, so reject it here instead of handing it to the source.
        if not callable(stamper):
            logger.warning(
                "ETL stamper %r factory returned non-callable %r; skipping.",
                name, stamper,
            )
            continue
        out.append(stamper)
    return out


def _iter_entrypoints():
    """Yield the entry points registered under ``ENTRYPOINT_GROUP``.

    Discovery is best-effort: if the ``importlib.metadata`` lookup raises,
    the failure is logged and the generator simply ends, so callers see
    an empty (or truncated) sequence rather than an exception.
    """
    try:
        yield from entry_points(group=ENTRYPOINT_GROUP)
    except Exception as exc:  # pragma: no cover — defensive
        # Without this message a failed lookup is indistinguishable from
        # "no plugins installed": every configured stamper would just be
        # reported as unregistered.
        logger.warning(
            "bcli.etl.stampers entry-point discovery failed: %s", exc,
        )
        return


# Names exported by ``from bcli.etl._stamper_factory import *``. The
# underscore-prefixed ``_iter_entrypoints`` helper and the module
# ``logger`` are not part of this list.
__all__ = [
    "ENTRYPOINT_GROUP",
    "StamperFactory",
    "build_stampers",
    "discover_stamper_factories",
]
34 changes: 7 additions & 27 deletions src/bcli/etl/_stampers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@

Stampers are post-processing functions applied to each page of records
before dlt ingests them. They add metadata columns (sync timestamps,
source identifiers, soft-delete flags) for downstream compatibility.
source identifiers, etc.) for downstream compatibility.

This package ships only vendor-neutral stampers. Vendor-specific audit
conventions live in downstream packages and register through the
``bcli.etl.stampers`` entry-point group — see
:mod:`bcli.etl._stamper_factory`.

This module is part of the generic layer and must not import from bcli.*.
"""
Expand All @@ -15,33 +20,8 @@
Stamper = Callable[[list[dict[str, Any]]], list[dict[str, Any]]]


def fivetran_stamper() -> Stamper:
    """Build a stamper that appends Fivetran-style audit columns.

    Every record in a page gains two keys:
      - ``_fivetran_synced``: ISO-8601 UTC timestamp, taken once per page
        so all records of that page share the same value.
      - ``_fivetran_deleted``: constant ``False`` (soft-delete flag).

    Intended for pipelines migrating from, or running alongside,
    Fivetran, so downstream models that reference these columns keep
    working unchanged.

    Returns:
        A stamper that returns new record dicts; the input records are
        not modified.
    """

    def _stamp(page: list[dict[str, Any]]) -> list[dict[str, Any]]:
        now_iso = datetime.now(timezone.utc).isoformat()
        stamped: list[dict[str, Any]] = []
        for row in page:
            # Copy first so the caller's record is left untouched.
            enriched = dict(row)
            enriched["_fivetran_synced"] = now_iso
            enriched["_fivetran_deleted"] = False
            stamped.append(enriched)
        return stamped

    return _stamp


def audit_stamper(source_name: str) -> Stamper:
"""Add a generic audit trail (`_synced_at`, `_source`) to every record.

Use this for new pipelines not tied to Fivetran conventions.
"""
"""Add a generic audit trail (`_synced_at`, `_source`) to every record."""

def _stamp(page: list[dict[str, Any]]) -> list[dict[str, Any]]:
synced_at = datetime.now(timezone.utc).isoformat()
Expand Down
7 changes: 6 additions & 1 deletion src/bcli_cli/commands/etl_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def sync(
full_refresh: bool = typer.Option(False, "--full-refresh", help="Ignore cursor, reload everything"),
include_standard: bool = typer.Option(False, "--include-standard", help="Also sync standard v2.0 entities"),
file_format: str = typer.Option("jsonl", "--file-format", help="Filesystem loader file format: jsonl or parquet"),
stamper: Optional[list[str]] = typer.Option(None, "--stamper", help="ETL stamper plugin name to apply (repeatable). Overrides the [etl] stampers config. Plugins register under the bcli.etl.stampers entry-point group."),
polaris_uri: Optional[str] = typer.Option(None, "--polaris-uri", envvar="BCLI_POLARIS_URI", help="Polaris REST catalog URI. Enables post-sync Iceberg registration."),
polaris_warehouse: Optional[str] = typer.Option(None, "--polaris-warehouse", envvar="BCLI_POLARIS_WAREHOUSE", help="Polaris catalog (warehouse) name"),
polaris_credential: Optional[str] = typer.Option(None, "--polaris-credential", envvar="BCLI_POLARIS_CREDENTIAL", help="Polaris OAuth credential in 'client_id:client_secret' form"),
Expand All @@ -76,7 +77,10 @@ def sync(
"""Extract Business Central data and load to a destination via dlt.

By default syncs all custom API endpoints from the registry for the active
profile. Standard v2.0 entities are skipped (typically handled by Fivetran).
profile; pass --include-standard to also sync standard v2.0 entities.
Output is vendor-neutral by default — audit columns (sync timestamp,
soft-delete flags) are opt-in via ETL stamper plugins (--stamper NAME or
the [etl] stampers config).

\b
Examples:
Expand Down Expand Up @@ -130,6 +134,7 @@ def sync(
entities=entity_list,
full_refresh=full_refresh,
include_standard=include_standard,
stampers=list(stamper) if stamper else None,
)

if destination == "filesystem":
Expand Down
Loading
Loading