Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions docs/dbt_integration/examples/dbt_af_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""
Example: dbt-af (Toloka) integration with Apache Airflow

dbt-af auto-generates Airflow DAGs from a dbt manifest.json.
It parses the dbt project graph and creates domain-based DAGs with proper
scheduling, cross-DAG sensors, and test grouping.

Requirements:
pip install dbt-af dbt-postgres

Key dbt-af features demonstrated:
- Domain-based DAG separation (staging, intermediate, marts - separate DAGs)
- Cross-schedule dependencies (daily -> hourly -> daily, via ExternalTaskSensor)
- Singular test tasks (not_null checks as inline tasks)
- Snapshot support (customers_snapshot_ts)
- Backfill DAGs (auto-generated alongside scheduled DAGs)
- Manual "run single model" DAG with Airflow UI params
- Source freshness sensors (for sources with freshness config)
- dry_run mode (skips actual dbt execution, disables catchup)

IMPORTANT — dbt-af project structure constraints:
- Schedule values MUST use @ prefix: "@daily", "@hourly", "@weekly"
(without @ prefix, dbt-af silently falls back to @daily)
- Generic tests (unique/not_null in schema.yml) are INCOMPATIBLE with dbt-af
due to fqn[3] IndexError. Use singular tests in tests/dbt/models/<domain>/ instead.
- Every model config MUST have: sql_cluster, py_cluster, daily_sql_cluster, bf_cluster

Running tests:
pip install dbt-af dbt-postgres
pytest tests/integration/dbt/test_dbt_af_integration.py -v
"""

from pathlib import Path

from dbt_af.conf import Config, DbtDefaultTargetsConfig, DbtProjectConfig
from dbt_af.dags import compile_dbt_af_dags

# Root of the demo dbt project: four directory levels above this file, then
# "dbt_projects/demo".
# NOTE(review): confirm this directory exists in the target layout — the demo
# project used by the integration tests appears to live under
# tests/integration/dbt/demo instead.
DBT_PROJECT_DIR = Path(__file__).parent.parent.parent.parent / "dbt_projects" / "demo"

# Locations of the dbt project, its profiles, and its build/log directories.
_project_settings = DbtProjectConfig(
    dbt_project_name="dbt_af_demo",
    dbt_models_path=DBT_PROJECT_DIR / "models",
    dbt_project_path=DBT_PROJECT_DIR,
    dbt_profiles_path=DBT_PROJECT_DIR,
    dbt_target_path=DBT_PROJECT_DIR / "target",
    dbt_log_path=DBT_PROJECT_DIR / "logs",
    dbt_schema="public",
)

# Every model in the demo project runs against the single "dev" target.
_target_settings = DbtDefaultTargetsConfig(default_target="dev")

config = Config(
    dbt_project=_project_settings,
    dbt_default_targets=_target_settings,
    max_active_dag_runs=1,
    include_single_model_manual_dag=True,
    debug_mode_enabled=False,
    # dry_run is on, so this example does not execute dbt itself.
    dry_run=True,
)

# The manifest must already exist (e.g. produced by `dbt compile`).
_manifest_file = DBT_PROJECT_DIR / "target" / "manifest.json"

dags = compile_dbt_af_dags(manifest_path=str(_manifest_file), config=config)

# Publish each generated DAG under its own module-level name
# (presumably so Airflow's DAG discovery can find them — confirm).
for dag_name, dag in dags.items():
    globals()[dag_name] = dag
4 changes: 4 additions & 0 deletions hatch_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@
"towncrier>=23.11.0",
"twine>=4.0.2",
],
"devel-dbt": [
"dbt-af>=0.14.0",
],
"devel-duckdb": [
# Python 3.12 support was added in 0.10.0
"duckdb>=0.10.0; python_version >= '3.12'",
Expand Down Expand Up @@ -281,6 +284,7 @@
"devel": [
"apache-airflow[celery]",
"apache-airflow[cncf-kubernetes]",
"apache-airflow[devel-dbt]",
"apache-airflow[devel-debuggers]",
"apache-airflow[devel-devscripts]",
"apache-airflow[devel-duckdb]",
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ dynamic = ["version", "optional-dependencies", "dependencies"]
#
# START DEVEL EXTRAS HERE
#
# devel, devel-all-dbs, devel-ci, devel-debuggers, devel-devscripts, devel-duckdb, devel-hadoop,
# devel, devel-all-dbs, devel-ci, devel-dbt, devel-debuggers, devel-devscripts, devel-duckdb, devel-hadoop,
# devel-mypy, devel-sentry, devel-static-checks, devel-tests
#
# END DEVEL EXTRAS HERE
Expand Down
Empty file.
24 changes: 24 additions & 0 deletions tests/integration/dbt/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

import subprocess
from pathlib import Path

DBT_PROJECT_DIR = Path(__file__).parent / "demo"


def pytest_configure(config):
    """Run ``dbt compile`` to generate manifest.json before test collection.

    Does nothing when ``target/manifest.json`` already exists under
    ``DBT_PROJECT_DIR``. Otherwise the ``dbt`` CLI is invoked with the demo
    project directory used as both the project and the profiles directory.

    Args:
        config: The pytest config object handed to the hook (unused here).

    Raises:
        RuntimeError: If ``dbt compile`` exits with a non-zero status. The
            message includes dbt's captured stdout and stderr, which the
            underlying ``CalledProcessError`` would otherwise hide.
        FileNotFoundError: If the ``dbt`` executable is not on ``PATH``.
    """
    manifest = DBT_PROJECT_DIR / "target" / "manifest.json"
    if manifest.exists():
        return

    command = [
        "dbt", "compile",
        "--project-dir", str(DBT_PROJECT_DIR),
        "--profiles-dir", str(DBT_PROJECT_DIR),
    ]
    try:
        subprocess.run(command, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as err:
        # Output is captured, and CalledProcessError's own message only shows
        # the command and exit status, so surface dbt's diagnostics here.
        raise RuntimeError(
            f"`dbt compile` failed with exit code {err.returncode}\n"
            f"--- stdout ---\n{err.stdout}\n"
            f"--- stderr ---\n{err.stderr}"
        ) from err
24 changes: 24 additions & 0 deletions tests/integration/dbt/demo/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: 'dbt_af_demo'
version: '1.0.0'
config-version: 2

profile: 'dbt_af_demo'

model-paths: ["models"]
snapshot-paths: ["snapshots"]
macro-paths: ["macros"]
target-path: "target"
clean-targets: ["target", "dbt_packages"]

models:
dbt_af_demo:
staging:
+materialized: view
intermediate:
+materialized: table
marts:
+materialized: table
core:
+schema: marts_core
analytics:
+schema: marts_analytics
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{{ config(materialized='table') }}

-- Staged orders joined to staged customers on customer_id.
-- The LEFT JOIN keeps orders without a matching customer; their
-- customer_name, account_balance and market_segment are NULL.
SELECT
    ord.order_id,
    ord.customer_id,
    ord.order_date,
    ord.amount_cents,
    ord.status,
    cust.customer_name,
    cust.account_balance,
    cust.market_segment,
    ord.loaded_at
FROM {{ ref('stg_orders') }} AS ord
LEFT JOIN {{ ref('stg_customers') }} AS cust
    ON ord.customer_id = cust.customer_id
11 changes: 11 additions & 0 deletions tests/integration/dbt/demo/models/intermediate/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: 2

models:
- name: int_orders_enriched
description: "Orders enriched with customer data"
config:
schedule: "@hourly"
sql_cluster: dev
py_cluster: dev
daily_sql_cluster: dev
bf_cluster: dev
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{{ config(materialized='table') }}

-- Aggregates enriched orders per market segment and calendar month.
-- Rows with a NULL market_segment are grouped under 'Unknown'.
-- revenue is the summed amount_cents divided by 100, rounded to 2 decimals.
SELECT
    COALESCE(enriched.market_segment, 'Unknown') AS market_segment,
    date_trunc('month', enriched.order_date)::date AS order_month,
    count(DISTINCT enriched.order_id) AS order_count,
    count(DISTINCT enriched.customer_id) AS unique_customers,
    round(sum(enriched.amount_cents) / 100.0, 2) AS revenue
FROM {{ ref('int_orders_enriched') }} AS enriched
GROUP BY
    COALESCE(enriched.market_segment, 'Unknown'),
    date_trunc('month', enriched.order_date)::date
8 changes: 8 additions & 0 deletions tests/integration/dbt/demo/models/marts/core/customers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{{ config(materialized='table') }}

-- Customers mart: a column subset of the staged customers
-- (nation_id from the staging model is not carried over).
WITH staged_customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
)
SELECT
    staged_customers.customer_id,
    staged_customers.customer_name,
    staged_customers.account_balance,
    staged_customers.market_segment
FROM staged_customers
16 changes: 16 additions & 0 deletions tests/integration/dbt/demo/models/marts/core/orders.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{{ config(materialized='table') }}

-- Orders mart built from the enriched orders, adding a readable label
-- for the one-letter status code.
WITH enriched AS (
    SELECT * FROM {{ ref('int_orders_enriched') }}
)
SELECT
    enriched.order_id,
    enriched.customer_id,
    enriched.order_date,
    enriched.amount_cents,
    enriched.status,
    -- No ELSE branch: any status other than 'O', 'F' or 'P' (including NULL)
    -- yields a NULL status_label.
    CASE
        WHEN enriched.status = 'O' THEN 'Open'
        WHEN enriched.status = 'F' THEN 'Fulfilled'
        WHEN enriched.status = 'P' THEN 'Processing'
    END AS status_label,
    enriched.customer_name,
    enriched.market_segment
FROM enriched
29 changes: 29 additions & 0 deletions tests/integration/dbt/demo/models/marts/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
version: 2

models:
- name: orders
description: "Core orders mart"
config:
schedule: "@daily"
sql_cluster: dev
py_cluster: dev
daily_sql_cluster: dev
bf_cluster: dev

- name: customers
description: "Core customers mart"
config:
schedule: "@daily"
sql_cluster: dev
py_cluster: dev
daily_sql_cluster: dev
bf_cluster: dev

- name: revenue_by_region
description: "Monthly revenue by market segment"
config:
schedule: "@daily"
sql_cluster: dev
py_cluster: dev
daily_sql_cluster: dev
bf_cluster: dev
18 changes: 18 additions & 0 deletions tests/integration/dbt/demo/models/staging/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version: 2

models:
- name: stg_orders
config:
schedule: "@daily"
sql_cluster: dev
py_cluster: dev
daily_sql_cluster: dev
bf_cluster: dev

- name: stg_customers
config:
schedule: "@daily"
sql_cluster: dev
py_cluster: dev
daily_sql_cluster: dev
bf_cluster: dev
13 changes: 13 additions & 0 deletions tests/integration/dbt/demo/models/staging/sources.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
version: 2

sources:
- name: src
schema: src
freshness:
warn_after: {count: 24, period: hour}
error_after: {count: 48, period: hour}
loaded_at_field: loaded_at
tables:
- name: orders
- name: customers
- name: customers_scd
7 changes: 7 additions & 0 deletions tests/integration/dbt/demo/models/staging/stg_customers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Staging model: renames the raw c_* customer columns to readable names.
WITH raw_customers AS (
    SELECT * FROM {{ source('src', 'customers') }}
)
SELECT
    raw_customers.c_custkey AS customer_id,
    raw_customers.c_name AS customer_name,
    raw_customers.c_nationkey AS nation_id,
    raw_customers.c_acctbal AS account_balance,
    raw_customers.c_mktsegment AS market_segment
FROM raw_customers
8 changes: 8 additions & 0 deletions tests/integration/dbt/demo/models/staging/stg_orders.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Staging model: renames the raw o_* order columns to readable names.
-- NOTE(review): o_totalprice is exposed as amount_cents — confirm the source
-- value really is expressed in cents.
-- NOTE(review): loaded_at is stamped with now() at build time rather than read
-- from the source — confirm that is intended.
WITH raw_orders AS (
    SELECT * FROM {{ source('src', 'orders') }}
)
SELECT
    raw_orders.o_orderkey AS order_id,
    raw_orders.o_custkey AS customer_id,
    raw_orders.o_orderdate AS order_date,
    raw_orders.o_totalprice AS amount_cents,
    raw_orders.o_orderstatus AS status,
    now() AS loaded_at
FROM raw_orders
12 changes: 12 additions & 0 deletions tests/integration/dbt/demo/profiles.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
dbt_af_demo:
target: dev
outputs:
dev:
type: postgres
host: "{{ env_var('DBT_HOST', 'postgres') }}"
port: "{{ env_var('DBT_PORT', '5432') | int }}"
user: "{{ env_var('DBT_USER', 'postgres') }}"
password: "{{ env_var('DBT_PASSWORD', 'airflow') }}"
dbname: "{{ env_var('DBT_DBNAME', 'airflow') }}"
schema: "{{ env_var('DBT_SCHEMA', 'public') }}"
threads: 4
23 changes: 23 additions & 0 deletions tests/integration/dbt/demo/snapshots/customers_snapshot_ts.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{% snapshot customers_snapshot_ts %}
{#
    Timestamp-strategy snapshot of the src.customers_scd source, keyed on
    c_custkey and driven by c_updated_at. Results are written to the
    "snapshots" schema. The schedule and *_cluster keys are the per-node
    settings the demo project sets on every model.
#}
{{
    config(
        target_schema='snapshots',
        unique_key='c_custkey',
        strategy='timestamp',
        updated_at='c_updated_at',
        schedule='@daily',
        sql_cluster='dev',
        py_cluster='dev',
        daily_sql_cluster='dev',
        bf_cluster='dev'
    )
}}
SELECT
    scd.c_custkey,
    scd.c_name,
    scd.c_nationkey,
    scd.c_acctbal,
    scd.c_mktsegment,
    scd.c_updated_at
FROM {{ source('src', 'customers_scd') }} AS scd
{% endsnapshot %}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Singular not-null check: selects the rows of the customers mart whose
-- customer_id is NULL (dbt fails a singular test when it returns rows).
SELECT customer_id
FROM {{ ref('customers') }}
WHERE customer_id IS NULL
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Singular not-null check: selects the rows of the orders mart whose
-- order_id is NULL (dbt fails a singular test when it returns rows).
SELECT order_id
FROM {{ ref('orders') }}
WHERE order_id IS NULL
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Singular not-null check: selects the rows of stg_customers whose
-- customer_id is NULL (dbt fails a singular test when it returns rows).
SELECT customer_id
FROM {{ ref('stg_customers') }}
WHERE customer_id IS NULL
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Singular not-null check: selects the rows of stg_orders whose
-- order_id is NULL (dbt fails a singular test when it returns rows).
SELECT order_id
FROM {{ ref('stg_orders') }}
WHERE order_id IS NULL
Loading