Skip to content

Commit c504145

Browse files
committed
Feat: Add support for the non-virtual prod mode
1 parent 009f4b1 commit c504145

8 files changed

Lines changed: 108 additions & 35 deletions

File tree

sqlmesh/core/config/common.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,35 @@ def __repr__(self) -> str:
4949
return str(self)
5050

5151

52+
class VirtualEnvironmentMode(str, Enum):
53+
"""Mode for virtual environment behavior.
54+
55+
FULL: Use full virtual environment functionality with hashed table names and virtual layer updates.
56+
DEV_ONLY: Bypass virtual environments in production, using simple table names without hashes.
57+
"""
58+
59+
FULL = "full"
60+
DEV_ONLY = "dev_only"
61+
62+
@property
63+
def is_full(self) -> bool:
64+
return self == VirtualEnvironmentMode.FULL
65+
66+
@property
67+
def is_dev_only(self) -> bool:
68+
return self == VirtualEnvironmentMode.DEV_ONLY
69+
70+
@classproperty
71+
def default(cls) -> VirtualEnvironmentMode:
72+
return VirtualEnvironmentMode.FULL
73+
74+
def __str__(self) -> str:
75+
return self.name
76+
77+
def __repr__(self) -> str:
78+
return str(self)
79+
80+
5281
class TableNamingConvention(str, Enum):
5382
# Causes table names at the physical layer to follow the convention:
5483
# <schema-name>__<table-name>__<fingerprint>

sqlmesh/core/config/root.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@
1414
from sqlmesh.cicd.config import CICDBotConfig
1515
from sqlmesh.core import constants as c
1616
from sqlmesh.core.console import get_console
17-
from sqlmesh.core.config import EnvironmentSuffixTarget, TableNamingConvention
17+
from sqlmesh.core.config.common import (
18+
EnvironmentSuffixTarget,
19+
TableNamingConvention,
20+
VirtualEnvironmentMode,
21+
)
1822
from sqlmesh.core.config.base import BaseConfig, UpdateStrategy
1923
from sqlmesh.core.config.common import variables_validator, compile_regex_mapping
2024
from sqlmesh.core.config.connection import (
@@ -107,6 +111,7 @@ class Config(BaseConfig):
107111
physical_schema_mapping: A mapping from regular expressions to names of schemas in which physical tables for corresponding models will be placed.
108112
environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
109113
physical_table_naming_convention: Indicates how tables should be named at the physical layer
114+
virtual_environment_mode: Indicates how environments should be handled.
110115
gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the model-specific gateway rather than the default gateway.
111116
infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python package requirements.
112117
environment_catalog_mapping: A mapping from regular expressions to catalog names. The catalog name is used to determine the target catalog for a given environment.
@@ -148,9 +153,8 @@ class Config(BaseConfig):
148153
environment_suffix_target: EnvironmentSuffixTarget = Field(
149154
default=EnvironmentSuffixTarget.default
150155
)
151-
physical_table_naming_convention: TableNamingConvention = Field(
152-
default=TableNamingConvention.default
153-
)
156+
physical_table_naming_convention: TableNamingConvention = TableNamingConvention.default
157+
virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default
154158
gateway_managed_virtual_layer: bool = False
155159
infer_python_dependencies: bool = True
156160
environment_catalog_mapping: RegexKeyDict = {}

sqlmesh/core/context.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2909,6 +2909,7 @@ def _nodes_to_snapshots(self, nodes: t.Dict[str, Node]) -> t.Dict[str, Snapshot]
29092909
config = self.config_for_node(node)
29102910
kwargs["ttl"] = config.snapshot_ttl
29112911
kwargs["table_naming_convention"] = config.physical_table_naming_convention
2912+
kwargs["virtual_environment_mode"] = config.virtual_environment_mode
29122913

29132914
snapshot = Snapshot.from_node(
29142915
node,
@@ -2936,7 +2937,7 @@ def _node_or_snapshot_to_fqn(self, node_or_snapshot: NodeOrSnapshot) -> str:
29362937
def _plan_preview_enabled(self) -> bool:
29372938
if self.config.plan.enable_preview is not None:
29382939
return self.config.plan.enable_preview
2939-
# It is dangerous to enable preview by default for dbt projects that rely on engines that dont support cloning.
2940+
# It is dangerous to enable preview by default for dbt projects that rely on engines that don't support cloning.
29402941
# Enabling previews in such cases can result in unintended full refreshes because dbt incremental models rely on
29412942
# the maximum timestamp value in the target table.
29422943
return self._project_type == c.NATIVE or self.engine_adapter.SUPPORTS_CLONING

sqlmesh/core/plan/builder.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -811,7 +811,7 @@ def _ensure_no_forward_only_revert(self) -> None:
811811
and not candidate.model.forward_only
812812
and promoted.is_forward_only
813813
and not promoted.is_paused
814-
and not candidate.reuses_previous_version
814+
and not candidate.is_no_rebuild
815815
and promoted.version == candidate.version
816816
):
817817
raise PlanError(

sqlmesh/core/plan/stages.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]:
370370
demoted_environment_naming_info,
371371
snapshots | full_demoted_snapshots,
372372
deployability_index,
373+
plan.is_dev,
373374
)
374375
if virtual_layer_update_stage:
375376
stages.append(virtual_layer_update_stage)
@@ -449,11 +450,18 @@ def _get_virtual_layer_update_stage(
449450
demoted_environment_naming_info: t.Optional[EnvironmentNamingInfo],
450451
all_snapshots: t.Dict[SnapshotId, Snapshot],
451452
deployability_index: DeployabilityIndex,
453+
is_dev: bool,
452454
) -> t.Optional[VirtualLayerUpdateStage]:
453-
promoted_snapshots = {s for s in promoted_snapshots if s.is_model and not s.is_symbolic}
454-
demoted_snapshots = {s for s in demoted_snapshots if s.is_model and not s.is_symbolic}
455+
def _should_update_virtual_layer(snapshot: SnapshotTableInfo) -> bool:
456+
# Skip virtual layer update for snapshots with virtual environment support disabled
457+
virtual_environment_enabled = is_dev or snapshot.virtual_environment_mode.is_full
458+
return snapshot.is_model and not snapshot.is_symbolic and virtual_environment_enabled
459+
460+
promoted_snapshots = {s for s in promoted_snapshots if _should_update_virtual_layer(s)}
461+
demoted_snapshots = {s for s in demoted_snapshots if _should_update_virtual_layer(s)}
455462
if not promoted_snapshots and not demoted_snapshots:
456463
return None
464+
457465
return VirtualLayerUpdateStage(
458466
promoted_snapshots=promoted_snapshots,
459467
demoted_snapshots=demoted_snapshots,

sqlmesh/core/snapshot/definition.py

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from sqlglot import exp
1414
from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
1515

16-
from sqlmesh.core.config import TableNamingConvention
16+
from sqlmesh.core.config.common import TableNamingConvention, VirtualEnvironmentMode
1717
from sqlmesh.core import constants as c
1818
from sqlmesh.core.audit import StandaloneAudit
1919
from sqlmesh.core.environment import EnvironmentSuffixTarget
@@ -229,6 +229,7 @@ class SnapshotDataVersion(PydanticModel, frozen=True):
229229
physical_schema_: t.Optional[str] = Field(default=None, alias="physical_schema")
230230
dev_table_suffix: str
231231
table_naming_convention: TableNamingConvention = Field(default=TableNamingConvention.default)
232+
virtual_environment_mode: VirtualEnvironmentMode = Field(default=VirtualEnvironmentMode.default)
232233

233234
def snapshot_id(self, name: str) -> SnapshotId:
234235
return SnapshotId(name=name, identifier=self.fingerprint.to_identifier())
@@ -335,7 +336,8 @@ class SnapshotInfoMixin(ModelKindMixin):
335336
# This can be removed from this model once Pydantic 1 support is dropped (must remain in `Snapshot` though)
336337
base_table_name_override: t.Optional[str]
337338
dev_table_suffix: str
338-
table_naming_convention: TableNamingConvention = Field(default=TableNamingConvention.default)
339+
table_naming_convention: TableNamingConvention
340+
virtual_environment_mode: VirtualEnvironmentMode
339341

340342
@cached_property
341343
def identifier(self) -> str:
@@ -394,7 +396,7 @@ def is_indirect_non_breaking(self) -> bool:
394396
return self.change_category == SnapshotChangeCategory.INDIRECT_NON_BREAKING
395397

396398
@property
397-
def reuses_previous_version(self) -> bool:
399+
def is_no_rebuild(self) -> bool:
398400
return self.change_category in (
399401
SnapshotChangeCategory.FORWARD_ONLY,
400402
SnapshotChangeCategory.METADATA,
@@ -432,6 +434,10 @@ def _table_name(self, version: str, is_deployable: bool) -> str:
432434
if self.is_external:
433435
return self.name
434436

437+
if is_deployable and self.virtual_environment_mode.is_dev_only:
438+
# Use the model name as is if the target is deployable and the virtual environment mode is set to dev-only
439+
return self.name
440+
435441
is_dev_table = not is_deployable
436442
if is_dev_table:
437443
version = self.dev_version
@@ -448,6 +454,7 @@ def _table_name(self, version: str, is_deployable: bool) -> str:
448454
fqt = self.fully_qualified_table.copy()
449455
fqt.set("catalog", None)
450456
base_table_name = fqt.sql()
457+
451458
return table_name(
452459
self.physical_schema,
453460
base_table_name,
@@ -487,6 +494,8 @@ class SnapshotTableInfo(PydanticModel, SnapshotInfoMixin, frozen=True):
487494
custom_materialization: t.Optional[str] = None
488495
dev_table_suffix: str
489496
model_gateway: t.Optional[str] = None
497+
table_naming_convention: TableNamingConvention = Field(default=TableNamingConvention.default)
498+
virtual_environment_mode: VirtualEnvironmentMode = Field(default=VirtualEnvironmentMode.default)
490499

491500
def __lt__(self, other: SnapshotTableInfo) -> bool:
492501
return self.name < other.name
@@ -528,6 +537,7 @@ def data_version(self) -> SnapshotDataVersion:
528537
physical_schema=self.physical_schema,
529538
dev_table_suffix=self.dev_table_suffix,
530539
table_naming_convention=self.table_naming_convention,
540+
virtual_environment_mode=self.virtual_environment_mode,
531541
)
532542

533543
@property
@@ -614,6 +624,7 @@ class Snapshot(PydanticModel, SnapshotInfoMixin):
614624
table_naming_convention_: TableNamingConvention = Field(
615625
default=TableNamingConvention.default, alias="table_naming_convention"
616626
)
627+
virtual_environment_mode: VirtualEnvironmentMode = Field(default=VirtualEnvironmentMode.default)
617628

618629
@field_validator("ttl")
619630
@classmethod
@@ -666,6 +677,7 @@ def from_node(
666677
version: t.Optional[str] = None,
667678
cache: t.Optional[t.Dict[str, SnapshotFingerprint]] = None,
668679
table_naming_convention: TableNamingConvention = TableNamingConvention.default,
680+
virtual_environment_mode: VirtualEnvironmentMode = VirtualEnvironmentMode.default,
669681
) -> Snapshot:
670682
"""Creates a new snapshot for a node.
671683
@@ -677,6 +689,7 @@ def from_node(
677689
version: The version that a snapshot is associated with. Usually set during the planning phase.
678690
cache: Cache of node name to fingerprints.
679691
table_naming_convention: Convention to follow when generating the physical table name
692+
virtual_environment_mode: Mode for handling virtual environments
680693
681694
Returns:
682695
The newly created snapshot.
@@ -709,6 +722,7 @@ def from_node(
709722
ttl=ttl,
710723
version=version,
711724
table_naming_convention=table_naming_convention,
725+
virtual_environment_mode=virtual_environment_mode,
712726
)
713727

714728
def __eq__(self, other: t.Any) -> bool:
@@ -863,16 +877,19 @@ def merge_intervals(self, other: t.Union[Snapshot, SnapshotIntervals]) -> None:
863877
Args:
864878
other: The target snapshot to inherit intervals from.
865879
"""
866-
effective_from_ts = self.normalized_effective_from_ts or 0
867-
apply_effective_from = effective_from_ts > 0 and self.identifier != other.identifier
868-
869-
for start, end in other.intervals:
870-
# If the effective_from is set, then intervals that come after it must come from
871-
# the current snapshost.
872-
if apply_effective_from and start < effective_from_ts:
873-
end = min(end, effective_from_ts)
874-
if not apply_effective_from or end <= effective_from_ts:
875-
self.add_interval(start, end)
880+
if self.is_no_rebuild or self.virtual_environment_mode.is_full or not self.is_paused:
881+
# If the virtual environment mode is not full we can only merge prod intervals if this snapshot
882+
# is currently promoted in production or if it's forward-only / metadata / indirect non-breaking.
883+
# Otherwise, we want to ignore any existing intervals and backfill this snapshot from scratch.
884+
effective_from_ts = self.normalized_effective_from_ts or 0
885+
apply_effective_from = effective_from_ts > 0 and self.identifier != other.identifier
886+
for start, end in other.intervals:
887+
# If the effective_from is set, then intervals that come after it must come from
888+
# the current snapshost.
889+
if apply_effective_from and start < effective_from_ts:
890+
end = min(end, effective_from_ts)
891+
if not apply_effective_from or end <= effective_from_ts:
892+
self.add_interval(start, end)
876893

877894
if self.dev_version == other.dev_version:
878895
# Merge dev intervals if the dev versions match which would mean
@@ -1013,15 +1030,18 @@ def categorize_as(self, category: SnapshotChangeCategory) -> None:
10131030
category: The change category to assign to this snapshot.
10141031
"""
10151032
self.dev_version_ = self.fingerprint.to_version()
1016-
reuse_previous_version = category in (
1033+
is_no_rebuild = category in (
10171034
SnapshotChangeCategory.FORWARD_ONLY,
10181035
SnapshotChangeCategory.INDIRECT_NON_BREAKING,
10191036
SnapshotChangeCategory.METADATA,
10201037
)
1021-
if self.is_model and self.model.physical_version:
1038+
if self.is_model and not self.virtual_environment_mode.is_full:
1039+
# Hardcode the version if the virtual environment is not fully enabled.
1040+
self.version = "novde"
1041+
elif self.is_model and self.model.physical_version:
10221042
# If the model has a pinned version then use that.
10231043
self.version = self.model.physical_version
1024-
elif reuse_previous_version and self.previous_version:
1044+
elif is_no_rebuild and self.previous_version:
10251045
previous_version = self.previous_version
10261046
self.version = previous_version.data_version.version
10271047
self.physical_schema_ = previous_version.physical_schema
@@ -1219,7 +1239,8 @@ def table_info(self) -> SnapshotTableInfo:
12191239
custom_materialization=custom_materialization,
12201240
dev_table_suffix=self.dev_table_suffix,
12211241
model_gateway=self.model_gateway,
1222-
table_naming_convention=self.table_naming_convention, # type: ignore
1242+
table_naming_convention=self.table_naming_convention,
1243+
virtual_environment_mode=self.virtual_environment_mode,
12231244
)
12241245

12251246
@property
@@ -1233,6 +1254,7 @@ def data_version(self) -> SnapshotDataVersion:
12331254
physical_schema=self.physical_schema,
12341255
dev_table_suffix=self.dev_table_suffix,
12351256
table_naming_convention=self.table_naming_convention,
1257+
virtual_environment_mode=self.virtual_environment_mode,
12361258
)
12371259

12381260
@property
@@ -1501,14 +1523,20 @@ def create(
15011523
for node in dag:
15021524
if node not in snapshots:
15031525
continue
1504-
# Make sure that the node is deployable according to all its parents
1505-
this_deployable = all(
1506-
children_deployability_mapping[p_id]
1507-
for p_id in snapshots[node].parents
1508-
if p_id in children_deployability_mapping
1509-
)
1526+
snapshot = snapshots[node]
1527+
1528+
if not snapshot.virtual_environment_mode.is_full:
1529+
# If the virtual environment is not fully enabled, then the snapshot can never be deployable
1530+
this_deployable = False
1531+
else:
1532+
# Make sure that the node is deployable according to all its parents
1533+
this_deployable = all(
1534+
children_deployability_mapping[p_id]
1535+
for p_id in snapshots[node].parents
1536+
if p_id in children_deployability_mapping
1537+
)
1538+
15101539
if this_deployable:
1511-
snapshot = snapshots[node]
15121540
is_forward_only_model = (
15131541
snapshot.is_model and snapshot.model.forward_only and not snapshot.is_metadata
15141542
)

sqlmesh/core/snapshot/evaluator.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ def create(
338338
continue
339339
deployability_flags = [True]
340340
if (
341-
snapshot.reuses_previous_version
341+
snapshot.is_no_rebuild
342342
or snapshot.is_managed
343343
or (snapshot.is_model and snapshot.model.forward_only)
344344
or (deployability_index and not deployability_index.is_deployable(snapshot))
@@ -2278,8 +2278,10 @@ def _check_destructive_schema_change(
22782278
alter_expressions: t.List[exp.Alter],
22792279
allow_destructive_snapshots: t.Set[str],
22802280
) -> None:
2281-
if snapshot.needs_destructive_check(allow_destructive_snapshots) and has_drop_alteration(
2282-
alter_expressions
2281+
if (
2282+
snapshot.is_no_rebuild
2283+
and snapshot.needs_destructive_check(allow_destructive_snapshots)
2284+
and has_drop_alteration(alter_expressions)
22832285
):
22842286
snapshot_name = snapshot.name
22852287
dropped_column_names = get_dropped_column_names(alter_expressions)

tests/core/test_snapshot.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ def test_json(snapshot: Snapshot):
167167
"parents": [{"name": '"parent"."tbl"', "identifier": snapshot.parents[0].identifier}],
168168
"previous_versions": [],
169169
"table_naming_convention": "schema_and_table",
170+
"virtual_environment_mode": "full",
170171
"updated_ts": 1663891973000,
171172
"version": snapshot.fingerprint.to_version(),
172173
"migrated": False,

0 commit comments

Comments
 (0)