Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions sqlmesh/core/model/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -2065,7 +2065,9 @@ def create_models_from_blueprints(
**loader_kwargs: t.Any,
) -> t.List[Model]:
model_blueprints: t.List[Model] = []
original_default_catalog = loader_kwargs.get("default_catalog")
for blueprint in _extract_blueprints(blueprints, path):
loader_kwargs["default_catalog"] = original_default_catalog
blueprint_variables = _extract_blueprint_variables(blueprint, path)

if gateway:
Expand All @@ -2083,12 +2085,15 @@ def create_models_from_blueprints(
else:
gateway_name = None

if (
default_catalog_per_gateway
and gateway_name
and (catalog := default_catalog_per_gateway.get(gateway_name)) is not None
):
loader_kwargs["default_catalog"] = catalog
if default_catalog_per_gateway and gateway_name:
catalog = default_catalog_per_gateway.get(gateway_name)
if catalog is not None:
loader_kwargs["default_catalog"] = catalog
else:
# Gateway exists but has no entry in the dict (e.g., catalog-unsupported
# engines like ClickHouse). Clear the default catalog so the global
# default from the primary gateway doesn't leak into this model's name.
loader_kwargs["default_catalog"] = None

model_blueprints.append(
loader(
Expand Down
167 changes: 167 additions & 0 deletions tests/core/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12342,3 +12342,170 @@ def test_audits_in_embedded_model():
)
with pytest.raises(ConfigError, match="Audits are not supported for embedded models"):
load_sql_based_model(expression).validate_definition()


def test_default_catalog_not_leaked_to_unsupported_gateway():
"""
Regression test for https://github.com/SQLMesh/sqlmesh/issues/5748
When a model targets a gateway that is NOT in default_catalog_per_gateway,
the global default_catalog should be cleared (set to None) instead of
leaking through from the default gateway.
"""
from sqlglot import parse

expressions = parse(
"""
MODEL (
name my_schema.my_model,
kind FULL,
gateway clickhouse_gw,
dialect clickhouse,
);
SELECT 1 AS id
""",
read="clickhouse",
)

default_catalog_per_gateway = {
"default_gw": "example_catalog",
}

models = load_sql_based_models(
expressions,
get_variables=lambda gw: {},
dialect="clickhouse",
default_catalog_per_gateway=default_catalog_per_gateway,
default_catalog="example_catalog",
)

assert len(models) == 1
model = models[0]

assert not model.catalog, (
f"Default gateway catalog leaked into catalog-unsupported gateway model. "
f"Expected no catalog, got: {model.catalog}"
)
assert "example_catalog" not in model.fqn, (
f"Default gateway catalog found in model FQN: {model.fqn}"
)


def test_default_catalog_still_applied_to_supported_gateway():
"""
Control test: when a model targets a gateway that IS in default_catalog_per_gateway,
the catalog should still be correctly applied.
"""
from sqlglot import parse

expressions = parse(
"""
MODEL (
name my_schema.my_model,
kind FULL,
gateway other_duckdb,
);
SELECT 1 AS id
""",
read="duckdb",
)

default_catalog_per_gateway = {
"default_gw": "example_catalog",
"other_duckdb": "other_db",
}

models = load_sql_based_models(
expressions,
get_variables=lambda gw: {},
dialect="duckdb",
default_catalog_per_gateway=default_catalog_per_gateway,
default_catalog="example_catalog",
)

assert len(models) == 1
model = models[0]

assert model.catalog == "other_db", f"Expected catalog 'other_db', got: {model.catalog}"


def test_no_gateway_uses_global_default_catalog():
"""
Control test: when a model does NOT specify a gateway, the global
default_catalog should still be applied as before.
"""
from sqlglot import parse

expressions = parse(
"""
MODEL (
name my_schema.my_model,
kind FULL,
);
SELECT 1 AS id
""",
read="duckdb",
)

model = load_sql_based_model(
expressions,
default_catalog="example_catalog",
dialect="duckdb",
)

assert model.catalog == "example_catalog"


def test_blueprint_catalog_not_cross_contaminated():
"""
When blueprints iterate over different gateways, the catalog from one
blueprint iteration should not leak into the next. A ClickHouse blueprint
setting default_catalog to None should not prevent the following blueprint
from getting its correct catalog.
"""
from sqlglot import parse

expressions = parse(
"""
MODEL (
name @{blueprint}.my_model,
kind FULL,
gateway @{gw},
blueprints (
(blueprint := ch_schema, gw := clickhouse_gw),
(blueprint := db_schema, gw := default_gw),
),
);
SELECT 1 AS id
""",
read="duckdb",
)

default_catalog_per_gateway = {
"default_gw": "example_catalog",
}

models = load_sql_based_models(
expressions,
get_variables=lambda gw: {},
dialect="duckdb",
default_catalog_per_gateway=default_catalog_per_gateway,
default_catalog="example_catalog",
)

assert len(models) == 2

ch_model = next(m for m in models if "ch_schema" in m.fqn)
db_model = next(m for m in models if "db_schema" in m.fqn)

assert not ch_model.catalog, (
f"Catalog leaked into ClickHouse blueprint. Got: {ch_model.catalog}"
)

assert db_model.catalog == "example_catalog", (
f"Catalog lost for DuckDB blueprint after ClickHouse iteration. Got: {db_model.catalog}"
)
Loading