Commit 1ff0ed4

fix(clickhouse): support multi-gateway projects with catalog-aware engines (#5826)
Signed-off-by: mday-io <mdaytn@gmail.com>
Signed-off-by: Michael Day <michael.day@cloudkitchens.com>
1 parent f372f62 commit 1ff0ed4

8 files changed

Lines changed: 812 additions & 4 deletions

docs/integrations/engines/clickhouse.md

Lines changed: 50 additions & 1 deletion
@@ -420,6 +420,54 @@ If a model has many records in each partition, you may see additional performanc

 Choose a model's time partitioning granularity based on the characteristics of the data it will process, making sure the total number of partitions is 1000 or fewer.

+## Multi-gateway setup
+
+ClickHouse does not have a catalog concept: its fully-qualified table names are two-level (`database.table`), not three-level (`catalog.database.table`).
+
+When a SQLMesh project uses ClickHouse alongside a catalog-aware gateway such as Trino or BigQuery, the two gateway types produce FQNs with different nesting depths. SQLMesh's internal schema tracking requires uniform nesting, so it assigns a **virtual catalog** to ClickHouse models at load time.
+
+### How the virtual catalog works
+
+- SQLMesh automatically detects the nesting mismatch and injects a virtual catalog into each ClickHouse adapter when a catalog-aware gateway is also present.
+- ClickHouse models will appear with three-level FQNs in `sqlmesh plan` output and logs, for example `__ch_prod__.mydb.mytable` for a gateway named `ch_prod`.
+- The virtual catalog prefix is **never sent to ClickHouse**. It is stripped from every DDL and DML statement before execution.
+- When ClickHouse is the only gateway in a project, no virtual catalog is assigned and models remain two-level.
+
+### Adding a second gateway to an existing ClickHouse-only project
+
+!!! warning "Re-materialization required"
+    Adding a catalog-aware gateway (such as Trino or BigQuery) to a project that previously used ClickHouse as the only gateway triggers a **full re-materialization of every ClickHouse model** on the next `sqlmesh apply`. Plan for this before making the change.
+
+If your project previously used ClickHouse as the only gateway, your models were fingerprinted with 2-level FQNs (`db.table`). Adding a catalog-aware gateway causes all ClickHouse models to be treated as new versions (their FQNs change to `__{gateway_name}__.db.table`):
+
+- `FULL` models are recreated once; the cost is proportional to the size of each table.
+- `INCREMENTAL_BY_TIME_RANGE` models require a **full historical backfill** from the model's configured start date.
+- The old 2-level model names appear as **Removed** in the plan and will be cleaned up after the environment TTL expires.
+
+This is a one-time cost at the transition point and does not recur. There is no way to skip it: `--forward-only` does not apply because SQLMesh treats the 3-level names as new models, not modified ones.
+
+### Virtual catalog naming
+
+By default, the virtual catalog name is derived from **the gateway name you chose in your config**, wrapped in double underscores. For example, a gateway named `clickhouse` produces `__clickhouse__`, and a gateway named `ch_prod` produces `__ch_prod__`. The double-underscore wrapping makes it visually clear that this is an internal SQLMesh concept, not a real ClickHouse object.
+
+You can override the default name by setting `virtual_catalog` in your ClickHouse connection configuration:
+
+```yaml
+gateways:
+  clickhouse:
+    connection:
+      type: clickhouse
+      host: my-clickhouse-host
+      username: default
+      virtual_catalog: ch_virtual # optional; defaults to __{gateway_name}__ (e.g. __clickhouse__)
+  trino:
+    connection:
+      type: trino
+      ...
+```
+
+With this configuration, ClickHouse models will appear as `ch_virtual.mydb.mytable` in plan output instead of `__clickhouse__.mydb.mytable`.
+
 ## Local/Built-in Scheduler

 **Engine Adapter Type**: `clickhouse`
@@ -446,4 +494,5 @@ If a model has many records in each partition, you may see additional performanc
 | `server_host_name` | The ClickHouse server hostname as identified by the CN or SNI of its TLS certificate. Set this to avoid SSL errors when connecting through a proxy or tunnel with a different hostname. | string | N |
 | `tls_mode` | Controls advanced TLS behavior. proxy and strict do not invoke ClickHouse mutual TLS connection, but do send client cert and key. mutual assumes ClickHouse mutual TLS auth with a client certificate. | string | N |
 | `connection_settings` | Additional [connection settings](https://clickhouse.com/docs/integrations/python#settings-argument) | dict | N |
-| `connection_pool_options` | Additional [options](https://clickhouse.com/docs/integrations/python#customizing-the-http-connection-pool) for the HTTP connection pool | dict | N |
+| `connection_pool_options` | Additional [options](https://clickhouse.com/docs/integrations/python#customizing-the-http-connection-pool) for the HTTP connection pool | dict | N |
+| `virtual_catalog` | Override the virtual catalog name used when ClickHouse runs alongside a catalog-aware gateway (e.g. Trino). Defaults to `__{gateway_name}__`. See [Multi-gateway setup](#multi-gateway-setup) for details. | string | N |
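
The naming and stripping rules described in the documentation change above can be illustrated with a small standalone sketch. The helper names `virtual_catalog_name` and `strip_virtual_catalog` are hypothetical and are not part of SQLMesh; the sketch also assumes plain unquoted identifiers and splits on dots, which is a simplification.

```python
from typing import Optional


def virtual_catalog_name(gateway: str, override: Optional[str] = None) -> str:
    """Return the configured override, else the gateway name wrapped in double underscores."""
    return override if override is not None else f"__{gateway}__"


def strip_virtual_catalog(fqn: str, virtual_catalog: str) -> str:
    """Drop a leading virtual catalog from a three-level name; leave other names unchanged."""
    parts = fqn.split(".")  # simplification: assumes no dots inside quoted identifiers
    if len(parts) == 3 and parts[0] == virtual_catalog:
        return ".".join(parts[1:])
    return fqn


catalog = virtual_catalog_name("ch_prod")
print(catalog)
print(strip_virtual_catalog(f"{catalog}.mydb.mytable", catalog))
```

Under these assumptions, the name shown in plan output keeps the prefix, while the name that would reach ClickHouse is the two-level one.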

sqlmesh/core/config/connection.py

Lines changed: 19 additions & 1 deletion
@@ -2086,6 +2086,7 @@ class ClickhouseConnectionConfig(ConnectionConfig):
     password: t.Optional[str] = None
     port: t.Optional[int] = None
     cluster: t.Optional[str] = None
+    virtual_catalog: t.Optional[str] = None
     connect_timeout: int = 10
     send_receive_timeout: int = 300
     query_limit: int = 0
@@ -2120,6 +2121,19 @@ class ClickhouseConnectionConfig(ConnectionConfig):

     _engine_import_validator = _get_engine_import_validator("clickhouse_connect", "clickhouse")

+    @field_validator("virtual_catalog")
+    def validate_virtual_catalog(cls, v: t.Optional[str]) -> t.Optional[str]:
+        if v is not None and not v.strip():
+            raise ConfigError(
+                "virtual_catalog cannot be an empty string. "
+                "Omit the field to use the default synthetic prefix (__<gateway_name>__)."
+            )
+        if v is not None and "." in v:
+            raise ConfigError(
+                f"virtual_catalog must be a single identifier with no dots (got: {v!r})"
+            )
+        return v
+
     @property
     def _connection_kwargs_keys(self) -> t.Set[str]:
         kwargs = {
@@ -2181,7 +2195,11 @@ def cloud_mode(self) -> bool:

     @property
     def _extra_engine_config(self) -> t.Dict[str, t.Any]:
-        return {"cluster": self.cluster, "cloud_mode": self.cloud_mode}
+        return {
+            "cluster": self.cluster,
+            "cloud_mode": self.cloud_mode,
+            "virtual_catalog": self.virtual_catalog,
+        }

     @property
     def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:

sqlmesh/core/config/scheduler.py

Lines changed: 21 additions & 1 deletion
@@ -138,9 +138,29 @@ def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:

     def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
         default_catalogs_per_gateway: t.Dict[str, str] = {}
+        unsupported_gateways = []
+
         for gateway, adapter in context.engine_adapters.items():
-            if catalog := adapter.default_catalog:
+            if adapter.catalog_support.is_unsupported:
+                unsupported_gateways.append((gateway, adapter))
+            elif catalog := adapter.default_catalog:
                 default_catalogs_per_gateway[gateway] = catalog
+
+        # When catalog-aware gateways exist, assign the gateway name as a virtual catalog for
+        # catalog-unsupported gateways that opt in (e.g. ClickHouse) so that all models in the
+        # project have a uniform 3-level FQN and the MappingSchema nesting level check passes.
+        # Only adapters that explicitly return True from supports_virtual_catalog() are mutated;
+        # other UNSUPPORTED adapters are left unchanged to avoid silent breakage.
+        if default_catalogs_per_gateway and unsupported_gateways:
+            for gateway, adapter in unsupported_gateways:
+                if adapter.supports_virtual_catalog():
+                    adapter.inject_virtual_catalog(gateway)
+                    # Read the actual virtual catalog name back from the adapter; it may differ
+                    # from the gateway name if the user configured a custom virtual_catalog value.
+                    # inject_virtual_catalog() always sets _default_catalog so default_catalog
+                    # cannot return None at this point.
+                    default_catalogs_per_gateway[gateway] = adapter.default_catalog  # type: ignore[assignment]
+
         return default_catalogs_per_gateway

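
The opt-in assignment in the hunk above can be exercised in isolation with a minimal sketch. `StubAdapter` and its attributes are hypothetical stand-ins rather than SQLMesh classes, and the `__{gateway}__` wrapping inside the stub is an assumption taken from the documentation text, since the ClickHouse adapter's own implementation is not shown here.

```python
from typing import Dict, List, Optional, Tuple


class StubAdapter:
    """Hypothetical stand-in for an engine adapter (not the SQLMesh class)."""

    def __init__(
        self,
        default_catalog: Optional[str] = None,
        catalogs_unsupported: bool = False,
        opts_in: bool = False,
        configured_name: Optional[str] = None,
    ) -> None:
        self.default_catalog = default_catalog
        self.catalogs_unsupported = catalogs_unsupported
        self.opts_in = opts_in
        self.configured_name = configured_name

    def supports_virtual_catalog(self) -> bool:
        return self.opts_in

    def inject_virtual_catalog(self, gateway: str) -> None:
        # Assumed naming rule: a configured override wins, else wrap the gateway name.
        self.default_catalog = self.configured_name or f"__{gateway}__"


def default_catalog_per_gateway(adapters: Dict[str, StubAdapter]) -> Dict[str, str]:
    catalogs: Dict[str, str] = {}
    unsupported: List[Tuple[str, StubAdapter]] = []
    for gateway, adapter in adapters.items():
        if adapter.catalogs_unsupported:
            unsupported.append((gateway, adapter))
        elif adapter.default_catalog:
            catalogs[gateway] = adapter.default_catalog
    # Inject only when at least one catalog-aware gateway is present.
    if catalogs and unsupported:
        for gateway, adapter in unsupported:
            if adapter.supports_virtual_catalog():
                adapter.inject_virtual_catalog(gateway)
                assert adapter.default_catalog is not None
                catalogs[gateway] = adapter.default_catalog
    return catalogs


mixed = default_catalog_per_gateway(
    {
        "trino": StubAdapter(default_catalog="hive"),
        "clickhouse": StubAdapter(catalogs_unsupported=True, opts_in=True),
    }
)
solo = default_catalog_per_gateway(
    {"clickhouse": StubAdapter(catalogs_unsupported=True, opts_in=True)}
)
print(mixed, solo)
```

In this sketch a ClickHouse-only project yields an empty mapping, which matches the documented behavior that no virtual catalog is assigned when ClickHouse is the only gateway.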

sqlmesh/core/context.py

Lines changed: 67 additions & 0 deletions
@@ -492,6 +492,7 @@ def engine_adapter(self) -> EngineAdapter:
     @property
     def snapshot_evaluator(self) -> SnapshotEvaluator:
         if not self._snapshot_evaluator:
+            self._ensure_virtual_catalog_injection()
             self._snapshot_evaluator = SnapshotEvaluator(
                 {
                     gateway: adapter.with_settings(execute_log_level=logging.INFO)
@@ -502,6 +503,15 @@ def snapshot_evaluator(self) -> SnapshotEvaluator:
             )
         return self._snapshot_evaluator

+    def _ensure_virtual_catalog_injection(self) -> None:
+        """Ensure virtual catalog injection has run before adapters are cloned for SnapshotEvaluator.
+
+        Injection is a side effect of get_default_catalog_per_gateway. In normal usage it fires
+        earlier (default_catalog is accessed during model loading), but this guard covers the edge
+        case where snapshot_evaluator is accessed directly on a fresh context before any model ops.
+        """
+        _ = self.default_catalog_per_gateway
+
     def execution_context(
         self,
         deployability_index: t.Optional[DeployabilityIndex] = None,
@@ -1440,6 +1450,8 @@ def plan(

         plan = plan_builder.build()

+        self._warn_if_virtual_catalog_rematerialization(plan)
+
         if no_auto_categorization or plan.uncategorized:
             # Prompts are required if the auto categorization is disabled
             # or if there are any uncategorized snapshots in the plan
@@ -2746,6 +2758,61 @@ def _run_plan_tests(self, skip_tests: bool = False) -> t.Optional[ModelTextTestR
             return result
         return None

+    def _warn_if_virtual_catalog_rematerialization(self, plan: "Plan") -> None:
+        """Warn when ClickHouse models appear as new snapshots solely because a virtual catalog
+        prefix was added to their FQNs after a catalog-aware gateway joined the project.
+
+        This situation causes every previously-applied ClickHouse model to be treated as brand-new
+        by SQLMesh, triggering full re-materialization and historical backfills. Emitting a warning
+        before the plan is displayed gives users a chance to understand the cost before applying.
+        """
+        from sqlglot import exp
+
+        # Collect the set of old 2-level snapshot names from the current environment so we can
+        # detect which new 3-level names are renames rather than genuinely new models.
+        old_names: t.Set[str] = set()
+        for s_id in plan.context_diff.removed_snapshots:
+            old_names.add(s_id.name)
+        for name in plan.context_diff.snapshots_by_name:
+            old_names.add(name)
+
+        affected: t.List[t.Tuple[str, str]] = []  # (new_3level_name, old_2level_name)
+
+        for gateway, adapter in self.engine_adapters.items():
+            if not adapter.supports_virtual_catalog() or not adapter._default_catalog:
+                continue
+            virtual_catalog = adapter._default_catalog
+
+            for snapshot in plan.new_snapshots:
+                table = exp.to_table(snapshot.name)
+                if table.catalog != virtual_catalog:
+                    continue
+                # Reconstruct the 2-level name that would have been used before injection.
+                old_name = f"{table.db}.{table.name}"
+                if old_name in old_names:
+                    affected.append((snapshot.name, old_name))
+
+        if not affected:
+            return
+
+        max_display = 10
+        model_lines = "\n".join(
+            f" - {new_name} (was: {old_name})" for new_name, old_name in affected[:max_display]
+        )
+        if len(affected) > max_display:
+            model_lines += f"\n ... and {len(affected) - max_display} more"
+
+        self.console.log_warning(
+            "ClickHouse models are being re-materialized due to virtual catalog FQN change.\n\n"
+            "The following ClickHouse models appear as new because their fully-qualified\n"
+            "names changed from 2-level (db.table) to 3-level (__gateway__.db.table):\n\n"
+            f"{model_lines}\n\n"
+            "FULL models will be recreated once. INCREMENTAL_BY_TIME_RANGE models will\n"
+            "require a full historical backfill from their configured start date.\n\n"
+            "This is a one-time cost when first adding a catalog-aware gateway to an\n"
+            "existing ClickHouse project. To proceed, run `sqlmesh apply`."
+        )
+
     @property
     def _model_tables(self) -> t.Dict[str, str]:
         """Mapping of model name to physical table name.

sqlmesh/core/engine_adapter/base.py

Lines changed: 25 additions & 0 deletions
@@ -223,6 +223,31 @@ def comments_enabled(self) -> bool:
     def catalog_support(self) -> CatalogSupport:
         return CatalogSupport.UNSUPPORTED

+    def supports_virtual_catalog(self) -> bool:
+        """Return True if this adapter can accept a virtual catalog for multi-gateway nesting alignment.
+
+        When a project mixes catalog-aware gateways (e.g. DuckDB) with catalog-unsupported gateways
+        (e.g. ClickHouse), all adapters need a uniform 3-level FQN so MappingSchema nesting stays
+        consistent. Adapters that return True here opt in to receiving an injected virtual catalog
+        via inject_virtual_catalog(), which causes the set_catalog decorator to strip the catalog
+        from DDL expressions rather than raising UnsupportedCatalogOperationError.
+        """
+        return False
+
+    def inject_virtual_catalog(self, gateway: str) -> None:
+        """Inject a gateway name to configure the adapter's virtual catalog.
+
+        The adapter determines the final catalog name from the gateway name (e.g. ClickHouse
+        wraps it as __{gateway}__). Only call this on adapters that return True from
+        supports_virtual_catalog(). After injection, catalog_support should return
+        SINGLE_CATALOG_ONLY so the set_catalog decorator strips the virtual catalog from DDL
+        expressions instead of raising an error.
+        """
+        raise NotImplementedError(
+            f"{self.dialect} does not support virtual catalog injection. "
+            "Override supports_virtual_catalog() to return True and implement inject_virtual_catalog()."
+        )
+
     @cached_property
     def schema_differ(self) -> SchemaDiffer:
         return SchemaDiffer(
