Skip to content

Commit 2d23216

Browse files
committed
fix(clickhouse): warn at plan time when virtual catalog triggers re-materialization
When a catalog-aware gateway is added to an existing ClickHouse-only project, ClickHouse model FQNs change from 2-level to 3-level, causing SQLMesh to treat them as brand-new snapshots. This triggers full re-materialization — including historical backfills for incremental models — with no user-visible signal before apply.

Add _warn_if_virtual_catalog_rematerialization(), called from plan() between build() and console.plan(). It detects when new 3-level ClickHouse snapshots map to existing 2-level names in the current environment and emits a console warning listing the affected models with a plain-language explanation of the cost (FULL = recreated once, INCREMENTAL_BY_TIME_RANGE = full historical backfill).

Signed-off-by: Michael Day <michael.day@cloudkitchens.com>
Signed-off-by: mday-io <mdaytn@gmail.com>
1 parent 78da023 commit 2d23216

2 files changed

Lines changed: 188 additions & 0 deletions

File tree

sqlmesh/core/context.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1445,6 +1445,8 @@ def plan(
14451445

14461446
plan = plan_builder.build()
14471447

1448+
self._warn_if_virtual_catalog_rematerialization(plan)
1449+
14481450
if no_auto_categorization or plan.uncategorized:
14491451
# Prompts are required if the auto categorization is disabled
14501452
# or if there are any uncategorized snapshots in the plan
@@ -2743,6 +2745,61 @@ def _run_plan_tests(self, skip_tests: bool = False) -> t.Optional[ModelTextTestR
27432745
return result
27442746
return None
27452747

2748+
def _warn_if_virtual_catalog_rematerialization(self, plan: "Plan") -> None:
    """Warn when ClickHouse models appear as new snapshots solely because a virtual catalog
    prefix was added to their FQNs after a catalog-aware gateway joined the project.

    A new snapshot is reported when its name parses as a 3-level ``catalog.db.table`` whose
    catalog equals the default catalog of an engine adapter that supports virtual catalogs,
    and whose ``db.table`` remainder (in either unquoted or double-quoted spelling) is one of
    the names collected from the plan's context diff.

    Emitting the warning before the plan is displayed gives users a chance to understand the
    re-materialization / backfill cost before applying.

    Args:
        plan: The freshly built plan whose new snapshots are inspected.
    """
    from sqlglot import exp

    # Every virtual catalog injected by a catalog-aware adapter. Collected into a set so
    # that two gateways sharing a catalog cannot report the same model twice, and so each
    # snapshot name is parsed only once below.
    virtual_catalogs: t.Set[str] = {
        adapter._default_catalog
        for adapter in self.engine_adapters.values()
        if adapter.supports_virtual_catalog() and adapter._default_catalog
    }
    if not virtual_catalogs:
        return

    # Names that may be the pre-injection (2-level) spelling of a new 3-level snapshot:
    # snapshots removed by this plan plus every name known to the context diff.
    context_diff = plan.context_diff
    old_names: t.Set[str] = {s_id.name for s_id in context_diff.removed_snapshots}
    old_names.update(context_diff.snapshots_by_name)

    affected: t.List[t.Tuple[str, str]] = []  # (new_3level_name, old_2level_name)
    for snapshot in plan.new_snapshots:
        table = exp.to_table(snapshot.name)
        if table.catalog not in virtual_catalogs:
            continue
        # Reconstruct the 2-level name that would have been used before injection. The
        # parsed parts carry no quotes, so try the plain and the double-quoted spelling.
        # NOTE(review): snapshot names are presumably stored as quoted FQNs
        # ("db"."table") in real projects — confirm; the plain form alone would then
        # never match.
        candidates = (
            f"{table.db}.{table.name}",
            f'"{table.db}"."{table.name}"',
        )
        old_name = next((name for name in candidates if name in old_names), None)
        if old_name is not None:
            affected.append((snapshot.name, old_name))

    if not affected:
        return

    # Keep the warning readable on large projects: list a bounded number of models and
    # summarize the remainder.
    max_display = 10
    model_lines = "\n".join(
        f" - {new_name} (was: {old_name})" for new_name, old_name in affected[:max_display]
    )
    if len(affected) > max_display:
        model_lines += f"\n ... and {len(affected) - max_display} more"

    # NOTE(review): confirm `sqlmesh apply` is the command users should actually run.
    self.console.log_warning(
        "ClickHouse models are being re-materialized due to virtual catalog FQN change.\n\n"
        "The following ClickHouse models appear as new because their fully-qualified\n"
        "names changed from 2-level (db.table) to 3-level (__gateway__.db.table):\n\n"
        f"{model_lines}\n\n"
        "FULL models will be recreated once. INCREMENTAL_BY_TIME_RANGE models will\n"
        "require a full historical backfill from their configured start date.\n\n"
        "This is a one-time cost when first adding a catalog-aware gateway to an\n"
        "existing ClickHouse project. To proceed, run `sqlmesh apply`."
    )
2802+
27462803
@property
27472804
def _model_tables(self) -> t.Dict[str, str]:
27482805
"""Mapping of model name to physical table name.

tests/core/test_context.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,137 @@ def _capture_create_schema(
677677
)
678678

679679

680+
@pytest.mark.fast
def test_warn_if_virtual_catalog_rematerialization_emits_warning(mocker):
    """_warn_if_virtual_catalog_rematerialization must emit a log_warning when new snapshots have
    3-level FQNs that map to existing 2-level FQNs in the current environment, indicating that the
    virtual catalog prefix was added to previously-applied ClickHouse models."""
    from unittest.mock import MagicMock

    from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter

    # Build a minimal Context with no models.
    ctx = Context(config=Config())

    # Create a ClickHouse adapter with a virtual catalog already injected.
    ch_adapter = ClickhouseEngineAdapter(
        lambda *a, **k: mocker.NonCallableMock(),
        dialect="clickhouse",
    )
    ch_adapter._default_catalog = "__ch_gw__"

    # Override engine_adapters so the context sees our prepared adapter.
    mocker.patch.object(
        type(ctx), "engine_adapters", new_callable=PropertyMock, return_value={"ch_gw": ch_adapter}
    )

    # Build a mock snapshot with a 3-level name that has the virtual catalog prefix.
    new_snapshot = MagicMock()
    new_snapshot.name = "__ch_gw__.mydb.my_table"

    # The old 2-level name must appear in snapshots_by_name so we detect the rename.
    context_diff = MagicMock()
    context_diff.new_snapshots = {new_snapshot.name: new_snapshot}
    context_diff.removed_snapshots = {}
    context_diff.snapshots_by_name = {"mydb.my_table": MagicMock()}

    plan = MagicMock()
    plan.new_snapshots = [new_snapshot]
    plan.context_diff = context_diff

    warning_mock = mocker.patch.object(ctx.console, "log_warning")

    ctx._warn_if_virtual_catalog_rematerialization(plan)

    warning_mock.assert_called_once()
    warning_text = warning_mock.call_args[0][0]
    assert "__ch_gw__.mydb.my_table" in warning_text
    # The new name already contains "mydb.my_table", so check the "(was: ...)" part
    # explicitly to prove the old 2-level name is reported as well.
    assert "(was: mydb.my_table)" in warning_text
729+
730+
731+
@pytest.mark.fast
def test_warn_if_virtual_catalog_rematerialization_no_warning_when_genuinely_new(mocker):
    """A new 3-level snapshot with no matching old 2-level name is a brand-new model rather
    than a renamed one, so _warn_if_virtual_catalog_rematerialization must stay silent."""
    from unittest.mock import MagicMock

    from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter

    context = Context(config=Config())

    # ClickHouse adapter with the virtual catalog already injected.
    adapter = ClickhouseEngineAdapter(
        lambda *a, **k: mocker.NonCallableMock(),
        dialect="clickhouse",
    )
    adapter._default_catalog = "__ch_gw__"
    mocker.patch.object(
        type(context),
        "engine_adapters",
        new_callable=PropertyMock,
        return_value={"ch_gw": adapter},
    )

    # `name` has to be assigned after construction: Mock's `name` keyword is reserved.
    fresh_snapshot = MagicMock()
    fresh_snapshot.name = "__ch_gw__.mydb.brand_new_table"

    # Nothing removed and no known names, so there is no old 2-level name to match.
    diff = MagicMock(
        new_snapshots={fresh_snapshot.name: fresh_snapshot},
        removed_snapshots={},
        snapshots_by_name={},
    )
    plan = MagicMock(new_snapshots=[fresh_snapshot], context_diff=diff)

    log_warning = mocker.patch.object(context.console, "log_warning")

    context._warn_if_virtual_catalog_rematerialization(plan)

    log_warning.assert_not_called()
769+
770+
771+
@pytest.mark.fast
def test_warn_if_virtual_catalog_rematerialization_no_warning_without_virtual_catalog(mocker):
    """With no virtual catalog injected into the ClickHouse adapter (its _default_catalog is
    None), _warn_if_virtual_catalog_rematerialization must not emit any warning."""
    from unittest.mock import MagicMock

    from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter

    context = Context(config=Config())

    adapter = ClickhouseEngineAdapter(
        lambda *a, **k: mocker.NonCallableMock(),
        dialect="clickhouse",
    )
    # The adapter is left in 2-level mode: nothing has injected a virtual catalog.
    assert adapter._default_catalog is None
    mocker.patch.object(
        type(context),
        "engine_adapters",
        new_callable=PropertyMock,
        return_value={"ch_gw": adapter},
    )

    # `name` has to be assigned after construction: Mock's `name` keyword is reserved.
    plain_snapshot = MagicMock()
    plain_snapshot.name = "mydb.my_table"

    diff = MagicMock(
        new_snapshots={plain_snapshot.name: plain_snapshot},
        removed_snapshots={},
        snapshots_by_name={},
    )
    plan = MagicMock(new_snapshots=[plain_snapshot], context_diff=diff)

    log_warning = mocker.patch.object(context.console, "log_warning")

    context._warn_if_virtual_catalog_rematerialization(plan)

    log_warning.assert_not_called()
809+
810+
680811
def test_plan_execution_time():
681812
context = Context(config=Config())
682813
context.upsert_model(

0 commit comments

Comments
 (0)