Skip to content

Commit dffdee3

Browse files
committed
Feat: Add support for the non-virtual prod mode
1 parent 13be24c commit dffdee3

7 files changed

Lines changed: 101 additions & 28 deletions

File tree

sqlmesh/core/config/common.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,35 @@ def __repr__(self) -> str:
4949
return str(self)
5050

5151

52+
class VirtualEnvironmentMode(str, Enum):
53+
"""Mode for virtual environment behavior.
54+
55+
FULL: Use full virtual environment functionality with hashed table names and virtual layer updates.
56+
DEV_ONLY: Bypass virtual environments in production, using simple table names without hashes.
57+
"""
58+
59+
FULL = "full"
60+
DEV_ONLY = "dev_only"
61+
62+
@property
63+
def is_full(self) -> bool:
64+
return self == VirtualEnvironmentMode.FULL
65+
66+
@property
67+
def is_dev_only(self) -> bool:
68+
return self == VirtualEnvironmentMode.DEV_ONLY
69+
70+
@classproperty
71+
def default(cls) -> VirtualEnvironmentMode:
72+
return VirtualEnvironmentMode.FULL
73+
74+
def __str__(self) -> str:
75+
return self.name
76+
77+
def __repr__(self) -> str:
78+
return str(self)
79+
80+
5281
class TableNamingConvention(str, Enum):
5382
# Causes table names at the physical layer to follow the convention:
5483
# <schema-name>__<table-name>__<fingerprint>

sqlmesh/core/config/root.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@
1414
from sqlmesh.cicd.config import CICDBotConfig
1515
from sqlmesh.core import constants as c
1616
from sqlmesh.core.console import get_console
17-
from sqlmesh.core.config import EnvironmentSuffixTarget, TableNamingConvention
17+
from sqlmesh.core.config.common import (
18+
EnvironmentSuffixTarget,
19+
TableNamingConvention,
20+
VirtualEnvironmentMode,
21+
)
1822
from sqlmesh.core.config.base import BaseConfig, UpdateStrategy
1923
from sqlmesh.core.config.common import variables_validator, compile_regex_mapping
2024
from sqlmesh.core.config.connection import (
@@ -110,6 +114,7 @@ class Config(BaseConfig):
110114
physical_schema_mapping: A mapping from regular expressions to names of schemas in which physical tables for corresponding models will be placed.
111115
environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
112116
physical_table_naming_convention: Indicates how tables should be named at the physical layer
117+
virtual_environment_mode: Indicates how environments should be handled.
113118
gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the model-specific gateway rather than the default gateway.
114119
infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python package requirements.
115120
environment_catalog_mapping: A mapping from regular expressions to catalog names. The catalog name is used to determine the target catalog for a given environment.
@@ -151,9 +156,8 @@ class Config(BaseConfig):
151156
environment_suffix_target: EnvironmentSuffixTarget = Field(
152157
default=EnvironmentSuffixTarget.default
153158
)
154-
physical_table_naming_convention: TableNamingConvention = Field(
155-
default=TableNamingConvention.default
156-
)
159+
physical_table_naming_convention: TableNamingConvention = TableNamingConvention.default
160+
virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default
157161
gateway_managed_virtual_layer: bool = False
158162
infer_python_dependencies: bool = True
159163
environment_catalog_mapping: RegexKeyDict = {}

sqlmesh/core/context.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2909,6 +2909,7 @@ def _nodes_to_snapshots(self, nodes: t.Dict[str, Node]) -> t.Dict[str, Snapshot]
29092909
config = self.config_for_node(node)
29102910
kwargs["ttl"] = config.snapshot_ttl
29112911
kwargs["table_naming_convention"] = config.physical_table_naming_convention
2912+
kwargs["virtual_environment_mode"] = config.virtual_environment_mode
29122913

29132914
snapshot = Snapshot.from_node(
29142915
node,
@@ -2936,7 +2937,7 @@ def _node_or_snapshot_to_fqn(self, node_or_snapshot: NodeOrSnapshot) -> str:
29362937
def _plan_preview_enabled(self) -> bool:
29372938
if self.config.plan.enable_preview is not None:
29382939
return self.config.plan.enable_preview
2939-
# It is dangerous to enable preview by default for dbt projects that rely on engines that dont support cloning.
2940+
# It is dangerous to enable preview by default for dbt projects that rely on engines that don't support cloning.
29402941
# Enabling previews in such cases can result in unintended full refreshes because dbt incremental models rely on
29412942
# the maximum timestamp value in the target table.
29422943
return self._project_type == c.NATIVE or self.engine_adapter.SUPPORTS_CLONING

sqlmesh/core/plan/stages.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]:
358358
demoted_environment_naming_info,
359359
snapshots | full_demoted_snapshots,
360360
deployability_index,
361+
plan.is_dev,
361362
)
362363
if virtual_layer_update_stage:
363364
stages.append(virtual_layer_update_stage)
@@ -437,11 +438,18 @@ def _get_virtual_layer_update_stage(
437438
demoted_environment_naming_info: t.Optional[EnvironmentNamingInfo],
438439
all_snapshots: t.Dict[SnapshotId, Snapshot],
439440
deployability_index: DeployabilityIndex,
441+
is_dev: bool,
440442
) -> t.Optional[VirtualLayerUpdateStage]:
441-
promoted_snapshots = {s for s in promoted_snapshots if s.is_model and not s.is_symbolic}
442-
demoted_snapshots = {s for s in demoted_snapshots if s.is_model and not s.is_symbolic}
443+
def _should_update_virtual_layer(snapshot: SnapshotTableInfo) -> bool:
444+
# Skip virtual layer update for snapshots with virtual environment support disabled
445+
virtual_environment_enabled = is_dev or snapshot.virtual_environment_mode.is_full
446+
return snapshot.is_model and not snapshot.is_symbolic and virtual_environment_enabled
447+
448+
promoted_snapshots = {s for s in promoted_snapshots if _should_update_virtual_layer(s)}
449+
demoted_snapshots = {s for s in demoted_snapshots if _should_update_virtual_layer(s)}
443450
if not promoted_snapshots and not demoted_snapshots:
444451
return None
452+
445453
return VirtualLayerUpdateStage(
446454
promoted_snapshots=promoted_snapshots,
447455
demoted_snapshots=demoted_snapshots,

sqlmesh/core/snapshot/definition.py

Lines changed: 47 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from sqlglot import exp
1414
from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
1515

16-
from sqlmesh.core.config import TableNamingConvention
16+
from sqlmesh.core.config.common import TableNamingConvention, VirtualEnvironmentMode
1717
from sqlmesh.core import constants as c
1818
from sqlmesh.core.audit import StandaloneAudit
1919
from sqlmesh.core.environment import EnvironmentSuffixTarget
@@ -230,6 +230,7 @@ class SnapshotDataVersion(PydanticModel, frozen=True):
230230
physical_schema_: t.Optional[str] = Field(default=None, alias="physical_schema")
231231
dev_table_suffix: str
232232
table_naming_convention: TableNamingConvention = Field(default=TableNamingConvention.default)
233+
virtual_environment_mode: VirtualEnvironmentMode = Field(default=VirtualEnvironmentMode.default)
233234

234235
def snapshot_id(self, name: str) -> SnapshotId:
235236
return SnapshotId(name=name, identifier=self.fingerprint.to_identifier())
@@ -338,6 +339,7 @@ class SnapshotInfoMixin(ModelKindMixin):
338339
dev_table_suffix: str
339340
table_naming_convention: TableNamingConvention = Field(default=TableNamingConvention.default)
340341
forward_only: bool
342+
virtual_environment_mode: VirtualEnvironmentMode
341343

342344
@cached_property
343345
def identifier(self) -> str:
@@ -443,6 +445,10 @@ def _table_name(self, version: str, is_deployable: bool) -> str:
443445
if self.is_external:
444446
return self.name
445447

448+
if is_deployable and self.virtual_environment_mode.is_dev_only:
449+
# Use the model name as is if the target is deployable and the virtual environment mode is set to dev-only
450+
return self.name
451+
446452
is_dev_table = not is_deployable
447453
if is_dev_table:
448454
version = self.dev_version
@@ -459,6 +465,7 @@ def _table_name(self, version: str, is_deployable: bool) -> str:
459465
fqt = self.fully_qualified_table.copy()
460466
fqt.set("catalog", None)
461467
base_table_name = fqt.sql()
468+
462469
return table_name(
463470
self.physical_schema,
464471
base_table_name,
@@ -499,6 +506,8 @@ class SnapshotTableInfo(PydanticModel, SnapshotInfoMixin, frozen=True):
499506
dev_table_suffix: str
500507
model_gateway: t.Optional[str] = None
501508
forward_only: bool = False
509+
table_naming_convention: TableNamingConvention = Field(default=TableNamingConvention.default)
510+
virtual_environment_mode: VirtualEnvironmentMode = Field(default=VirtualEnvironmentMode.default)
502511

503512
def __lt__(self, other: SnapshotTableInfo) -> bool:
504513
return self.name < other.name
@@ -540,6 +549,7 @@ def data_version(self) -> SnapshotDataVersion:
540549
physical_schema=self.physical_schema,
541550
dev_table_suffix=self.dev_table_suffix,
542551
table_naming_convention=self.table_naming_convention,
552+
virtual_environment_mode=self.virtual_environment_mode,
543553
)
544554

545555
@property
@@ -627,6 +637,7 @@ class Snapshot(PydanticModel, SnapshotInfoMixin):
627637
default=TableNamingConvention.default, alias="table_naming_convention"
628638
)
629639
forward_only: bool = False
640+
virtual_environment_mode: VirtualEnvironmentMode = Field(default=VirtualEnvironmentMode.default)
630641

631642
@field_validator("ttl")
632643
@classmethod
@@ -679,6 +690,7 @@ def from_node(
679690
version: t.Optional[str] = None,
680691
cache: t.Optional[t.Dict[str, SnapshotFingerprint]] = None,
681692
table_naming_convention: TableNamingConvention = TableNamingConvention.default,
693+
virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
682694
) -> Snapshot:
683695
"""Creates a new snapshot for a node.
684696
@@ -690,6 +702,7 @@ def from_node(
690702
version: The version that a snapshot is associated with. Usually set during the planning phase.
691703
cache: Cache of node name to fingerprints.
692704
table_naming_convention: Convention to follow when generating the physical table name
705+
virtual_environment_mode: Mode for handling virtual environments
693706
694707
Returns:
695708
The newly created snapshot.
@@ -722,6 +735,7 @@ def from_node(
722735
ttl=ttl,
723736
version=version,
724737
table_naming_convention=table_naming_convention,
738+
virtual_environment_mode=virtual_environment_mode,
725739
)
726740

727741
def __eq__(self, other: t.Any) -> bool:
@@ -876,16 +890,19 @@ def merge_intervals(self, other: t.Union[Snapshot, SnapshotIntervals]) -> None:
876890
Args:
877891
other: The target snapshot to inherit intervals from.
878892
"""
879-
effective_from_ts = self.normalized_effective_from_ts or 0
880-
apply_effective_from = effective_from_ts > 0 and self.identifier != other.identifier
881-
882-
for start, end in other.intervals:
883-
# If the effective_from is set, then intervals that come after it must come from
884-
# the current snapshost.
885-
if apply_effective_from and start < effective_from_ts:
886-
end = min(end, effective_from_ts)
887-
if not apply_effective_from or end <= effective_from_ts:
888-
self.add_interval(start, end)
893+
if self.is_no_rebuild or self.virtual_environment_mode.is_full or not self.is_paused:
894+
# If the virtual environment mode is not full we can only merge prod intervals if this snapshot
895+
# is currently promoted in production or if it's forward-only / metadata / indirect non-breaking.
896+
# Otherwise, we want to ignore any existing intervals and backfill this snapshot from scratch.
897+
effective_from_ts = self.normalized_effective_from_ts or 0
898+
apply_effective_from = effective_from_ts > 0 and self.identifier != other.identifier
899+
for start, end in other.intervals:
900+
# If the effective_from is set, then intervals that come after it must come from
901+
# the current snapshost.
902+
if apply_effective_from and start < effective_from_ts:
903+
end = min(end, effective_from_ts)
904+
if not apply_effective_from or end <= effective_from_ts:
905+
self.add_interval(start, end)
889906

890907
if self.dev_version == other.dev_version:
891908
# Merge dev intervals if the dev versions match which would mean
@@ -1035,7 +1052,10 @@ def categorize_as(self, category: SnapshotChangeCategory, forward_only: bool = F
10351052
SnapshotChangeCategory.INDIRECT_NON_BREAKING,
10361053
SnapshotChangeCategory.METADATA,
10371054
)
1038-
if self.is_model and self.model.physical_version:
1055+
if self.is_model and not self.virtual_environment_mode.is_full:
1056+
# Hardcode the version if the virtual environment is not fully enabled.
1057+
self.version = "novde"
1058+
elif self.is_model and self.model.physical_version:
10391059
# If the model has a pinned version then use that.
10401060
self.version = self.model.physical_version
10411061
elif is_no_rebuild and self.previous_version:
@@ -1239,6 +1259,7 @@ def table_info(self) -> SnapshotTableInfo:
12391259
model_gateway=self.model_gateway,
12401260
table_naming_convention=self.table_naming_convention, # type: ignore
12411261
forward_only=self.forward_only,
1262+
virtual_environment_mode=self.virtual_environment_mode,
12421263
)
12431264

12441265
@property
@@ -1252,6 +1273,7 @@ def data_version(self) -> SnapshotDataVersion:
12521273
physical_schema=self.physical_schema,
12531274
dev_table_suffix=self.dev_table_suffix,
12541275
table_naming_convention=self.table_naming_convention,
1276+
virtual_environment_mode=self.virtual_environment_mode,
12551277
)
12561278

12571279
@property
@@ -1535,14 +1557,20 @@ def create(
15351557
for node in dag:
15361558
if node not in snapshots:
15371559
continue
1538-
# Make sure that the node is deployable according to all its parents
1539-
this_deployable = all(
1540-
children_deployability_mapping[p_id]
1541-
for p_id in snapshots[node].parents
1542-
if p_id in children_deployability_mapping
1543-
)
1560+
snapshot = snapshots[node]
1561+
1562+
if not snapshot.virtual_environment_mode.is_full:
1563+
# If the virtual environment is not fully enabled, then the snapshot can never be deployable
1564+
this_deployable = False
1565+
else:
1566+
# Make sure that the node is deployable according to all its parents
1567+
this_deployable = all(
1568+
children_deployability_mapping[p_id]
1569+
for p_id in snapshots[node].parents
1570+
if p_id in children_deployability_mapping
1571+
)
1572+
15441573
if this_deployable:
1545-
snapshot = snapshots[node]
15461574
is_forward_only_model = (
15471575
snapshot.is_model and snapshot.model.forward_only and not snapshot.is_metadata
15481576
)

sqlmesh/core/snapshot/evaluator.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2274,8 +2274,10 @@ def _check_destructive_schema_change(
22742274
alter_expressions: t.List[exp.Alter],
22752275
allow_destructive_snapshots: t.Set[str],
22762276
) -> None:
2277-
if snapshot.needs_destructive_check(allow_destructive_snapshots) and has_drop_alteration(
2278-
alter_expressions
2277+
if (
2278+
snapshot.is_no_rebuild
2279+
and snapshot.needs_destructive_check(allow_destructive_snapshots)
2280+
and has_drop_alteration(alter_expressions)
22792281
):
22802282
snapshot_name = snapshot.name
22812283
dropped_column_names = get_dropped_column_names(alter_expressions)

tests/core/test_snapshot.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ def test_json(snapshot: Snapshot):
167167
"parents": [{"name": '"parent"."tbl"', "identifier": snapshot.parents[0].identifier}],
168168
"previous_versions": [],
169169
"table_naming_convention": "schema_and_table",
170+
"virtual_environment_mode": "full",
170171
"updated_ts": 1663891973000,
171172
"version": snapshot.fingerprint.to_version(),
172173
"migrated": False,

0 commit comments

Comments
 (0)