From 9b81a986ad3eedde9d5b8466b8c7d9c65ce3b8da Mon Sep 17 00:00:00 2001 From: dimitrionian Date: Thu, 23 Apr 2026 18:43:04 +0500 Subject: [PATCH 1/5] ADO-419 dbt-af integration example --- dbt_projects/demo/dbt_project.yml | 24 ++ .../intermediate/int_orders_enriched.sql | 20 ++ .../demo/models/intermediate/schema.yml | 11 + .../marts/analytics/revenue_by_region.sql | 13 + .../demo/models/marts/core/customers.sql | 8 + .../demo/models/marts/core/orders.sql | 16 ++ dbt_projects/demo/models/marts/schema.yml | 29 +++ dbt_projects/demo/models/staging/schema.yml | 18 ++ dbt_projects/demo/models/staging/sources.yml | 13 + .../demo/models/staging/stg_customers.sql | 7 + .../demo/models/staging/stg_orders.sql | 8 + dbt_projects/demo/profiles.yml | 12 + .../demo/snapshots/customers_snapshot_ts.sql | 23 ++ .../marts/not_null_customers_customer_id.sql | 1 + .../models/marts/not_null_orders_order_id.sql | 1 + .../not_null_stg_customers_customer_id.sql | 1 + .../staging/not_null_stg_orders_order_id.sql | 1 + .../examples/dbt_af_example.py | 67 +++++ hatch_build.py | 4 + pyproject.toml | 2 +- tests/integration/dbt/__init__.py | 0 tests/integration/dbt/conftest.py | 24 ++ .../dbt/test_dbt_af_integration.py | 235 ++++++++++++++++++ 23 files changed, 537 insertions(+), 1 deletion(-) create mode 100644 dbt_projects/demo/dbt_project.yml create mode 100644 dbt_projects/demo/models/intermediate/int_orders_enriched.sql create mode 100644 dbt_projects/demo/models/intermediate/schema.yml create mode 100644 dbt_projects/demo/models/marts/analytics/revenue_by_region.sql create mode 100644 dbt_projects/demo/models/marts/core/customers.sql create mode 100644 dbt_projects/demo/models/marts/core/orders.sql create mode 100644 dbt_projects/demo/models/marts/schema.yml create mode 100644 dbt_projects/demo/models/staging/schema.yml create mode 100644 dbt_projects/demo/models/staging/sources.yml create mode 100644 dbt_projects/demo/models/staging/stg_customers.sql create mode 100644 dbt_projects/demo/models/staging/stg_orders.sql create mode 100644 dbt_projects/demo/profiles.yml create mode 100644 dbt_projects/demo/snapshots/customers_snapshot_ts.sql create mode 100644 dbt_projects/demo/tests/dbt/models/marts/not_null_customers_customer_id.sql create mode 100644 dbt_projects/demo/tests/dbt/models/marts/not_null_orders_order_id.sql create mode 100644 dbt_projects/demo/tests/dbt/models/staging/not_null_stg_customers_customer_id.sql create mode 100644 dbt_projects/demo/tests/dbt/models/staging/not_null_stg_orders_order_id.sql create mode 100644 docs/dbt_integration/examples/dbt_af_example.py create mode 100644 tests/integration/dbt/__init__.py create mode 100644 tests/integration/dbt/conftest.py create mode 100644 tests/integration/dbt/test_dbt_af_integration.py diff --git a/dbt_projects/demo/dbt_project.yml b/dbt_projects/demo/dbt_project.yml new file mode 100644 index 0000000000000..66931939344a8 --- /dev/null +++ b/dbt_projects/demo/dbt_project.yml @@ -0,0 +1,24 @@ +name: 'dbt_af_demo' +version: '1.0.0' +config-version: 2 + +profile: 'dbt_af_demo' + +model-paths: ["models"] +snapshot-paths: ["snapshots"] +macro-paths: ["macros"] +target-path: "target" +clean-targets: ["target", "dbt_packages"] + +models: + dbt_af_demo: + staging: + +materialized: view + intermediate: + +materialized: table + marts: + +materialized: table + core: + +schema: marts_core + analytics: + +schema: marts_analytics diff --git a/dbt_projects/demo/models/intermediate/int_orders_enriched.sql b/dbt_projects/demo/models/intermediate/int_orders_enriched.sql new file mode 100644 index 0000000000000..18166d047f640 --- /dev/null +++ b/dbt_projects/demo/models/intermediate/int_orders_enriched.sql @@ -0,0 +1,20 @@ +{{ config(materialized='table') }} + +WITH orders AS ( + SELECT * FROM {{ ref('stg_orders') }} +), +customers AS ( + SELECT * FROM {{ ref('stg_customers') }} +) +SELECT + o.order_id, + o.customer_id, + o.order_date, + o.amount_cents, + o.status, + c.customer_name, + c.account_balance, + c.market_segment, + o.loaded_at +FROM orders o +LEFT JOIN customers c ON o.customer_id = c.customer_id diff --git a/dbt_projects/demo/models/intermediate/schema.yml b/dbt_projects/demo/models/intermediate/schema.yml new file mode 100644 index 0000000000000..7d8f207d3e61d --- /dev/null +++ b/dbt_projects/demo/models/intermediate/schema.yml @@ -0,0 +1,11 @@ +version: 2 + +models: + - name: int_orders_enriched + description: "Orders enriched with customer data" + config: + schedule: "@hourly" + sql_cluster: dev + py_cluster: dev + daily_sql_cluster: dev + bf_cluster: dev diff --git a/dbt_projects/demo/models/marts/analytics/revenue_by_region.sql b/dbt_projects/demo/models/marts/analytics/revenue_by_region.sql new file mode 100644 index 0000000000000..248a5da5b488c --- /dev/null +++ b/dbt_projects/demo/models/marts/analytics/revenue_by_region.sql @@ -0,0 +1,13 @@ +{{ config(materialized='table') }} + +WITH orders AS ( + SELECT * FROM {{ ref('int_orders_enriched') }} +) +SELECT + COALESCE(market_segment, 'Unknown') AS market_segment, + date_trunc('month', order_date)::date AS order_month, + count(DISTINCT order_id) AS order_count, + count(DISTINCT customer_id) AS unique_customers, + round(sum(amount_cents) / 100.0, 2) AS revenue +FROM orders +GROUP BY 1, 2 diff --git a/dbt_projects/demo/models/marts/core/customers.sql b/dbt_projects/demo/models/marts/core/customers.sql new file mode 100644 index 0000000000000..f5f634da7ef63 --- /dev/null +++ b/dbt_projects/demo/models/marts/core/customers.sql @@ -0,0 +1,8 @@ +{{ config(materialized='table') }} + +SELECT + customer_id, + customer_name, + account_balance, + market_segment +FROM {{ ref('stg_customers') }} diff --git a/dbt_projects/demo/models/marts/core/orders.sql b/dbt_projects/demo/models/marts/core/orders.sql new file mode 100644 index 0000000000000..8718d122e2731 --- /dev/null +++ b/dbt_projects/demo/models/marts/core/orders.sql @@ -0,0 +1,16 @@ +{{ config(materialized='table') }} + +SELECT + o.order_id, + o.customer_id, + o.order_date, + o.amount_cents, + o.status, + CASE o.status + WHEN 'O' THEN 'Open' + WHEN 'F' THEN 'Fulfilled' + WHEN 'P' THEN 'Processing' + END AS status_label, + o.customer_name, + o.market_segment +FROM {{ ref('int_orders_enriched') }} o diff --git a/dbt_projects/demo/models/marts/schema.yml b/dbt_projects/demo/models/marts/schema.yml new file mode 100644 index 0000000000000..2f93b8af63839 --- /dev/null +++ b/dbt_projects/demo/models/marts/schema.yml @@ -0,0 +1,29 @@ +version: 2 + +models: + - name: orders + description: "Core orders mart" + config: + schedule: "@daily" + sql_cluster: dev + py_cluster: dev + daily_sql_cluster: dev + bf_cluster: dev + + - name: customers + description: "Core customers mart" + config: + schedule: "@daily" + sql_cluster: dev + py_cluster: dev + daily_sql_cluster: dev + bf_cluster: dev + + - name: revenue_by_region + description: "Monthly revenue by market segment" + config: + schedule: "@daily" + sql_cluster: dev + py_cluster: dev + daily_sql_cluster: dev + bf_cluster: dev diff --git a/dbt_projects/demo/models/staging/schema.yml b/dbt_projects/demo/models/staging/schema.yml new file mode 100644 index 0000000000000..9f32222cbc8df --- /dev/null +++ b/dbt_projects/demo/models/staging/schema.yml @@ -0,0 +1,18 @@ +version: 2 + +models: + - name: stg_orders + config: + schedule: "@daily" + sql_cluster: dev + py_cluster: dev + daily_sql_cluster: dev + bf_cluster: dev + + - name: stg_customers + config: + schedule: "@daily" + sql_cluster: dev + py_cluster: dev + daily_sql_cluster: dev + bf_cluster: dev diff --git a/dbt_projects/demo/models/staging/sources.yml b/dbt_projects/demo/models/staging/sources.yml new file mode 100644 index 0000000000000..b474cdf7af24b --- /dev/null +++ b/dbt_projects/demo/models/staging/sources.yml @@ -0,0 +1,13 @@ +version: 2 + +sources: + - name: src + schema: src + freshness: + warn_after: {count: 24, period: hour} + error_after: {count: 48, period: hour} + loaded_at_field: loaded_at + tables: + - name: orders + - name: customers + - name: customers_scd diff --git a/dbt_projects/demo/models/staging/stg_customers.sql b/dbt_projects/demo/models/staging/stg_customers.sql new file mode 100644 index 0000000000000..4c0e77147da2c --- /dev/null +++ b/dbt_projects/demo/models/staging/stg_customers.sql @@ -0,0 +1,7 @@ +SELECT + c_custkey AS customer_id, + c_name AS customer_name, + c_nationkey AS nation_id, + c_acctbal AS account_balance, + c_mktsegment AS market_segment +FROM {{ source('src', 'customers') }} diff --git a/dbt_projects/demo/models/staging/stg_orders.sql b/dbt_projects/demo/models/staging/stg_orders.sql new file mode 100644 index 0000000000000..617f951a1a83e --- /dev/null +++ b/dbt_projects/demo/models/staging/stg_orders.sql @@ -0,0 +1,8 @@ +SELECT + o_orderkey AS order_id, + o_custkey AS customer_id, + o_orderdate AS order_date, + o_totalprice AS amount_cents, + o_orderstatus AS status, + now() AS loaded_at +FROM {{ source('src', 'orders') }} diff --git a/dbt_projects/demo/profiles.yml b/dbt_projects/demo/profiles.yml new file mode 100644 index 0000000000000..d1c11cd6fb70c --- /dev/null +++ b/dbt_projects/demo/profiles.yml @@ -0,0 +1,12 @@ +dbt_af_demo: + target: dev + outputs: + dev: + type: postgres + host: "{{ env_var('DBT_HOST', 'postgres') }}" + port: "{{ env_var('DBT_PORT', '5432') | int }}" + user: "{{ env_var('DBT_USER', 'postgres') }}" + password: "{{ env_var('DBT_PASSWORD', 'airflow') }}" + dbname: "{{ env_var('DBT_DBNAME', 'airflow') }}" + schema: "{{ env_var('DBT_SCHEMA', 'public') }}" + threads: 4 diff --git a/dbt_projects/demo/snapshots/customers_snapshot_ts.sql b/dbt_projects/demo/snapshots/customers_snapshot_ts.sql new file mode 100644 index 0000000000000..66126ee4892ff --- /dev/null +++ b/dbt_projects/demo/snapshots/customers_snapshot_ts.sql @@ -0,0 +1,23 @@ +{% snapshot customers_snapshot_ts %} +{{ + config( + target_schema='snapshots', + unique_key='c_custkey', + strategy='timestamp', + updated_at='c_updated_at', + schedule='@daily', + sql_cluster='dev', + py_cluster='dev', + daily_sql_cluster='dev', + bf_cluster='dev' + ) +}} +select + c_custkey, + c_name, + c_nationkey, + c_acctbal, + c_mktsegment, + c_updated_at +from {{ source('src', 'customers_scd') }} +{% endsnapshot %} diff --git a/dbt_projects/demo/tests/dbt/models/marts/not_null_customers_customer_id.sql b/dbt_projects/demo/tests/dbt/models/marts/not_null_customers_customer_id.sql new file mode 100644 index 0000000000000..39a4a4f2a2c82 --- /dev/null +++ b/dbt_projects/demo/tests/dbt/models/marts/not_null_customers_customer_id.sql @@ -0,0 +1 @@ +select customer_id from {{ ref('customers') }} where customer_id is null diff --git a/dbt_projects/demo/tests/dbt/models/marts/not_null_orders_order_id.sql b/dbt_projects/demo/tests/dbt/models/marts/not_null_orders_order_id.sql new file mode 100644 index 0000000000000..7df0028e8123d --- /dev/null +++ b/dbt_projects/demo/tests/dbt/models/marts/not_null_orders_order_id.sql @@ -0,0 +1 @@ +select order_id from {{ ref('orders') }} where order_id is null diff --git a/dbt_projects/demo/tests/dbt/models/staging/not_null_stg_customers_customer_id.sql b/dbt_projects/demo/tests/dbt/models/staging/not_null_stg_customers_customer_id.sql new file mode 100644 index 0000000000000..659e6b5177bc5 --- /dev/null +++ b/dbt_projects/demo/tests/dbt/models/staging/not_null_stg_customers_customer_id.sql @@ -0,0 +1 @@ +select customer_id from {{ ref('stg_customers') }} where customer_id is null diff --git a/dbt_projects/demo/tests/dbt/models/staging/not_null_stg_orders_order_id.sql b/dbt_projects/demo/tests/dbt/models/staging/not_null_stg_orders_order_id.sql new file mode 100644 index 0000000000000..64a000c90009f --- /dev/null +++ b/dbt_projects/demo/tests/dbt/models/staging/not_null_stg_orders_order_id.sql @@ -0,0 +1 @@ +select order_id from {{ ref('stg_orders') }} where order_id is null diff --git a/docs/dbt_integration/examples/dbt_af_example.py b/docs/dbt_integration/examples/dbt_af_example.py new file mode 100644 index 0000000000000..249af63ba5faf --- /dev/null +++ b/docs/dbt_integration/examples/dbt_af_example.py @@ -0,0 +1,67 @@ +""" +Example: dbt-af (Toloka) integration with Apache Airflow + +dbt-af auto-generates Airflow DAGs from a dbt manifest.json +It parses the dbt project graph, creates domain-based DAGs with proper +scheduling, cross-DAG sensors, and test grouping + +Requirements: + pip install dbt-af dbt-postgres + +Key dbt-af features demonstrated: + - Domain-based DAG separation (staging, intermediate, marts → separate DAGs) + - Cross-schedule dependencies (daily → hourly → daily via ExternalTaskSensor) + - Singular test tasks (not_null checks as inline tasks) + - Snapshot support (customers_snapshot_ts) + - Backfill DAGs (auto-generated alongside scheduled DAGs) + - Manual "run single model" DAG with Airflow UI params + - Source freshness sensors (for sources with freshness config) + - dry_run mode (skips actual dbt execution, disables catchup) + +IMPORTANT — dbt-af project structure constraints: + - Schedule values MUST use @ prefix: "@daily", "@hourly", "@weekly" + (without @ prefix, dbt-af silently falls back to @daily) + - Generic tests (unique/not_null in schema.yml) are INCOMPATIBLE with dbt-af + due to fqn[3] IndexError. Use singular tests in tests/dbt/models// instead. + - Every model config MUST have: sql_cluster, py_cluster, daily_sql_cluster, bf_cluster + +Running tests: + breeze + pip install dbt-af dbt-postgres + dbt compile --project-dir /opt/airflow/dbt_projects/demo --profiles-dir /opt/airflow/dbt_projects/demo + pytest tests/integration/dbt/test_dbt_af_integration.py -v +""" + +from pathlib import Path + +from dbt_af.conf import Config, DbtDefaultTargetsConfig, DbtProjectConfig +from dbt_af.dags import compile_dbt_af_dags + +DBT_PROJECT_DIR = Path(__file__).parent.parent.parent.parent / "dbt_projects" / "demo" + +config = Config( + dbt_project=DbtProjectConfig( + dbt_project_name="dbt_af_demo", + dbt_models_path=DBT_PROJECT_DIR / "models", + dbt_project_path=DBT_PROJECT_DIR, + dbt_profiles_path=DBT_PROJECT_DIR, + dbt_target_path=DBT_PROJECT_DIR / "target", + dbt_log_path=DBT_PROJECT_DIR / "logs", + dbt_schema="public", + ), + dbt_default_targets=DbtDefaultTargetsConfig( + default_target="dev", + ), + max_active_dag_runs=1, + include_single_model_manual_dag=True, + debug_mode_enabled=False, + dry_run=True, +) + +dags = compile_dbt_af_dags( + manifest_path=str(DBT_PROJECT_DIR / "target" / "manifest.json"), + config=config, +) + +for dag_name, dag in dags.items(): + globals()[dag_name] = dag diff --git a/hatch_build.py b/hatch_build.py index 920a6feeb13ad..1d7048733319c 100644 --- a/hatch_build.py +++ b/hatch_build.py @@ -212,6 +212,9 @@ "towncrier>=23.11.0", "twine>=4.0.2", ], + "devel-dbt": [ + "dbt-af>=0.14.0", + ], "devel-duckdb": [ # Python 3.12 support was added in 0.10.0 "duckdb>=0.10.0; python_version >= '3.12'", @@ -281,6 +284,7 @@ "devel": [ "apache-airflow[celery]", "apache-airflow[cncf-kubernetes]", + "apache-airflow[devel-dbt]", "apache-airflow[devel-debuggers]", "apache-airflow[devel-devscripts]", "apache-airflow[devel-duckdb]", diff --git a/pyproject.toml b/pyproject.toml index a73751bbb6182..67287bef8d29f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -89,7 +89,7 @@ dynamic = ["version", "optional-dependencies", "dependencies"] # # START DEVEL EXTRAS HERE # -# devel, devel-all-dbs, devel-ci, devel-debuggers, devel-devscripts, devel-duckdb, devel-hadoop, +# devel, devel-all-dbs, devel-ci, devel-dbt, devel-debuggers, devel-devscripts, devel-duckdb, devel-hadoop, # devel-mypy, devel-sentry, devel-static-checks, devel-tests # # END DEVEL EXTRAS HERE diff --git a/tests/integration/dbt/__init__.py b/tests/integration/dbt/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/tests/integration/dbt/conftest.py b/tests/integration/dbt/conftest.py new file mode 100644 index 0000000000000..3fc5809203239 --- /dev/null +++ b/tests/integration/dbt/conftest.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +import subprocess +from pathlib import Path + +DBT_PROJECT_DIR = Path(__file__).parents[3] / "dbt_projects" / "demo" + + +def pytest_configure(config): + """Run dbt compile to generate manifest.json before test collection""" + manifest = DBT_PROJECT_DIR / "target" / "manifest.json" + if manifest.exists(): + return + + subprocess.run( + [ + "dbt", "compile", + "--project-dir", str(DBT_PROJECT_DIR), + "--profiles-dir", str(DBT_PROJECT_DIR), + ], + check=True, + capture_output=True, + text=True, + ) diff --git a/tests/integration/dbt/test_dbt_af_integration.py b/tests/integration/dbt/test_dbt_af_integration.py new file mode 100644 index 0000000000000..4c099d3e0774b --- /dev/null +++ b/tests/integration/dbt/test_dbt_af_integration.py @@ -0,0 +1,235 @@ +""" +Integration tests for dbt-af framework + +Run inside Breeze: + pytest tests/integration/dbt/test_dbt_af_integration.py -v + +Tests verify that dbt-af correctly: +1. Parses manifest.json with custom dbt-af config fields (schedule, clusters) +2. Generates domain-based Airflow DAGs from manifest graph +3. Creates cross-domain ExternalTaskSensors for inter-domain dependencies +4. Handles cross-schedule dependencies (daily staging -> hourly intermediate -> daily marts) +5. Groups small tests (unique/not_null) inline with model tasks +6. Supports snapshot nodes +7. Generates backfill DAGs alongside scheduled DAGs +8. Generates manual "run single model" DAG with UI params +9. Creates source freshness sensors for sources with freshness config +10. Respects dry_run mode (catchup=False) + +No external database required - dbt-af only parses manifest.json and builds +Airflow DAG objects in memory +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from dbt_af.conf import Config, DbtDefaultTargetsConfig, DbtProjectConfig +from dbt_af.dags import compile_dbt_af_dags + +DBT_PROJECT_DIR = Path(__file__).parents[3] / "dbt_projects" / "demo" +MANIFEST_PATH = str(DBT_PROJECT_DIR / "target" / "manifest.json") + + +@pytest.fixture(scope="module") +def dbt_af_config(): + return Config( + dbt_project=DbtProjectConfig( + dbt_project_name="dbt_af_demo", + dbt_models_path=DBT_PROJECT_DIR / "models", + dbt_project_path=DBT_PROJECT_DIR, + dbt_profiles_path=DBT_PROJECT_DIR, + dbt_target_path=DBT_PROJECT_DIR / "target", + dbt_log_path=DBT_PROJECT_DIR / "logs", + dbt_schema="public", + ), + dbt_default_targets=DbtDefaultTargetsConfig( + default_target="dev", + ), + max_active_dag_runs=1, + include_single_model_manual_dag=True, + debug_mode_enabled=False, + dry_run=True, + ) + + +@pytest.fixture(scope="module") +def generated_dags(dbt_af_config): + """Compile all DAGs from manifest — the core dbt-af entry point""" + return compile_dbt_af_dags( + manifest_path=MANIFEST_PATH, + config=dbt_af_config, + ) + + +# Manifest validation +class TestManifestParsing: + """Verify the generated manifest.json is valid for dbt-af""" + + def test_manifest_exists(self): + assert Path(MANIFEST_PATH).exists(), "manifest.json must exist in target/" + + def test_manifest_has_required_sections(self): + with open(MANIFEST_PATH) as f: + manifest = json.load(f) + for key in ("nodes", "sources", "metadata"): + assert key in manifest, f"manifest.json missing '{key}'" + + def test_manifest_node_counts(self): + with open(MANIFEST_PATH) as f: + manifest = json.load(f) + by_type = {} + for node in manifest["nodes"].values(): + rt = node["resource_type"] + by_type[rt] = by_type.get(rt, 0) + 1 + assert by_type.get("model", 0) == 6, f"Expected 6 models, got {by_type}" + assert by_type.get("test", 0) == 4, f"Expected 4 tests, got {by_type}" + assert by_type.get("snapshot", 0) == 1, f"Expected 1 snapshot, got {by_type}" + + def test_manifest_sources_count(self): + with open(MANIFEST_PATH) as f: + manifest = json.load(f) + assert len(manifest["sources"]) == 3 + + def test_models_have_dbt_af_config_fields(self): + """dbt-af requires schedule + cluster fields in every model config""" + with open(MANIFEST_PATH) as f: + manifest = json.load(f) + required = {"schedule", "sql_cluster", "py_cluster", "daily_sql_cluster", "bf_cluster"} + for uid, node in manifest["nodes"].items(): + if node["resource_type"] == "model": + missing = required - set(node["config"].keys()) + assert not missing, f"{uid} missing dbt-af config fields: {missing}" + + +# DAG generation +class TestDagGeneration: + """Verify compile_dbt_af_dags produces correct DAG set""" + + def test_dags_not_empty(self, generated_dags): + assert len(generated_dags) > 0 + + def test_all_dags_have_dbt_tag(self, generated_dags): + for name, dag in generated_dags.items(): + assert "dbt" in dag.tags, f"DAG '{name}' missing 'dbt' tag. Tags: {dag.tags}" + + def test_manual_run_dag_exists(self, generated_dags): + expected = "dbt_af_demo_dbt_run_model" + assert expected in generated_dags, ( + f"Manual run DAG '{expected}' not found. DAGs: {list(generated_dags.keys())}" + ) + + def test_manual_run_dag_has_no_schedule(self, generated_dags): + dag = generated_dags["dbt_af_demo_dbt_run_model"] + assert dag.schedule_interval is None + + +# Domain separation — core dbt-af feature +class TestDomainSeparation: + """dbt-af groups models into DAGs by domain (fqn[1]) + schedule""" + + def test_multiple_domain_dags_created(self, generated_dags): + scheduled = {k for k in generated_dags + if "dbt_run_model" not in k and "backfill" not in k} + assert len(scheduled) >= 2, ( + f"Expected DAGs for multiple domains. Got: {scheduled}" + ) + + def test_expected_domains_present(self, generated_dags): + dag_names = set(generated_dags.keys()) + found = {d for d in ("staging", "intermediate", "marts") if any(d in n for n in dag_names)} + assert len(found) >= 2, f"Expected domain names in DAG ids. DAGs: {dag_names}" + + +# Cross-schedule dependencies +class TestCrossSchedule: + """int_orders_enriched is hourly; staging/marts are daily + dbt-af must create separate DAGs for different schedules""" + + def test_hourly_dag_exists(self, generated_dags): + hourly = [n for n in generated_dags if "hourly" in n] + assert hourly, f"Expected hourly DAG. DAGs: {list(generated_dags.keys())}" + + def test_daily_and_hourly_coexist(self, generated_dags): + has_daily = any("daily" in n for n in generated_dags) + has_hourly = any("hourly" in n for n in generated_dags) + assert has_daily and has_hourly, ( + f"Need both daily and hourly DAGs. DAGs: {list(generated_dags.keys())}" + ) + + +# Task structure +class TestTaskStructure: + """Verify model/test/snapshot tasks inside DAGs""" + + def _all_task_ids(self, dags): + ids = set() + for dag in dags.values(): + ids.update(dag.task_ids) + return ids + + def test_model_tasks_present(self, generated_dags): + ids = self._all_task_ids(generated_dags) + for model in ("stg_customers", "stg_orders", "int_orders_enriched"): + assert any(model in t for t in ids), ( + f"Model '{model}' not found in tasks: {ids}" + ) + + def test_snapshot_task_present(self, generated_dags): + ids = self._all_task_ids(generated_dags) + assert any("customers_snapshot_ts" in t for t in ids), ( + f"Snapshot task not found. Tasks: {ids}" + ) + + def test_small_test_tasks_present(self, generated_dags): + """Singular tests should appear as inline tasks""" + ids = self._all_task_ids(generated_dags) + test_tasks = [t for t in ids if "not_null_" in t] + assert len(test_tasks) > 0, f"No test tasks found. Tasks: {ids}" + + +# Backfill DAGs +class TestBackfillDags: + """dbt-af auto-generates backfill DAGs for each domain""" + + def test_backfill_dags_exist(self, generated_dags): + bf = [n for n in generated_dags if "backfill" in n] + assert bf, f"No backfill DAGs. DAGs: {list(generated_dags.keys())}" + + def test_backfill_dags_tagged(self, generated_dags): + for name, dag in generated_dags.items(): + if "backfill" in name: + assert "backfill" in dag.tags, ( + f"Backfill DAG '{name}' missing 'backfill' tag" + ) + + +# Source freshness +class TestSourceFreshness: + """Sources with freshness config should produce DbtSourceFreshnessSensor tasks""" + + def test_freshness_sensors_created(self, generated_dags): + staging = {k: v for k, v in generated_dags.items() + if "staging" in k and "backfill" not in k} + ids = set() + for dag in staging.values(): + ids.update(dag.task_ids) + freshness = [t for t in ids if "freshness" in t.lower()] + assert freshness, ( + f"Expected source freshness sensors in staging DAGs. Tasks: {ids}" + ) + + +# Dry-run mode +class TestDryRun: + """dry_run=True should disable catchup on all scheduled DAGs""" + + def test_catchup_disabled(self, generated_dags): + for name, dag in generated_dags.items(): + if dag.schedule_interval is not None: + assert dag.catchup is False, ( + f"DAG '{name}' should have catchup=False in dry_run mode" + ) From 69e3c83bc3146a2e045eb7b40adef8bf0a1347f4 Mon Sep 17 00:00:00 2001 From: dimitrionian Date: Fri, 24 Apr 2026 17:30:03 +0500 Subject: [PATCH 2/5] ADO-419 Refactor --- .../examples/dbt_af_example.py | 6 +- tests/integration/dbt/conftest.py | 2 +- .../integration/dbt}/demo/dbt_project.yml | 0 .../intermediate/int_orders_enriched.sql | 0 .../dbt}/demo/models/intermediate/schema.yml | 0 .../marts/analytics/revenue_by_region.sql | 0 .../dbt}/demo/models/marts/core/customers.sql | 0 .../dbt}/demo/models/marts/core/orders.sql | 0 .../dbt}/demo/models/marts/schema.yml | 0 .../dbt}/demo/models/staging/schema.yml | 0 .../dbt}/demo/models/staging/sources.yml | 0 .../demo/models/staging/stg_customers.sql | 0 .../dbt}/demo/models/staging/stg_orders.sql | 0 .../integration/dbt}/demo/profiles.yml | 0 .../demo/snapshots/customers_snapshot_ts.sql | 0 .../marts/not_null_customers_customer_id.sql | 0 .../models/marts/not_null_orders_order_id.sql | 0 .../not_null_stg_customers_customer_id.sql | 0 .../staging/not_null_stg_orders_order_id.sql | 0 .../dbt/test_dbt_af_integration.py | 56 ++++++++++--------- 20 files changed, 33 insertions(+), 31 deletions(-) rename {dbt_projects => tests/integration/dbt}/demo/dbt_project.yml (100%) rename {dbt_projects => tests/integration/dbt}/demo/models/intermediate/int_orders_enriched.sql (100%) rename {dbt_projects => tests/integration/dbt}/demo/models/intermediate/schema.yml (100%) rename {dbt_projects => tests/integration/dbt}/demo/models/marts/analytics/revenue_by_region.sql (100%) rename {dbt_projects => tests/integration/dbt}/demo/models/marts/core/customers.sql (100%) rename {dbt_projects => tests/integration/dbt}/demo/models/marts/core/orders.sql (100%) rename {dbt_projects => tests/integration/dbt}/demo/models/marts/schema.yml (100%) rename {dbt_projects => tests/integration/dbt}/demo/models/staging/schema.yml (100%) rename {dbt_projects => tests/integration/dbt}/demo/models/staging/sources.yml (100%) rename {dbt_projects => tests/integration/dbt}/demo/models/staging/stg_customers.sql (100%) rename {dbt_projects => tests/integration/dbt}/demo/models/staging/stg_orders.sql (100%) rename {dbt_projects => tests/integration/dbt}/demo/profiles.yml (100%) rename {dbt_projects => tests/integration/dbt}/demo/snapshots/customers_snapshot_ts.sql (100%) rename {dbt_projects => tests/integration/dbt}/demo/tests/dbt/models/marts/not_null_customers_customer_id.sql (100%) rename {dbt_projects => tests/integration/dbt}/demo/tests/dbt/models/marts/not_null_orders_order_id.sql (100%) rename {dbt_projects => tests/integration/dbt}/demo/tests/dbt/models/staging/not_null_stg_customers_customer_id.sql (100%) rename {dbt_projects => tests/integration/dbt}/demo/tests/dbt/models/staging/not_null_stg_orders_order_id.sql (100%) diff --git a/docs/dbt_integration/examples/dbt_af_example.py b/docs/dbt_integration/examples/dbt_af_example.py index 249af63ba5faf..98f0f25eeb943 100644 --- a/docs/dbt_integration/examples/dbt_af_example.py +++ b/docs/dbt_integration/examples/dbt_af_example.py @@ -9,8 +9,8 @@ pip install dbt-af dbt-postgres Key dbt-af features demonstrated: - - Domain-based DAG separation (staging, intermediate, marts → separate DAGs) - - Cross-schedule dependencies (daily → hourly → daily via ExternalTaskSensor) + - Domain-based DAG separation (staging, intermediate, marts - separate DAGs) + - Cross-schedule dependencies (daily - hourly - daily via ExternalTaskSensor) - Singular test tasks (not_null checks as inline tasks) - Snapshot support (customers_snapshot_ts) - Backfill DAGs (auto-generated alongside scheduled DAGs) @@ -26,9 +26,7 @@ - Every model config MUST have: sql_cluster, py_cluster, daily_sql_cluster, bf_cluster Running tests: - breeze pip install dbt-af dbt-postgres - dbt compile --project-dir /opt/airflow/dbt_projects/demo --profiles-dir /opt/airflow/dbt_projects/demo pytest tests/integration/dbt/test_dbt_af_integration.py -v """ diff --git a/tests/integration/dbt/conftest.py b/tests/integration/dbt/conftest.py index 3fc5809203239..f1e089838d571 100644 --- a/tests/integration/dbt/conftest.py +++ b/tests/integration/dbt/conftest.py @@ -3,7 +3,7 @@ import subprocess from pathlib import Path -DBT_PROJECT_DIR = Path(__file__).parents[3] / "dbt_projects" / "demo" +DBT_PROJECT_DIR = Path(__file__).parent / "demo" def pytest_configure(config): diff --git a/dbt_projects/demo/dbt_project.yml b/tests/integration/dbt/demo/dbt_project.yml similarity index 100% rename from dbt_projects/demo/dbt_project.yml rename to tests/integration/dbt/demo/dbt_project.yml diff --git a/dbt_projects/demo/models/intermediate/int_orders_enriched.sql b/tests/integration/dbt/demo/models/intermediate/int_orders_enriched.sql similarity index 100% rename from dbt_projects/demo/models/intermediate/int_orders_enriched.sql rename to tests/integration/dbt/demo/models/intermediate/int_orders_enriched.sql diff --git a/dbt_projects/demo/models/intermediate/schema.yml b/tests/integration/dbt/demo/models/intermediate/schema.yml similarity index 100% rename from dbt_projects/demo/models/intermediate/schema.yml rename to tests/integration/dbt/demo/models/intermediate/schema.yml diff --git a/dbt_projects/demo/models/marts/analytics/revenue_by_region.sql b/tests/integration/dbt/demo/models/marts/analytics/revenue_by_region.sql similarity index 100% rename from dbt_projects/demo/models/marts/analytics/revenue_by_region.sql rename to tests/integration/dbt/demo/models/marts/analytics/revenue_by_region.sql diff --git a/dbt_projects/demo/models/marts/core/customers.sql b/tests/integration/dbt/demo/models/marts/core/customers.sql similarity index 100% rename from dbt_projects/demo/models/marts/core/customers.sql rename to tests/integration/dbt/demo/models/marts/core/customers.sql diff --git a/dbt_projects/demo/models/marts/core/orders.sql b/tests/integration/dbt/demo/models/marts/core/orders.sql similarity index 100% rename from dbt_projects/demo/models/marts/core/orders.sql rename to tests/integration/dbt/demo/models/marts/core/orders.sql diff --git a/dbt_projects/demo/models/marts/schema.yml b/tests/integration/dbt/demo/models/marts/schema.yml similarity index 100% rename from dbt_projects/demo/models/marts/schema.yml rename to tests/integration/dbt/demo/models/marts/schema.yml diff --git a/dbt_projects/demo/models/staging/schema.yml b/tests/integration/dbt/demo/models/staging/schema.yml similarity index 100% rename from dbt_projects/demo/models/staging/schema.yml rename to tests/integration/dbt/demo/models/staging/schema.yml diff --git a/dbt_projects/demo/models/staging/sources.yml b/tests/integration/dbt/demo/models/staging/sources.yml similarity index 100% rename from dbt_projects/demo/models/staging/sources.yml rename to tests/integration/dbt/demo/models/staging/sources.yml diff --git a/dbt_projects/demo/models/staging/stg_customers.sql b/tests/integration/dbt/demo/models/staging/stg_customers.sql similarity index 100% rename from dbt_projects/demo/models/staging/stg_customers.sql rename to tests/integration/dbt/demo/models/staging/stg_customers.sql diff --git a/dbt_projects/demo/models/staging/stg_orders.sql b/tests/integration/dbt/demo/models/staging/stg_orders.sql similarity index 100% rename from dbt_projects/demo/models/staging/stg_orders.sql rename to tests/integration/dbt/demo/models/staging/stg_orders.sql diff --git a/dbt_projects/demo/profiles.yml b/tests/integration/dbt/demo/profiles.yml similarity index 100% rename from dbt_projects/demo/profiles.yml rename to tests/integration/dbt/demo/profiles.yml diff --git a/dbt_projects/demo/snapshots/customers_snapshot_ts.sql b/tests/integration/dbt/demo/snapshots/customers_snapshot_ts.sql similarity index 100% rename from dbt_projects/demo/snapshots/customers_snapshot_ts.sql rename to tests/integration/dbt/demo/snapshots/customers_snapshot_ts.sql diff --git a/dbt_projects/demo/tests/dbt/models/marts/not_null_customers_customer_id.sql b/tests/integration/dbt/demo/tests/dbt/models/marts/not_null_customers_customer_id.sql similarity index 100% rename from dbt_projects/demo/tests/dbt/models/marts/not_null_customers_customer_id.sql rename to tests/integration/dbt/demo/tests/dbt/models/marts/not_null_customers_customer_id.sql diff --git a/dbt_projects/demo/tests/dbt/models/marts/not_null_orders_order_id.sql b/tests/integration/dbt/demo/tests/dbt/models/marts/not_null_orders_order_id.sql similarity index 100% rename from dbt_projects/demo/tests/dbt/models/marts/not_null_orders_order_id.sql rename to tests/integration/dbt/demo/tests/dbt/models/marts/not_null_orders_order_id.sql diff --git a/dbt_projects/demo/tests/dbt/models/staging/not_null_stg_customers_customer_id.sql b/tests/integration/dbt/demo/tests/dbt/models/staging/not_null_stg_customers_customer_id.sql similarity index 100% rename from dbt_projects/demo/tests/dbt/models/staging/not_null_stg_customers_customer_id.sql rename to tests/integration/dbt/demo/tests/dbt/models/staging/not_null_stg_customers_customer_id.sql diff --git a/dbt_projects/demo/tests/dbt/models/staging/not_null_stg_orders_order_id.sql b/tests/integration/dbt/demo/tests/dbt/models/staging/not_null_stg_orders_order_id.sql similarity index 100% rename from dbt_projects/demo/tests/dbt/models/staging/not_null_stg_orders_order_id.sql rename to tests/integration/dbt/demo/tests/dbt/models/staging/not_null_stg_orders_order_id.sql diff --git a/tests/integration/dbt/test_dbt_af_integration.py b/tests/integration/dbt/test_dbt_af_integration.py index 4c099d3e0774b..e04de9559011e 100644 --- a/tests/integration/dbt/test_dbt_af_integration.py +++ b/tests/integration/dbt/test_dbt_af_integration.py @@ -30,7 +30,7 @@ from dbt_af.conf import Config, DbtDefaultTargetsConfig, DbtProjectConfig from dbt_af.dags import compile_dbt_af_dags -DBT_PROJECT_DIR = Path(__file__).parents[3] / "dbt_projects" / "demo" +DBT_PROJECT_DIR = Path(__file__).parent / "demo" MANIFEST_PATH = str(DBT_PROJECT_DIR / "target" / "manifest.json") @@ -70,15 +70,18 @@ class TestManifestParsing: """Verify the generated manifest.json is valid for dbt-af""" def test_manifest_exists(self): + """manifest.json is generated by 'dbt compile' (conftest.py runs it automatically)""" assert Path(MANIFEST_PATH).exists(), "manifest.json must exist in target/" def test_manifest_has_required_sections(self): + """nodes = models/tests/snapshots/seeds, sources = external tables, metadata = dbt version info""" with open(MANIFEST_PATH) as f: manifest = json.load(f) for key in ("nodes", "sources", "metadata"): assert key in manifest, f"manifest.json missing '{key}'" def test_manifest_node_counts(self): + """Demo project: 6 SQL models, 4 singular tests, 1 SCD snapshot""" with open(MANIFEST_PATH) as f: manifest = json.load(f) by_type = {} @@ -90,6 +93,7 @@ def test_manifest_node_counts(self): assert by_type.get("snapshot", 0) == 1, f"Expected 1 snapshot, got {by_type}" def test_manifest_sources_count(self): + """3 external source tables: orders, customers, customers_scd (defined in sources.yml)""" with open(MANIFEST_PATH) as f: manifest = json.load(f) assert len(manifest["sources"]) == 3 @@ -110,19 +114,25 @@ class TestDagGeneration: """Verify compile_dbt_af_dags produces correct DAG set""" def test_dags_not_empty(self, generated_dags): + """compile_dbt_af_dags must produce at least one DAG from the manifest""" assert len(generated_dags) > 0 def test_all_dags_have_dbt_tag(self, generated_dags): + """dbt-af tags every generated DAG with 'dbt' for filtering in Airflow UI""" for name, dag in generated_dags.items(): assert "dbt" in dag.tags, f"DAG '{name}' missing 'dbt' tag. Tags: {dag.tags}" def test_manual_run_dag_exists(self, generated_dags): + """When include_single_model_manual_dag=True, dbt-af creates a special DAG + ('{project}_dbt_run_model') with no schedule. It allows running any single + model on-demand from Airflow UI with custom params (model name, date range).""" expected = "dbt_af_demo_dbt_run_model" assert expected in generated_dags, ( f"Manual run DAG '{expected}' not found. DAGs: {list(generated_dags.keys())}" ) def test_manual_run_dag_has_no_schedule(self, generated_dags): + """Manual run DAG must have no schedule — it's triggered manually from UI""" dag = generated_dags["dbt_af_demo_dbt_run_model"] assert dag.schedule_interval is None @@ -132,33 +142,16 @@ class TestDomainSeparation: """dbt-af groups models into DAGs by domain (fqn[1]) + schedule""" def test_multiple_domain_dags_created(self, generated_dags): - scheduled = {k for k in generated_dags - if "dbt_run_model" not in k and "backfill" not in k} - assert len(scheduled) >= 2, ( - f"Expected DAGs for multiple domains. Got: {scheduled}" - ) - - def test_expected_domains_present(self, generated_dags): - dag_names = set(generated_dags.keys()) - found = {d for d in ("staging", "intermediate", "marts") if any(d in n for n in dag_names)} - assert len(found) >= 2, f"Expected domain names in DAG ids. DAGs: {dag_names}" - + """Each domain (models//) + schedule combination produces a separate DAG + Our demo has 3 domains: staging(@daily), intermediate(@hourly), marts(@daily)""" -# Cross-schedule dependencies -class TestCrossSchedule: - """int_orders_enriched is hourly; staging/marts are daily - dbt-af must create separate DAGs for different schedules""" + expected_dags = {"staging__daily", "intermediate__hourly", "marts__daily"} + missing = expected_dags - set(generated_dags.keys()) + assert not missing, ( + f"Missing domain DAGs: {missing}. Got: {list(generated_dags.keys())}" + ) - def test_hourly_dag_exists(self, generated_dags): - hourly = [n for n in generated_dags if "hourly" in n] - assert hourly, f"Expected hourly DAG. DAGs: {list(generated_dags.keys())}" - def test_daily_and_hourly_coexist(self, generated_dags): - has_daily = any("daily" in n for n in generated_dags) - has_hourly = any("hourly" in n for n in generated_dags) - assert has_daily and has_hourly, ( - f"Need both daily and hourly DAGs. DAGs: {list(generated_dags.keys())}" - ) # Task structure @@ -172,6 +165,8 @@ def _all_task_ids(self, dags): return ids def test_model_tasks_present(self, generated_dags): + """Each dbt model produces at least one Airflow task (may appear in multiple + DAGs, scheduled + backfill, and may be wrapped in a TaskGroup with tests)""" ids = self._all_task_ids(generated_dags) for model in ("stg_customers", "stg_orders", "int_orders_enriched"): assert any(model in t for t in ids), ( @@ -179,13 +174,15 @@ def test_model_tasks_present(self, generated_dags): ) def test_snapshot_task_present(self, generated_dags): + """dbt snapshots (SCD Type 2) are mapped to DbtSnapshot operator tasks""" ids = self._all_task_ids(generated_dags) assert any("customers_snapshot_ts" in t for t in ids), ( f"Snapshot task not found. Tasks: {ids}" ) def test_small_test_tasks_present(self, generated_dags): - """Singular tests should appear as inline tasks""" + """Singular tests (tests/dbt/models//*.sql) appear as inline + tasks within the model's task group, executed after the model runs""" ids = self._all_task_ids(generated_dags) test_tasks = [t for t in ids if "not_null_" in t] assert len(test_tasks) > 0, f"No test tasks found. Tasks: {ids}" @@ -196,10 +193,12 @@ class TestBackfillDags: """dbt-af auto-generates backfill DAGs for each domain""" def test_backfill_dags_exist(self, generated_dags): + """dbt-af creates a backfill DAG for each domain to re-run historical data""" bf = [n for n in generated_dags if "backfill" in n] assert bf, f"No backfill DAGs. DAGs: {list(generated_dags.keys())}" def test_backfill_dags_tagged(self, generated_dags): + """Backfill DAGs are tagged for easy filtering in Airflow UI""" for name, dag in generated_dags.items(): if "backfill" in name: assert "backfill" in dag.tags, ( @@ -212,6 +211,9 @@ class TestSourceFreshness: """Sources with freshness config should produce DbtSourceFreshnessSensor tasks""" def test_freshness_sensors_created(self, generated_dags): + """Sources with freshness config (warn_after/error_after in sources.yml) + produce DbtSourceFreshnessSensor tasks that check loaded_at field + before allowing downstream models to run""" staging = {k: v for k, v in generated_dags.items() if "staging" in k and "backfill" not in k} ids = set() @@ -228,6 +230,8 @@ class TestDryRun: """dry_run=True should disable catchup on all scheduled DAGs""" def test_catchup_disabled(self, generated_dags): + """dry_run=True skips actual dbt execution and disables catchup + so Airflow won't try to backfill missed runs on first deploy""" for name, dag in generated_dags.items(): if dag.schedule_interval is not None: assert dag.catchup is False, ( From 810b9b0f1198b5eddff787fb22f7a2377b4b88dd Mon Sep 17 00:00:00 2001 From: dimitrionian Date: Mon, 4 May 2026 19:54:51 +0500 Subject: [PATCH 3/5] ADO-422 Integration tests for dmp-af (based on the ADO-419 dbt demo project) --- .../dbt/test_dmp_af_integration.py | 177 ++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 tests/integration/dbt/test_dmp_af_integration.py diff --git a/tests/integration/dbt/test_dmp_af_integration.py b/tests/integration/dbt/test_dmp_af_integration.py new file mode 100644 index 0000000000000..add575dace102 --- /dev/null +++ b/tests/integration/dbt/test_dmp_af_integration.py @@ -0,0 +1,177 @@ +""" +Integration tests for dmp-af framework + +dmp-af is a fork of dbt-af (Toloka) by dmp-labs with Airflow 3.x support added +Uses the same demo dbt project as dbt-af tests + +Run inside Breeze: + pip install /path/to/dmp-af dbt-postgres + pytest tests/integration/dbt/test_dmp_af_integration.py -v + +dbt compile runs automatically via conftest.py if manifest.json is missing +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from dmp_af.conf import Config, DbtDefaultTargetsConfig, DbtProjectConfig +from dmp_af.dags import compile_dmp_af_dags + +DBT_PROJECT_DIR = Path(__file__).parent / "demo" +MANIFEST_PATH = str(DBT_PROJECT_DIR / "target" / "manifest.json") + + +@pytest.fixture(scope="module") +def dmp_af_config(): + return Config( + dbt_project=DbtProjectConfig( + dbt_project_name="dbt_af_demo", + dbt_models_path=DBT_PROJECT_DIR / "models", + dbt_project_path=DBT_PROJECT_DIR, + dbt_profiles_path=DBT_PROJECT_DIR, + dbt_target_path=DBT_PROJECT_DIR / "target", + dbt_log_path=DBT_PROJECT_DIR / "logs", + dbt_schema="public", + ), + dbt_default_targets=DbtDefaultTargetsConfig( + default_target="dev", + ), + max_active_dag_runs=1, + include_single_model_manual_dag=True, + debug_mode_enabled=False, + dry_run=True, + ) + + +@pytest.fixture(scope="module") +def generated_dags(dmp_af_config): + """Compile all DAGs from manifest, the core dmp-af entry point""" + return compile_dmp_af_dags( + manifest_path=MANIFEST_PATH, + config=dmp_af_config, + ) + + +class TestDmpAfDagGeneration: + """Verify compile_dmp_af_dags produces correct DAG set""" + + def test_dags_not_empty(self, generated_dags): + """compile_dmp_af_dags must produce at least one DAG from the manifest""" + assert len(generated_dags) > 0 + + def test_all_dags_have_dbt_tag(self, generated_dags): + """dmp-af tags every generated DAG with 'dbt' for filtering in Airflow UI""" + for name, dag in generated_dags.items(): + assert "dbt" in dag.tags, f"DAG '{name}' missing 'dbt' tag. Tags: {dag.tags}" + + def test_manual_run_dag_exists(self, generated_dags): + """When include_single_model_manual_dag=True, dmp-af creates a special DAG + ('{project}_dbt_run_model') with no schedule. It allows running any single + model on-demand from Airflow UI with custom params (model name, date range)""" + expected = "dbt_af_demo_dbt_run_model" + assert expected in generated_dags, ( + f"Manual run DAG '{expected}' not found. DAGs: {list(generated_dags.keys())}" + ) + + def test_manual_run_dag_has_no_schedule(self, generated_dags): + """Manual run DAG must have no schedule, it's triggered manually from UI""" + dag = generated_dags["dbt_af_demo_dbt_run_model"] + assert dag.schedule_interval is None + + +class TestDmpAfDomainSeparation: + """dmp-af groups models into DAGs by domain (fqn[1]) + schedule""" + + def test_expected_domain_dags_created(self, generated_dags): + """Each domain + schedule combination produces a separate DAG + Our demo has 3 domains: staging(@daily), intermediate(@hourly), marts(@daily)""" + expected_dags = {"staging__daily", "intermediate__hourly", "marts__daily"} + missing = expected_dags - set(generated_dags.keys()) + assert not missing, ( + f"Missing domain DAGs: {missing}. Got: {list(generated_dags.keys())}" + ) + + +class TestDmpAfTaskStructure: + """Verify model/test/snapshot tasks inside DAGs""" + + def _all_task_ids(self, dags): + ids = set() + for dag in dags.values(): + ids.update(dag.task_ids) + return ids + + def test_model_tasks_present(self, generated_dags): + """Each dbt model produces at least one Airflow task (may appear in multiple + DAGs, scheduled + backfill, and may be wrapped in a TaskGroup with tests)""" + ids = self._all_task_ids(generated_dags) + for model in ("stg_customers", "stg_orders", "int_orders_enriched"): + assert any(model in t for t in ids), ( + f"Model '{model}' not found in tasks: {ids}" + ) + + def test_snapshot_task_present(self, generated_dags): + """dbt snapshots (SCD Type 2) are mapped to DbtSnapshot operator tasks""" + ids = self._all_task_ids(generated_dags) + assert any("customers_snapshot_ts" in t for t in ids), ( + f"Snapshot task not found. Tasks: {ids}" + ) + + def test_small_test_tasks_present(self, generated_dags): + """Singular tests (tests/dbt/models//*.sql) appear as inline + tasks within the model's task group, executed after the model runs""" + ids = self._all_task_ids(generated_dags) + test_tasks = [t for t in ids if "not_null_" in t] + assert len(test_tasks) > 0, f"No test tasks found. Tasks: {ids}" + + +class TestDmpAfBackfillDags: + """dmp-af auto-generates backfill DAGs for each domain""" + + def test_backfill_dags_exist(self, generated_dags): + """dmp-af creates a backfill DAG for each domain to rerun historical data""" + bf = [n for n in generated_dags if "backfill" in n] + assert bf, f"No backfill DAGs. DAGs: {list(generated_dags.keys())}" + + def test_backfill_dags_tagged(self, generated_dags): + """Backfill DAGs are tagged for easy filtering in Airflow UI""" + for name, dag in generated_dags.items(): + if "backfill" in name: + assert "backfill" in dag.tags, ( + f"Backfill DAG '{name}' missing 'backfill' tag" + ) + + +class TestDmpAfSourceFreshness: + """Sources with freshness config should produce DbtSourceFreshnessSensor tasks""" + + def test_freshness_sensors_created(self, generated_dags): + """Sources with freshness config (warn_after/error_after in sources.yml) + produce DbtSourceFreshnessSensor tasks that check loaded_at field + before allowing downstream models to run""" + staging = {k: v for k, v in generated_dags.items() + if "staging" in k and "backfill" not in k} + ids = set() + for dag in staging.values(): + ids.update(dag.task_ids) + freshness = [t for t in ids if "freshness" in t.lower()] + assert freshness, ( + f"Expected source freshness sensors in staging DAGs. Tasks: {ids}" + ) + + +class TestDmpAfDryRun: + """dry_run=True should disable catchup on all scheduled DAGs""" + + def test_catchup_disabled(self, generated_dags): + """dry_run=True skips actual dbt execution and disables catchup + so Airflow won't try to backfill missed runs on first deploy""" + for name, dag in generated_dags.items(): + if dag.schedule_interval is not None: + assert dag.catchup is False, ( + f"DAG '{name}' should have catchup=False in dry_run mode" + ) From bf0e63e81bd05d89b3b51b68cdd9bfbf2a95d412 Mon Sep 17 00:00:00 2001 From: dimitrionian Date: Wed, 6 May 2026 13:47:04 +0500 Subject: [PATCH 4/5] ADO-422 Add compatibility with Airflow 3 --- tests/integration/dbt/test_dmp_af_integration.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/integration/dbt/test_dmp_af_integration.py b/tests/integration/dbt/test_dmp_af_integration.py index add575dace102..e875193c42df7 100644 --- a/tests/integration/dbt/test_dmp_af_integration.py +++ b/tests/integration/dbt/test_dmp_af_integration.py @@ -80,7 +80,9 @@ def test_manual_run_dag_exists(self, generated_dags): def test_manual_run_dag_has_no_schedule(self, generated_dags): """Manual run DAG must have no schedule, it's triggered manually from UI""" dag = generated_dags["dbt_af_demo_dbt_run_model"] - assert dag.schedule_interval is None + # Airflow 3 removed schedule_interval in favor of schedule + schedule = getattr(dag, 'schedule_interval', None) if hasattr(dag, 'schedule_interval') else dag.schedule + assert schedule is None class TestDmpAfDomainSeparation: @@ -171,7 +173,9 @@ def test_catchup_disabled(self, generated_dags): """dry_run=True skips actual dbt execution and disables catchup so Airflow won't try to backfill missed runs on first deploy""" for name, dag in generated_dags.items(): - if dag.schedule_interval is not None: + # Airflow 3 removed schedule_interval in favor of schedule + schedule = getattr(dag, 'schedule_interval', None) if hasattr(dag, 'schedule_interval') else dag.schedule + if schedule is not None: assert dag.catchup is False, ( f"DAG '{name}' should have catchup=False in dry_run mode" ) From c0d9fa8d4072d118ad4105109205858539b37e48 Mon Sep 17 00:00:00 2001 From: dimitrionian Date: Fri, 8 May 2026 14:10:50 +0500 Subject: [PATCH 5/5] ADO-420 Run integration/functional tests for cosmos --- .../dbt/test_cosmos_integration.py | 239 ++++++++++++++++++ 1 file changed, 239 insertions(+) create mode 100644 tests/integration/dbt/test_cosmos_integration.py diff --git a/tests/integration/dbt/test_cosmos_integration.py b/tests/integration/dbt/test_cosmos_integration.py new file mode 100644 index 0000000000000..612669082edec --- /dev/null +++ b/tests/integration/dbt/test_cosmos_integration.py @@ -0,0 +1,239 @@ +""" +Integration tests for astronomer-cosmos framework + +Cosmos takes a fundamentally different approach from dbt-af/dmp-af: +- One DAG with all models as tasks (not domain-based splitting) +- Works with any standard dbt project (no custom config fields required) +- Multiple execution modes (local, docker, kubernetes, etc) +- Profile can be generated from Airflow connections (but profiles.yml using is also possible) +- Supports dbt build command (run + test together) +- Generic tests work out of the box (no fqn issues) +- Multiple ways to load the dbt graph (manifest, dbt ls, automatic) + +Run inside Breeze: + pip install astronomer-cosmos dbt-postgres + pytest tests/integration/dbt/test_cosmos_integration.py -v + +Uses the same demo dbt project as dbt-af/dmp-af tests +""" + +from __future__ import annotations + +from datetime import datetime +from pathlib import Path + +import pytest +from airflow.models.dag import DAG + +from cosmos import DbtTaskGroup, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig +from cosmos.constants import ExecutionMode, LoadMode, TestBehavior +from cosmos.converter import DbtToAirflowConverter + +DBT_PROJECT_DIR = Path(__file__).parent / "demo" +MANIFEST_PATH = DBT_PROJECT_DIR / "target" / "manifest.json" + + +@pytest.fixture +def profile_config(): + """Profile config using profiles.yml from the demo project""" + return ProfileConfig( + profile_name="dbt_af_demo", + target_name="dev", + profiles_yml_filepath=DBT_PROJECT_DIR / "profiles.yml", + ) + + +@pytest.fixture +def project_config(): + return ProjectConfig(dbt_project_path=DBT_PROJECT_DIR) + + +@pytest.fixture +def project_config_with_manifest(): + return ProjectConfig( + dbt_project_path=DBT_PROJECT_DIR, + manifest_path=MANIFEST_PATH, + ) + + +class TestCosmosDagGeneration: + """Cosmos creates a DAG with all dbt models as Airflow tasks.""" + + def test_dag_has_tasks(self, project_config_with_manifest, profile_config): + """DbtToAirflowConverter populates a DAG with tasks from manifest.""" + with DAG("test_cosmos_dag", start_date=datetime(2024, 1, 1)) as dag: + DbtToAirflowConverter( + dag=dag, + project_config=project_config_with_manifest, + profile_config=profile_config, + execution_config=ExecutionConfig(execution_mode=ExecutionMode.LOCAL), + render_config=RenderConfig( + load_method=LoadMode.DBT_MANIFEST, + test_behavior=TestBehavior.NONE, + ), + ) + assert len(dag.task_ids) > 0, f"DAG should have tasks. Got: {dag.task_ids}" + + def test_model_tasks_present(self, project_config_with_manifest, profile_config): + """Each dbt model becomes an Airflow task within the DAG""" + with DAG("test_cosmos_models", start_date=datetime(2024, 1, 1)) as dag: + DbtToAirflowConverter( + dag=dag, + project_config=project_config_with_manifest, + profile_config=profile_config, + execution_config=ExecutionConfig(execution_mode=ExecutionMode.LOCAL), + render_config=RenderConfig( + load_method=LoadMode.DBT_MANIFEST, + test_behavior=TestBehavior.NONE, + ), + ) + task_ids = dag.task_ids + for model in ("stg_customers", "stg_orders", "int_orders_enriched", "orders", "customers"): + assert any(model in tid for tid in task_ids), ( + f"Model '{model}' not found in tasks: {task_ids}" + ) + + def test_snapshot_task_present(self, project_config_with_manifest, profile_config): + """Snapshots are included as tasks in the DAG""" + with DAG("test_cosmos_snapshot", start_date=datetime(2024, 1, 1)) as dag: + DbtToAirflowConverter( + dag=dag, + project_config=project_config_with_manifest, + profile_config=profile_config, + execution_config=ExecutionConfig(execution_mode=ExecutionMode.LOCAL), + render_config=RenderConfig( + load_method=LoadMode.DBT_MANIFEST, + test_behavior=TestBehavior.NONE, + ), + ) + task_ids = dag.task_ids + assert any("customers_snapshot_ts" in tid for tid in task_ids), ( + f"Snapshot task not found. Tasks: {task_ids}" + ) + + +class TestCosmosTestBehavior: + """Cosmos supports different strategies for running dbt tests""" + + def test_after_each_creates_test_tasks(self, project_config_with_manifest, profile_config): + """AFTER_EACH: test tasks run immediately after their parent model""" + with DAG("test_cosmos_after_each", start_date=datetime(2024, 1, 1)) as dag: + DbtToAirflowConverter( + dag=dag, + project_config=project_config_with_manifest, + profile_config=profile_config, + execution_config=ExecutionConfig(execution_mode=ExecutionMode.LOCAL), + render_config=RenderConfig( + load_method=LoadMode.DBT_MANIFEST, + test_behavior=TestBehavior.AFTER_EACH, + ), + ) + task_ids = dag.task_ids + test_tasks = [tid for tid in task_ids if "test" in tid.lower()] + assert len(test_tasks) > 0, f"Expected test tasks with AFTER_EACH. Tasks: {task_ids}" + + def test_none_skips_tests(self, project_config_with_manifest, profile_config): + """NONE: no test tasks are created.""" + with DAG("test_cosmos_none", start_date=datetime(2024, 1, 1)) as dag: + DbtToAirflowConverter( + dag=dag, + project_config=project_config_with_manifest, + profile_config=profile_config, + execution_config=ExecutionConfig(execution_mode=ExecutionMode.LOCAL), + render_config=RenderConfig( + load_method=LoadMode.DBT_MANIFEST, + test_behavior=TestBehavior.NONE, + ), + ) + task_ids = dag.task_ids + test_tasks = [tid for tid in task_ids if "test" in tid.lower()] + assert len(test_tasks) == 0, f"Expected no test tasks with NONE. Got: {test_tasks}" + + +class TestCosmosTaskGroup: + """DbtTaskGroup embeds dbt models as a TaskGroup inside an existing DAG""" + + def test_task_group_in_existing_dag(self, project_config_with_manifest, profile_config): + """dbt project can be embedded as a TaskGroup within a larger DAG""" + with DAG("test_cosmos_task_group", start_date=datetime(2024, 1, 1)) as dag: + DbtTaskGroup( + project_config=project_config_with_manifest, + profile_config=profile_config, + execution_config=ExecutionConfig(execution_mode=ExecutionMode.LOCAL), + render_config=RenderConfig( + load_method=LoadMode.DBT_MANIFEST, + test_behavior=TestBehavior.NONE, + ), + ) + assert len(dag.task_ids) > 0, f"TaskGroup should add tasks to DAG. Got: {dag.task_ids}" + + +class TestCosmosModelSelection: + """Cosmos supports dbt-style select/exclude for filtering models""" + + def test_select_specific_models(self, project_config_with_manifest, profile_config): + """select parameter filters which models are included in the DAG""" + with DAG("test_cosmos_select", start_date=datetime(2024, 1, 1)) as dag: + DbtToAirflowConverter( + dag=dag, + project_config=project_config_with_manifest, + profile_config=profile_config, + execution_config=ExecutionConfig(execution_mode=ExecutionMode.LOCAL), + render_config=RenderConfig( + load_method=LoadMode.DBT_MANIFEST, + select=["stg_customers", "stg_orders"], + test_behavior=TestBehavior.NONE, + ), + ) + task_ids = dag.task_ids + assert any("stg_" in tid for tid in task_ids), f"Expected staging models. Tasks: {task_ids}" + assert not any("revenue_by_region" in tid for tid in task_ids), ( + f"Should not have marts models when selecting staging only. Tasks: {task_ids}" + ) + + def test_exclude_models(self, project_config_with_manifest, profile_config): + """exclude parameter removes specific models from the DAG""" + with DAG("test_cosmos_exclude", start_date=datetime(2024, 1, 1)) as dag: + DbtToAirflowConverter( + dag=dag, + project_config=project_config_with_manifest, + profile_config=profile_config, + execution_config=ExecutionConfig(execution_mode=ExecutionMode.LOCAL), + render_config=RenderConfig( + load_method=LoadMode.DBT_MANIFEST, + exclude=["stg_customers", "stg_orders"], + test_behavior=TestBehavior.NONE, + ), + ) + task_ids = dag.task_ids + assert not any("stg_" in tid for tid in task_ids), ( + f"Should not have staging models when excluding them. Tasks: {task_ids}" + ) + # Other models should still be present + assert any("int_orders_enriched" in tid for tid in task_ids), ( + f"Non-excluded models should be present. Tasks: {task_ids}" + ) + + +class TestCosmosNoCustomConfigRequired: + """Unlike dbt-af/dmp-af, cosmos works with any standard dbt project""" + + def test_all_models_in_one_dag(self, project_config_with_manifest, profile_config): + """Cosmos does not require 'schedule' in model config, all models go into one DAG + regardless of any dbt-af-specific config fields presented in schema.yml""" + with DAG("test_cosmos_no_config", start_date=datetime(2024, 1, 1)) as dag: + DbtToAirflowConverter( + dag=dag, + project_config=project_config_with_manifest, + profile_config=profile_config, + execution_config=ExecutionConfig(execution_mode=ExecutionMode.LOCAL), + render_config=RenderConfig( + load_method=LoadMode.DBT_MANIFEST, + test_behavior=TestBehavior.NONE, + ), + ) + task_ids = dag.task_ids + # All models from all domains in one DAG + assert any("stg_orders" in tid for tid in task_ids) + assert any("int_orders_enriched" in tid for tid in task_ids) + assert any("orders" in tid for tid in task_ids)