Skip to content

Commit b5465a7

Browse files
authored
Merge branch 'main' into fix/docs-build-error
2 parents a8d08f5 + 4bbc329 commit b5465a7

16 files changed

Lines changed: 852 additions & 23 deletions

File tree

Makefile

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,13 @@ install-dev-dbt-%:
4949
$(MAKE) install-dev; \
5050
if [ "$$version" = "1.6.0" ]; then \
5151
echo "Applying overrides for dbt 1.6.0"; \
52-
$(PIP) install 'pydantic>=2.0.0' 'google-cloud-bigquery==3.30.0' 'databricks-sdk==0.28.0' --reinstall; \
52+
$(PIP) install 'pydantic>=2.0.0' 'google-cloud-bigquery==3.30.0' 'databricks-sdk==0.28.0' \
53+
'pyOpenSSL>=24.0.0' --reinstall; \
5354
fi; \
5455
if [ "$$version" = "1.7.0" ]; then \
5556
echo "Applying overrides for dbt 1.7.0"; \
56-
$(PIP) install 'databricks-sdk==0.28.0' --reinstall; \
57+
$(PIP) install 'databricks-sdk==0.28.0' \
58+
'pyOpenSSL>=24.0.0' --reinstall; \
5759
fi; \
5860
if [ "$$version" = "1.5.0" ]; then \
5961
echo "Applying overrides for dbt 1.5.0"; \

docs/integrations/dlt.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ This will create the configuration file and directories, which are found in all
2828

2929
SQLMesh will also automatically generate models to ingest data from the pipeline incrementally. Incremental loading is ideal for large datasets where recomputing entire tables is resource-intensive. In this case utilizing the [`INCREMENTAL_BY_TIME_RANGE` model kind](../concepts/models/model_kinds.md#incremental_by_time_range). However, these model definitions can be customized to meet your specific project needs.
3030

31-
#### Specify the path to the pipelines directory
31+
#### Specify the path to the pipelines working directory
3232

33-
The default location for dlt pipelines is `~/.dlt/pipelines/<pipeline_name>`. If your pipelines are in a [different directory](https://dlthub.com/docs/general-usage/pipeline#separate-working-environments-with-pipelines_dir), use the `--dlt-path` argument to specify the path explicitly:
33+
The default location for dlt pipeline working state is `~/.dlt/pipelines/<pipeline_name>`. If dlt stores your pipeline state in a [different pipelines working directory](https://dlthub.com/docs/general-usage/pipeline#separate-working-environments-with-pipelines_dir), use the `--dlt-path` argument to specify that directory explicitly. This should be the directory where dlt stores pipeline state, not the directory containing your pipeline scripts:
3434

3535
```bash
36-
sqlmesh init -t dlt --dlt-pipeline <pipeline-name> --dlt-path <pipelines-directory> dialect
36+
sqlmesh init -t dlt --dlt-pipeline <pipeline-name> --dlt-path <pipelines-working-directory> dialect
3737
```
3838

3939
### Generating models on demand
@@ -58,10 +58,10 @@ sqlmesh dlt_refresh <pipeline-name> --force
5858
sqlmesh dlt_refresh <pipeline-name> --table <dlt-table>
5959
```
6060

61-
- **Provide the explicit path to the pipelines directory** (using `--dlt-path`):
61+
- **Provide the explicit path to the pipelines working directory** (using `--dlt-path`):
6262

6363
```bash
64-
sqlmesh dlt_refresh <pipeline-name> --dlt-path <pipelines-directory>
64+
sqlmesh dlt_refresh <pipeline-name> --dlt-path <pipelines-working-directory>
6565
```
6666

6767
#### Configuration

docs/integrations/engines/clickhouse.md

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,54 @@ If a model has many records in each partition, you may see additional performanc
420420

421421
Choose a model's time partitioning granularity based on the characteristics of the data it will process, making sure the total number of partitions is 1000 or fewer.
422422

423+
## Multi-gateway setup
424+
425+
ClickHouse does not have a catalog concept — its fully-qualified table names are two-level (`database.table`), not three-level (`catalog.database.table`).
426+
427+
When a SQLMesh project uses ClickHouse alongside a catalog-aware gateway such as Trino or BigQuery, the two gateway types produce FQNs with different nesting depths. SQLMesh's internal schema tracking requires uniform nesting, so it assigns a **virtual catalog** to ClickHouse models at load time.
428+
429+
### How the virtual catalog works
430+
431+
- SQLMesh automatically detects the nesting mismatch and injects a virtual catalog into each ClickHouse adapter when a catalog-aware gateway is also present.
432+
- ClickHouse models will appear with three-level FQNs in `sqlmesh plan` output and logs — for example, `__ch_prod__.mydb.mytable` for a gateway named `ch_prod`.
433+
- The virtual catalog prefix is **never sent to ClickHouse**. It is stripped from every DDL and DML statement before execution.
434+
- When ClickHouse is the only gateway in a project, no virtual catalog is assigned and models remain two-level.
435+
436+
### Adding a second gateway to an existing ClickHouse-only project
437+
438+
!!! warning "Re-materialization required"
439+
Adding a catalog-aware gateway (such as Trino or BigQuery) to a project that previously used ClickHouse as the only gateway triggers a **full re-materialization of every ClickHouse model** on the next `sqlmesh apply`. Plan for this before making the change.
440+
441+
If your project previously used ClickHouse as the only gateway, your models were fingerprinted with 2-level FQNs (`db.table`). Adding a catalog-aware gateway causes all ClickHouse models to be treated as new versions (their FQNs change to `__{gateway_name}__.db.table`):
442+
443+
- `FULL` models are recreated once — cost is proportional to the size of each table.
444+
- `INCREMENTAL_BY_TIME_RANGE` models require a **full historical backfill** from the model's configured start date.
445+
- The old 2-level model names appear as **Removed** in the plan and will be cleaned up after the environment TTL expires.
446+
447+
This is a one-time cost at the transition point and does not recur. There is no way to skip it — `--forward-only` does not apply because SQLMesh treats the 3-level names as new models, not modified ones.
448+
449+
### Virtual catalog naming
450+
451+
By default, the virtual catalog name is derived from **the gateway name you chose in your config**, wrapped in double underscores — for example, a gateway named `clickhouse` produces `__clickhouse__`, and a gateway named `ch_prod` produces `__ch_prod__`. The double-underscore wrapping makes it visually clear that this is an internal SQLMesh concept, not a real ClickHouse object.
452+
453+
You can override the default name by setting `virtual_catalog` in your ClickHouse connection configuration:
454+
455+
```yaml
456+
gateways:
457+
clickhouse:
458+
connection:
459+
type: clickhouse
460+
host: my-clickhouse-host
461+
username: default
462+
virtual_catalog: ch_virtual # optional; defaults to __{gateway_name}__ (e.g. __clickhouse__)
463+
trino:
464+
connection:
465+
type: trino
466+
...
467+
```
468+
469+
With this configuration, ClickHouse models will appear as `ch_virtual.mydb.mytable` in plan output instead of `__clickhouse__.mydb.mytable`.
470+
423471
## Local/Built-in Scheduler
424472

425473
**Engine Adapter Type**: `clickhouse`
@@ -446,4 +494,5 @@ If a model has many records in each partition, you may see additional performanc
446494
| `server_host_name` | The ClickHouse server hostname as identified by the CN or SNI of its TLS certificate. Set this to avoid SSL errors when connecting through a proxy or tunnel with a different hostname. | string | N |
447495
| `tls_mode` | Controls advanced TLS behavior. proxy and strict do not invoke ClickHouse mutual TLS connection, but do send client cert and key. mutual assumes ClickHouse mutual TLS auth with a client certificate. | string | N |
448496
| `connection_settings` | Additional [connection settings](https://clickhouse.com/docs/integrations/python#settings-argument) | dict | N |
449-
| `connection_pool_options` | Additional [options](https://clickhouse.com/docs/integrations/python#customizing-the-http-connection-pool) for the HTTP connection pool | dict | N |
497+
| `connection_pool_options` | Additional [options](https://clickhouse.com/docs/integrations/python#customizing-the-http-connection-pool) for the HTTP connection pool | dict | N |
498+
| `virtual_catalog` | Override the virtual catalog name used when ClickHouse runs alongside a catalog-aware gateway (e.g. Trino). Defaults to `__{gateway_name}__`. See [Multi-gateway setup](#multi-gateway-setup) for details. | string | N |

docs/reference/cli.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,8 @@ Options:
279279
empty.
280280
--dlt-pipeline TEXT DLT pipeline for which to generate a SQLMesh project.
281281
Use alongside template: dlt
282-
--dlt-path TEXT The directory where the DLT pipeline resides. Use
282+
--dlt-path TEXT The DLT pipelines working directory, where DLT stores
283+
pipeline state (by default ~/.dlt/pipelines). Use
283284
alongside template: dlt
284285
--help Show this message and exit.
285286
```

sqlmesh/cli/main.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ def cli(
169169
@click.option(
170170
"--dlt-path",
171171
type=str,
172-
help="The directory where the DLT pipeline resides. Use alongside template: dlt",
172+
help="The DLT pipelines working directory, where DLT stores pipeline state (by default ~/.dlt/pipelines). Use alongside template: dlt",
173173
)
174174
@click.pass_context
175175
@error_handler
@@ -1155,7 +1155,7 @@ def table_name(
11551155
@click.option(
11561156
"--dlt-path",
11571157
type=str,
1158-
help="The directory where the DLT pipeline resides.",
1158+
help="The DLT pipelines working directory, where DLT stores pipeline state (by default ~/.dlt/pipelines).",
11591159
)
11601160
@click.pass_context
11611161
@error_handler

sqlmesh/core/config/connection.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2086,6 +2086,7 @@ class ClickhouseConnectionConfig(ConnectionConfig):
20862086
password: t.Optional[str] = None
20872087
port: t.Optional[int] = None
20882088
cluster: t.Optional[str] = None
2089+
virtual_catalog: t.Optional[str] = None
20892090
connect_timeout: int = 10
20902091
send_receive_timeout: int = 300
20912092
query_limit: int = 0
@@ -2120,6 +2121,19 @@ class ClickhouseConnectionConfig(ConnectionConfig):
21202121

21212122
_engine_import_validator = _get_engine_import_validator("clickhouse_connect", "clickhouse")
21222123

2124+
@field_validator("virtual_catalog")
2125+
def validate_virtual_catalog(cls, v: t.Optional[str]) -> t.Optional[str]:
2126+
if v is not None and not v.strip():
2127+
raise ConfigError(
2128+
"virtual_catalog cannot be an empty string. "
2129+
"Omit the field to use the default synthetic prefix (__<gateway_name>__)."
2130+
)
2131+
if v is not None and "." in v:
2132+
raise ConfigError(
2133+
f"virtual_catalog must be a single identifier with no dots (got: {v!r})"
2134+
)
2135+
return v
2136+
21232137
@property
21242138
def _connection_kwargs_keys(self) -> t.Set[str]:
21252139
kwargs = {
@@ -2181,7 +2195,11 @@ def cloud_mode(self) -> bool:
21812195

21822196
@property
21832197
def _extra_engine_config(self) -> t.Dict[str, t.Any]:
2184-
return {"cluster": self.cluster, "cloud_mode": self.cloud_mode}
2198+
return {
2199+
"cluster": self.cluster,
2200+
"cloud_mode": self.cloud_mode,
2201+
"virtual_catalog": self.virtual_catalog,
2202+
}
21852203

21862204
@property
21872205
def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:

sqlmesh/core/config/scheduler.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,29 @@ def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
138138

139139
def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
140140
default_catalogs_per_gateway: t.Dict[str, str] = {}
141+
unsupported_gateways = []
142+
141143
for gateway, adapter in context.engine_adapters.items():
142-
if catalog := adapter.default_catalog:
144+
if adapter.catalog_support.is_unsupported:
145+
unsupported_gateways.append((gateway, adapter))
146+
elif catalog := adapter.default_catalog:
143147
default_catalogs_per_gateway[gateway] = catalog
148+
149+
# When catalog-aware gateways exist, assign the gateway name as a virtual catalog for
150+
# catalog-unsupported gateways that opt in (e.g. ClickHouse) so that all models in the
151+
# project have a uniform 3-level FQN and the MappingSchema nesting level check passes.
152+
# Only adapters that explicitly return True from supports_virtual_catalog() are mutated;
153+
# other UNSUPPORTED adapters are left unchanged to avoid silent breakage.
154+
if default_catalogs_per_gateway and unsupported_gateways:
155+
for gateway, adapter in unsupported_gateways:
156+
if adapter.supports_virtual_catalog():
157+
adapter.inject_virtual_catalog(gateway)
158+
# Read the actual virtual catalog name back from the adapter — it may differ
159+
# from the gateway name if the user configured a custom virtual_catalog value.
160+
# inject_virtual_catalog() always sets _default_catalog so default_catalog
161+
# cannot return None at this point.
162+
default_catalogs_per_gateway[gateway] = adapter.default_catalog # type: ignore[assignment]
163+
144164
return default_catalogs_per_gateway
145165

146166

sqlmesh/core/context.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,7 @@ def engine_adapter(self) -> EngineAdapter:
492492
@property
493493
def snapshot_evaluator(self) -> SnapshotEvaluator:
494494
if not self._snapshot_evaluator:
495+
self._ensure_virtual_catalog_injection()
495496
self._snapshot_evaluator = SnapshotEvaluator(
496497
{
497498
gateway: adapter.with_settings(execute_log_level=logging.INFO)
@@ -502,6 +503,15 @@ def snapshot_evaluator(self) -> SnapshotEvaluator:
502503
)
503504
return self._snapshot_evaluator
504505

506+
def _ensure_virtual_catalog_injection(self) -> None:
507+
"""Ensure virtual catalog injection has run before adapters are cloned for SnapshotEvaluator.
508+
509+
Injection is a side effect of get_default_catalog_per_gateway. In normal usage it fires
510+
earlier (default_catalog is accessed during model loading), but this guard covers the edge
511+
case where snapshot_evaluator is accessed directly on a fresh context before any model ops.
512+
"""
513+
_ = self.default_catalog_per_gateway
514+
505515
def execution_context(
506516
self,
507517
deployability_index: t.Optional[DeployabilityIndex] = None,
@@ -1440,6 +1450,8 @@ def plan(
14401450

14411451
plan = plan_builder.build()
14421452

1453+
self._warn_if_virtual_catalog_rematerialization(plan)
1454+
14431455
if no_auto_categorization or plan.uncategorized:
14441456
# Prompts are required if the auto categorization is disabled
14451457
# or if there are any uncategorized snapshots in the plan
@@ -2746,6 +2758,61 @@ def _run_plan_tests(self, skip_tests: bool = False) -> t.Optional[ModelTextTestR
27462758
return result
27472759
return None
27482760

2761+
def _warn_if_virtual_catalog_rematerialization(self, plan: "Plan") -> None:
2762+
"""Warn when ClickHouse models appear as new snapshots solely because a virtual catalog
2763+
prefix was added to their FQNs after a catalog-aware gateway joined the project.
2764+
2765+
This situation causes every previously-applied ClickHouse model to be treated as brand-new
2766+
by SQLMesh, triggering full re-materialization and historical backfills. Emitting a warning
2767+
before the plan is displayed gives users a chance to understand the cost before applying.
2768+
"""
2769+
from sqlglot import exp
2770+
2771+
# Collect the set of old 2-level snapshot names from the current environment so we can
2772+
# detect which new 3-level names are renames rather than genuinely new models.
2773+
old_names: t.Set[str] = set()
2774+
for s_id in plan.context_diff.removed_snapshots:
2775+
old_names.add(s_id.name)
2776+
for name in plan.context_diff.snapshots_by_name:
2777+
old_names.add(name)
2778+
2779+
affected: t.List[t.Tuple[str, str]] = [] # (new_3level_name, old_2level_name)
2780+
2781+
for gateway, adapter in self.engine_adapters.items():
2782+
if not adapter.supports_virtual_catalog() or not adapter._default_catalog:
2783+
continue
2784+
virtual_catalog = adapter._default_catalog
2785+
2786+
for snapshot in plan.new_snapshots:
2787+
table = exp.to_table(snapshot.name)
2788+
if table.catalog != virtual_catalog:
2789+
continue
2790+
# Reconstruct the 2-level name that would have been used before injection.
2791+
old_name = f"{table.db}.{table.name}"
2792+
if old_name in old_names:
2793+
affected.append((snapshot.name, old_name))
2794+
2795+
if not affected:
2796+
return
2797+
2798+
max_display = 10
2799+
model_lines = "\n".join(
2800+
f" - {new_name} (was: {old_name})" for new_name, old_name in affected[:max_display]
2801+
)
2802+
if len(affected) > max_display:
2803+
model_lines += f"\n ... and {len(affected) - max_display} more"
2804+
2805+
self.console.log_warning(
2806+
"ClickHouse models are being re-materialized due to virtual catalog FQN change.\n\n"
2807+
"The following ClickHouse models appear as new because their fully-qualified\n"
2808+
"names changed from 2-level (db.table) to 3-level (__gateway__.db.table):\n\n"
2809+
f"{model_lines}\n\n"
2810+
"FULL models will be recreated once. INCREMENTAL_BY_TIME_RANGE models will\n"
2811+
"require a full historical backfill from their configured start date.\n\n"
2812+
"This is a one-time cost when first adding a catalog-aware gateway to an\n"
2813+
"existing ClickHouse project. To proceed, run `sqlmesh apply`."
2814+
)
2815+
27492816
@property
27502817
def _model_tables(self) -> t.Dict[str, str]:
27512818
"""Mapping of model name to physical table name.

sqlmesh/core/engine_adapter/base.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,31 @@ def comments_enabled(self) -> bool:
223223
def catalog_support(self) -> CatalogSupport:
224224
return CatalogSupport.UNSUPPORTED
225225

226+
def supports_virtual_catalog(self) -> bool:
227+
"""Return True if this adapter can accept a virtual catalog for multi-gateway nesting alignment.
228+
229+
When a project mixes catalog-aware gateways (e.g. DuckDB) with catalog-unsupported gateways
230+
(e.g. ClickHouse), all adapters need a uniform 3-level FQN so MappingSchema nesting stays
231+
consistent. Adapters that return True here opt in to receiving an injected virtual catalog
232+
via inject_virtual_catalog(), which causes the set_catalog decorator to strip the catalog
233+
from DDL expressions rather than raising UnsupportedCatalogOperationError.
234+
"""
235+
return False
236+
237+
def inject_virtual_catalog(self, gateway: str) -> None:
238+
"""Inject a gateway name to configure the adapter's virtual catalog.
239+
240+
The adapter determines the final catalog name from the gateway name (e.g. ClickHouse
241+
wraps it as __{gateway}__). Only call this on adapters that return True from
242+
supports_virtual_catalog(). After injection, catalog_support should return
243+
SINGLE_CATALOG_ONLY so the set_catalog decorator strips the virtual catalog from DDL
244+
expressions instead of raising an error.
245+
"""
246+
raise NotImplementedError(
247+
f"{self.dialect} does not support virtual catalog injection. "
248+
"Override supports_virtual_catalog() to return True and implement inject_virtual_catalog()."
249+
)
250+
226251
@cached_property
227252
def schema_differ(self) -> SchemaDiffer:
228253
return SchemaDiffer(

0 commit comments

Comments
 (0)