Python-native data transformations for Snowflake.
Define your pipelines as Python files. Clair compiles them to SQL, builds a DAG, and runs them against Snowflake in dependency order — with data quality tests, incremental strategies, and structured logging out of the box.
SQL-only tools make it hard to express dependencies clearly. With clair, each table or view is a Python file that references its upstreams by import. And if SQL isn't the right tool — write pandas instead:
from clair import Trouve
from refined.products.catalog import trouve as catalog
from refined.products.reviews import trouve as reviews
trouve = Trouve(
sql=f"""
SELECT
c.product_id,
c.name,
avg(r.rating) AS avg_rating,
count(*) AS review_count
FROM {catalog} c
JOIN {reviews} r ON c.product_id = r.product_id
GROUP BY 1, 2
""",
)Import the upstream, use it in the f-string — clair figures out the rest.
Or skip SQL entirely and write pandas:
import pandas as pd
from clair import PandasTrouve
from refined.products.catalog import trouve as catalog
from refined.products.reviews import trouve as reviews
def summarize(inputs):
df = inputs["catalog"].merge(inputs["reviews"], on="product_id")
return df.groupby("name")["rating"].mean().reset_index()
trouve = PandasTrouve(
inputs={"catalog": catalog, "reviews": reviews},
transform=summarize,
)clair fetches the upstream tables from Snowflake, runs your function locally, then writes the result back. The DAG, lineage, --select filters, and data quality tests all work unchanged.
A Trouve is a single Python file representing one queryable Snowflake object. Its position in the directory tree determines its fully-qualified name:
my_project/
└── source/
└── products/
└── reviews.py → source.products.reviews
There are two kinds of Trouve:
Trouve— SQL-based. Compiles to a SnowflakeTABLEorVIEW. Runs inside Snowflake.PandasTrouve— pandas-based. Runs a Python function on the machine executing clair, then writes the result back to Snowflake.
Trouve has three types:
| Type | What it is |
|---|---|
SOURCE |
A Snowflake table that contains unprocessed data loaded into Snowflake by external tools. |
TABLE |
A Snowflake table that clair creates and populates. |
VIEW |
A Snowflake view that clair creates. |
Clair discovers all Trouves in your project, resolves the Python import references into a dependency graph, and executes nodes in topological order. If a node fails, all its downstream dependents are skipped automatically.
Each directory level maps to one part of the Snowflake name: database_name / schema_name / table_name.
my_project/
├── source/
│ └── products/
│ ├── catalog.py # source.products.catalog
│ └── reviews.py # source.products.reviews
├── refined/
│ └── products/
│ ├── catalog.py # refined.products.catalog
│ └── reviews.py # refined.products.reviews
├── derived/
│ └── products/
│ └── top_reviewed.py # derived.products.top_reviewed
└── _clairtifacts/ # compiled SQL artifacts — add to .gitignore
Files beginning with _ are ignored during discovery.
Full documentation — guides, CLI reference, and API reference — is at rivage-sh.github.io/clair.
Install the CLI globally using uv:
uv tool install rivage-clairThen add clair as a project dependency so your Trouve files get IDE autocompletion and type checking:
uv add rivage-clairRun clair init — it will prompt for your Snowflake connection details and write ~/.clair/environments.yml.
my_project/
├── source/
│ └── products/
│ ├── catalog.py
│ └── reviews.py
├── refined/
│ └── products/
│ ├── catalog.py
│ └── reviews.py
└── derived/
└── products/
└── top_reviewed.py
source/products/catalog.py — declares a pre-existing Snowflake table
from clair import Column, ColumnType, Trouve, TrouveType
trouve = Trouve(
type=TrouveType.SOURCE,
columns=[
Column(name="product_id", type=ColumnType.STRING),
Column(name="name", type=ColumnType.STRING),
Column(name="category", type=ColumnType.STRING),
Column(name="price", type=ColumnType.FLOAT),
],
)refined/products/catalog.py — cleans and standardises the source
from clair import Trouve, TrouveType
from source.products.catalog import trouve as source_catalog
trouve = Trouve(
type=TrouveType.TABLE,
sql=f"""
SELECT
product_id,
initcap(name) AS name,
lower(category) AS category,
price
FROM {source_catalog}
WHERE product_id IS NOT NULL
""",
)refined/products/reviews.py — cleans the reviews source
from clair import Trouve, TrouveType
from source.products.reviews import trouve as source_reviews
trouve = Trouve(
type=TrouveType.TABLE,
sql=f"""
SELECT
review_id,
product_id,
rating,
review_date
FROM {source_reviews}
WHERE rating BETWEEN 1 AND 5
""",
)derived/products/top_reviewed.py — joins and aggregates the refined layer
from clair import Trouve, TrouveType
from refined.products.catalog import trouve as catalog
from refined.products.reviews import trouve as reviews
trouve = Trouve(
type=TrouveType.TABLE,
sql=f"""
SELECT
c.product_id,
c.name,
c.category,
avg(r.rating) AS avg_rating,
count(*) AS review_count
FROM {catalog} c
JOIN {reviews} r ON c.product_id = r.product_id
GROUP BY 1, 2, 3
HAVING avg_rating >= 4
""",
)# Inspect the generated SQL without touching Snowflake
clair compile --project=my_project
# Execute against Snowflake
clair run --project=my_project --env=devclair --help # list all commandsScaffold a new project and write a profile interactively.
clair init --project=./my_projectResolve the DAG and write SQL to _clairtifacts/. No Snowflake connection needed.
clair compile --project=./my_project [--select='db.schema.*'] [--run-mode=incremental]Execute Trouves against Snowflake in topological order.
clair run --project=./my_project --env=dev [--select='db.schema.*'] [--run-mode=incremental]Run data quality tests against Snowflake.
clair test --project=./my_project --env=dev [--select='db.schema.*']Print the dependency graph as an indented tree.
clair dag --project=./my_project [--select='db.schema.*']Start a local web UI showing the project DAG and per-Trouve documentation. No Snowflake connection needed.
clair docs --project=./my_project [--port=8741] [--host=127.0.0.1] [--no-browser]Remove compiled artifacts from _clairtifacts/.
clair clean --project=./my_project --before=7d # older than 7 days
clair clean --project=./my_project --before=2026-03-01 # before a date
clair clean --project=./my_project --dry-run # preview onlyAttach tests to any TABLE or VIEW Trouve. They run against the live table in Snowflake.
from clair import Trouve, TrouveType, TestUnique, TestNotNull, TestRowCount
trouve = Trouve(
type=TrouveType.TABLE,
sql=f"SELECT event_id, user_id, event_type FROM {source_events}",
tests=[
TestUnique(column="event_id"),
TestNotNull(column="user_id"),
TestRowCount(min_rows=1),
],
)| Test | Checks |
|---|---|
TestUnique(column) |
No duplicate values in the column |
TestNotNull(column) |
No NULL values in the column |
TestRowCount(min_rows, max_rows) |
Row count is within the specified bounds |
TestUniqueColumns(columns) |
No duplicate combinations across multiple columns |
TestSql(sql) |
Arbitrary SQL — zero returned rows means pass |
TestSql lets you write any SQL assertion. Use THIS as a placeholder for the owning Trouve's table name, and reference other Trouves via import exactly as you would in Trouve.sql:
from clair import THIS, TestSql, Trouve, TrouveType
from source.payments.transactions import trouve as transactions
trouve = Trouve(
type=TrouveType.TABLE,
sql=f"SELECT * FROM {transactions}",
tests=[
TestSql(sql=f"SELECT * FROM {THIS} WHERE amount < 0"),
TestSql(sql=f"SELECT * FROM {THIS} t LEFT JOIN {transactions} tx ON t.id = tx.id WHERE tx.id IS NULL"),
],
)TestSql is skipped during --sample runs.
clair test --project=./my_project --env=devBy default, every run does a full refresh. To process only new data, configure a Trouve with RunMode.INCREMENTAL and choose a strategy:
Append — inserts new rows, never updates existing ones:
from clair import Trouve, TrouveType, RunConfig, RunMode, IncrementalMode
trouve = Trouve(
type=TrouveType.TABLE,
sql=f"""
SELECT * FROM {source_events}
WHERE occurred_at > dateadd('day', -3, current_timestamp())
""",
run_config=RunConfig(
run_mode=RunMode.INCREMENTAL,
incremental_mode=IncrementalMode.APPEND,
),
)Upsert — merges on primary key columns, updating existing rows and inserting new ones:
run_config=RunConfig(
run_mode=RunMode.INCREMENTAL,
incremental_mode=IncrementalMode.UPSERT,
primary_key_columns=["user_id"],
)If the target table doesn't exist yet, clair runs a full refresh automatically regardless of the configured strategy.
Pass --run-mode full_refresh on the CLI to force a full rebuild of everything in a single run.
When SQL isn't the right tool — complex reshaping, ML feature engineering, multi-step aggregations — use PandasTrouve. Your function receives upstream tables as DataFrames, runs locally on the machine executing clair, and the result is written back to Snowflake automatically.
import pandas as pd
from clair import PandasTrouve, Column, ColumnType, TestNotNull
from refined.products.catalog import trouve as catalog
from refined.products.reviews import trouve as reviews
def top_rated(inputs):
df = inputs["catalog"].merge(inputs["reviews"], on="product_id")
return (
df.groupby(["product_id", "name"])["rating"]
.mean()
.reset_index()
.rename(columns={"rating": "avg_rating"})
.query("avg_rating >= 4")
)
trouve = PandasTrouve(
inputs={"catalog": catalog, "reviews": reviews},
transform=top_rated,
columns=[
Column(name="product_id", type=ColumnType.STRING),
Column(name="name", type=ColumnType.STRING),
Column(name="avg_rating", type=ColumnType.FLOAT),
],
tests=[TestNotNull(column="product_id")],
docs="Top-rated products by average review score, computed in pandas.",
)Note: pandas transformations run on the machine executing clair, not inside Snowflake. Keep this in mind for large tables.
See the Pandas-Native Transformations guide for the full field reference, DAG integration details, and limitations.
All commands accept --select for glob-style filtering on fully-qualified names. Repeat the flag to union multiple patterns:
# Run everything in one schema
clair run --project=. --env=dev --select='source.products.*'
# Run a specific table
clair run --project=. --env=dev --select='derived.products.top_reviewed'
# Compile only tables matching a name pattern across all schemas
clair compile --project=. --select='*.*.top_*'
# Union multiple patterns -- select from two schemas at once
clair run --project=. --env=dev --select='source.products.*' --select='derived.products.*'Override the warehouse or role for a specific database or schema:
my_db/__database_config__.py
from clair import DatabaseDefaults
defaults = DatabaseDefaults(warehouse="transform_wh", role="transformer")my_db/derived/__schema_config__.py
from clair import SchemaDefaults
defaults = SchemaDefaults(warehouse="reporting_wh")Resolution order (later wins): environment defaults → __database_config__ → __schema_config__.
Example projects are included under example_projects/:
| Project | What it demonstrates |
|---|---|
example_1 |
A minimal 4-Trouve events pipeline with VARIANT flattening |
example_2 |
A 50-Trouve e-commerce warehouse across 4 layers (source → refined → derived → reports) |
example_3 |
Incremental APPEND and UPSERT strategies |
Each includes a setup.sql to create and seed the source tables and a verify.sql to inspect the results.
