From f6188e7ab4238e3fa9c050f7554cfa5b9aaac717 Mon Sep 17 00:00:00 2001
From: Michael Day
Date: Mon, 30 Mar 2026 10:36:41 -0400
Subject: [PATCH] fix: prevent default catalog leak into catalog-unsupported gateways

Fixes #5748

When the default gateway has a default catalog set (e.g., Trino with
catalog: example_catalog), that catalog was silently prepended to model
names targeting secondary gateways that do not support catalogs (e.g.,
ClickHouse), causing UnsupportedCatalogOperationError at evaluation time.

The per-gateway catalog dict omits catalog-unsupported gateways entirely,
so the model loader could not distinguish "no catalog" from "not checked"
and fell through to the global default. This change explicitly sets
default_catalog to None when a gateway is known but absent from the dict.

Additionally preserves the original default_catalog across blueprint
iterations to prevent cross-contamination when blueprints target
different gateways.

Signed-off-by: Michael Day
---
 sqlmesh/core/model/definition.py |  17 ++--
 tests/core/test_model.py         | 167 +++++++++++++++++++++++++++++++
 2 files changed, 178 insertions(+), 6 deletions(-)

diff --git a/sqlmesh/core/model/definition.py b/sqlmesh/core/model/definition.py
index 8d4f72e918..d4f23b4fc0 100644
--- a/sqlmesh/core/model/definition.py
+++ b/sqlmesh/core/model/definition.py
@@ -2065,7 +2065,9 @@ def create_models_from_blueprints(
     **loader_kwargs: t.Any,
 ) -> t.List[Model]:
     model_blueprints: t.List[Model] = []
+    original_default_catalog = loader_kwargs.get("default_catalog")
     for blueprint in _extract_blueprints(blueprints, path):
+        loader_kwargs["default_catalog"] = original_default_catalog
         blueprint_variables = _extract_blueprint_variables(blueprint, path)
 
         if gateway:
@@ -2083,12 +2085,15 @@ def create_models_from_blueprints(
         else:
             gateway_name = None
 
-        if (
-            default_catalog_per_gateway
-            and gateway_name
-            and (catalog := default_catalog_per_gateway.get(gateway_name)) is not None
-        ):
-            loader_kwargs["default_catalog"] = catalog
+        if default_catalog_per_gateway and gateway_name:
+            catalog = default_catalog_per_gateway.get(gateway_name)
+            if catalog is not None:
+                loader_kwargs["default_catalog"] = catalog
+            else:
+                # Gateway exists but has no entry in the dict (e.g., catalog-unsupported
+                # engines like ClickHouse). Clear the default catalog so the global
+                # default from the primary gateway doesn't leak into this model's name.
+                loader_kwargs["default_catalog"] = None
 
         model_blueprints.append(
             loader(
diff --git a/tests/core/test_model.py b/tests/core/test_model.py
index 81707c075f..9bdc976b56 100644
--- a/tests/core/test_model.py
+++ b/tests/core/test_model.py
@@ -12342,3 +12342,170 @@ def test_audits_in_embedded_model():
     )
     with pytest.raises(ConfigError, match="Audits are not supported for embedded models"):
         load_sql_based_model(expression).validate_definition()
+
+
+def test_default_catalog_not_leaked_to_unsupported_gateway():
+    """
+    Regression test for https://github.com/SQLMesh/sqlmesh/issues/5748
+
+    When a model targets a gateway that is NOT in default_catalog_per_gateway,
+    the global default_catalog should be cleared (set to None) instead of
+    leaking through from the default gateway.
+    """
+    from sqlglot import parse
+
+    expressions = parse(
+        """
+        MODEL (
+            name my_schema.my_model,
+            kind FULL,
+            gateway clickhouse_gw,
+            dialect clickhouse,
+        );
+
+        SELECT 1 AS id
+        """,
+        read="clickhouse",
+    )
+
+    default_catalog_per_gateway = {
+        "default_gw": "example_catalog",
+    }
+
+    models = load_sql_based_models(
+        expressions,
+        get_variables=lambda gw: {},
+        dialect="clickhouse",
+        default_catalog_per_gateway=default_catalog_per_gateway,
+        default_catalog="example_catalog",
+    )
+
+    assert len(models) == 1
+    model = models[0]
+
+    assert not model.catalog, (
+        f"Default gateway catalog leaked into catalog-unsupported gateway model. "
+        f"Expected no catalog, got: {model.catalog}"
+    )
+    assert "example_catalog" not in model.fqn, (
+        f"Default gateway catalog found in model FQN: {model.fqn}"
+    )
+
+
+def test_default_catalog_still_applied_to_supported_gateway():
+    """
+    Control test: when a model targets a gateway that IS in default_catalog_per_gateway,
+    the catalog should still be correctly applied.
+    """
+    from sqlglot import parse
+
+    expressions = parse(
+        """
+        MODEL (
+            name my_schema.my_model,
+            kind FULL,
+            gateway other_duckdb,
+        );
+
+        SELECT 1 AS id
+        """,
+        read="duckdb",
+    )
+
+    default_catalog_per_gateway = {
+        "default_gw": "example_catalog",
+        "other_duckdb": "other_db",
+    }
+
+    models = load_sql_based_models(
+        expressions,
+        get_variables=lambda gw: {},
+        dialect="duckdb",
+        default_catalog_per_gateway=default_catalog_per_gateway,
+        default_catalog="example_catalog",
+    )
+
+    assert len(models) == 1
+    model = models[0]
+
+    assert model.catalog == "other_db", f"Expected catalog 'other_db', got: {model.catalog}"
+
+
+def test_no_gateway_uses_global_default_catalog():
+    """
+    Control test: when a model does NOT specify a gateway, the global
+    default_catalog should still be applied as before.
+    """
+    from sqlglot import parse
+
+    expressions = parse(
+        """
+        MODEL (
+            name my_schema.my_model,
+            kind FULL,
+        );
+
+        SELECT 1 AS id
+        """,
+        read="duckdb",
+    )
+
+    model = load_sql_based_model(
+        expressions,
+        default_catalog="example_catalog",
+        dialect="duckdb",
+    )
+
+    assert model.catalog == "example_catalog"
+
+
+def test_blueprint_catalog_not_cross_contaminated():
+    """
+    When blueprints iterate over different gateways, the catalog from one
+    blueprint iteration should not leak into the next. A ClickHouse blueprint
+    setting default_catalog to None should not prevent the following blueprint
+    from getting its correct catalog.
+    """
+    from sqlglot import parse
+
+    expressions = parse(
+        """
+        MODEL (
+            name @{blueprint}.my_model,
+            kind FULL,
+            gateway @{gw},
+            blueprints (
+                (blueprint := ch_schema, gw := clickhouse_gw),
+                (blueprint := db_schema, gw := default_gw),
+            ),
+        );
+
+        SELECT 1 AS id
+        """,
+        read="duckdb",
+    )
+
+    default_catalog_per_gateway = {
+        "default_gw": "example_catalog",
+    }
+
+    models = load_sql_based_models(
+        expressions,
+        get_variables=lambda gw: {},
+        dialect="duckdb",
+        default_catalog_per_gateway=default_catalog_per_gateway,
+        default_catalog="example_catalog",
+    )
+
+    assert len(models) == 2
+
+    ch_model = next(m for m in models if "ch_schema" in m.fqn)
+    db_model = next(m for m in models if "db_schema" in m.fqn)
+
+    assert not ch_model.catalog, (
+        f"Catalog leaked into ClickHouse blueprint. Got: {ch_model.catalog}"
+    )
+
+    assert db_model.catalog == "example_catalog", (
+        f"Catalog lost for DuckDB blueprint after ClickHouse iteration. Got: {db_model.catalog}"
+    )