Skip to content

Commit fbfd81a

Browse files
committed
add tests for the plan stage builder
1 parent dffdee3 commit fbfd81a

1 file changed

Lines changed: 258 additions & 0 deletions

File tree

tests/core/test_plan_stages.py

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from pytest_mock.plugin import MockerFixture
55

66
from sqlmesh.core.config import EnvironmentSuffixTarget
7+
from sqlmesh.core.config.common import VirtualEnvironmentMode
78
from sqlmesh.core.model import SqlModel, ModelKindName
89
from sqlmesh.core.plan.definition import EvaluatablePlan
910
from sqlmesh.core.plan.stages import (
@@ -1300,3 +1301,260 @@ def test_build_plan_stages_indirect_non_breaking_view_migration(
13001301

13011302
migrate_schemas_stage = stages[4]
13021303
assert {s.snapshot_id for s in migrate_schemas_stage.snapshots} == {new_snapshot_c.snapshot_id}
1304+
1305+
1306+
def test_build_plan_stages_virtual_environment_mode_filtering(
1307+
make_snapshot, mocker: MockerFixture
1308+
) -> None:
1309+
# Create snapshots with different virtual environment modes
1310+
snapshot_full = make_snapshot(
1311+
SqlModel(
1312+
name="full_model",
1313+
query=parse_one("select 1, ds"),
1314+
kind=dict(name=ModelKindName.INCREMENTAL_BY_TIME_RANGE, time_column="ds"),
1315+
)
1316+
)
1317+
snapshot_full.virtual_environment_mode = VirtualEnvironmentMode.FULL
1318+
snapshot_full.categorize_as(SnapshotChangeCategory.BREAKING)
1319+
1320+
snapshot_dev_only = make_snapshot(
1321+
SqlModel(
1322+
name="dev_only_model",
1323+
query=parse_one("select 2, ds"),
1324+
kind=dict(name=ModelKindName.INCREMENTAL_BY_TIME_RANGE, time_column="ds"),
1325+
)
1326+
)
1327+
snapshot_dev_only.virtual_environment_mode = VirtualEnvironmentMode.DEV_ONLY
1328+
snapshot_dev_only.categorize_as(SnapshotChangeCategory.BREAKING)
1329+
1330+
# Mock state reader
1331+
state_reader = mocker.Mock(spec=StateReader)
1332+
state_reader.get_snapshots.return_value = {}
1333+
state_reader.get_environment.return_value = None
1334+
1335+
# Test 1: Dev environment - both snapshots should be included
1336+
environment_dev = Environment(
1337+
name="dev",
1338+
snapshots=[snapshot_full.table_info, snapshot_dev_only.table_info],
1339+
start_at="2023-01-01",
1340+
end_at="2023-01-02",
1341+
plan_id="test_plan",
1342+
previous_plan_id=None,
1343+
promoted_snapshot_ids=[snapshot_full.snapshot_id, snapshot_dev_only.snapshot_id],
1344+
)
1345+
1346+
plan_dev = EvaluatablePlan(
1347+
start="2023-01-01",
1348+
end="2023-01-02",
1349+
new_snapshots=[snapshot_full, snapshot_dev_only],
1350+
environment=environment_dev,
1351+
no_gaps=False,
1352+
skip_backfill=False,
1353+
empty_backfill=False,
1354+
restatements={},
1355+
is_dev=True,
1356+
allow_destructive_models=set(),
1357+
forward_only=False,
1358+
end_bounded=False,
1359+
ensure_finalized_snapshots=False,
1360+
directly_modified_snapshots=[snapshot_full.snapshot_id, snapshot_dev_only.snapshot_id],
1361+
indirectly_modified_snapshots={},
1362+
metadata_updated_snapshots=[],
1363+
removed_snapshots=[],
1364+
requires_backfill=True,
1365+
models_to_backfill=None,
1366+
execution_time="2023-01-02",
1367+
disabled_restatement_models=set(),
1368+
environment_statements=None,
1369+
user_provided_flags=None,
1370+
)
1371+
1372+
stages_dev = build_plan_stages(plan_dev, state_reader, None)
1373+
1374+
# Find VirtualLayerUpdateStage
1375+
virtual_stage_dev = next(
1376+
stage for stage in stages_dev if isinstance(stage, VirtualLayerUpdateStage)
1377+
)
1378+
1379+
# In dev environment, both snapshots should be promoted regardless of virtual_environment_mode
1380+
assert {s.name for s in virtual_stage_dev.promoted_snapshots} == {
1381+
'"full_model"',
1382+
'"dev_only_model"',
1383+
}
1384+
assert len(virtual_stage_dev.demoted_snapshots) == 0
1385+
1386+
# Test 2: Production environment - only FULL mode snapshots should be included
1387+
environment_prod = Environment(
1388+
name="prod",
1389+
snapshots=[snapshot_full.table_info, snapshot_dev_only.table_info],
1390+
start_at="2023-01-01",
1391+
end_at="2023-01-02",
1392+
plan_id="test_plan",
1393+
previous_plan_id=None,
1394+
promoted_snapshot_ids=[snapshot_full.snapshot_id, snapshot_dev_only.snapshot_id],
1395+
)
1396+
1397+
plan_prod = EvaluatablePlan(
1398+
start="2023-01-01",
1399+
end="2023-01-02",
1400+
new_snapshots=[snapshot_full, snapshot_dev_only],
1401+
environment=environment_prod,
1402+
no_gaps=False,
1403+
skip_backfill=False,
1404+
empty_backfill=False,
1405+
restatements={},
1406+
is_dev=False,
1407+
allow_destructive_models=set(),
1408+
forward_only=False,
1409+
end_bounded=False,
1410+
ensure_finalized_snapshots=False,
1411+
directly_modified_snapshots=[snapshot_full.snapshot_id, snapshot_dev_only.snapshot_id],
1412+
indirectly_modified_snapshots={},
1413+
metadata_updated_snapshots=[],
1414+
removed_snapshots=[],
1415+
requires_backfill=True,
1416+
models_to_backfill=None,
1417+
execution_time="2023-01-02",
1418+
disabled_restatement_models=set(),
1419+
environment_statements=None,
1420+
user_provided_flags=None,
1421+
)
1422+
1423+
stages_prod = build_plan_stages(plan_prod, state_reader, None)
1424+
1425+
# Find VirtualLayerUpdateStage
1426+
virtual_stage_prod = next(
1427+
stage for stage in stages_prod if isinstance(stage, VirtualLayerUpdateStage)
1428+
)
1429+
1430+
# In production environment, only FULL mode snapshots should be promoted
1431+
assert {s.name for s in virtual_stage_prod.promoted_snapshots} == {'"full_model"'}
1432+
assert len(virtual_stage_prod.demoted_snapshots) == 0
1433+
1434+
# Test 3: Production environment with demoted snapshots
1435+
existing_environment = Environment(
1436+
name="prod",
1437+
snapshots=[snapshot_full.table_info, snapshot_dev_only.table_info],
1438+
start_at="2023-01-01",
1439+
end_at="2023-01-02",
1440+
plan_id="previous_plan",
1441+
previous_plan_id=None,
1442+
promoted_snapshot_ids=[snapshot_full.snapshot_id, snapshot_dev_only.snapshot_id],
1443+
finalized_ts=to_timestamp("2023-01-02"),
1444+
)
1445+
state_reader.get_environment.return_value = existing_environment
1446+
1447+
# Remove both snapshots from the new environment
1448+
environment_prod_demote = Environment(
1449+
name="prod",
1450+
snapshots=[],
1451+
start_at="2023-01-01",
1452+
end_at="2023-01-02",
1453+
plan_id="test_plan",
1454+
previous_plan_id="previous_plan",
1455+
promoted_snapshot_ids=[],
1456+
)
1457+
1458+
plan_prod_demote = EvaluatablePlan(
1459+
start="2023-01-01",
1460+
end="2023-01-02",
1461+
new_snapshots=[],
1462+
environment=environment_prod_demote,
1463+
no_gaps=False,
1464+
skip_backfill=False,
1465+
empty_backfill=False,
1466+
restatements={},
1467+
is_dev=False,
1468+
allow_destructive_models=set(),
1469+
forward_only=False,
1470+
end_bounded=False,
1471+
ensure_finalized_snapshots=False,
1472+
directly_modified_snapshots=[],
1473+
indirectly_modified_snapshots={},
1474+
metadata_updated_snapshots=[],
1475+
removed_snapshots=[snapshot_full.snapshot_id, snapshot_dev_only.snapshot_id],
1476+
requires_backfill=False,
1477+
models_to_backfill=None,
1478+
execution_time="2023-01-02",
1479+
disabled_restatement_models=set(),
1480+
environment_statements=None,
1481+
user_provided_flags=None,
1482+
)
1483+
1484+
stages_prod_demote = build_plan_stages(plan_prod_demote, state_reader, None)
1485+
1486+
# Find VirtualLayerUpdateStage
1487+
virtual_stage_prod_demote = next(
1488+
stage for stage in stages_prod_demote if isinstance(stage, VirtualLayerUpdateStage)
1489+
)
1490+
1491+
# In production environment, only FULL mode snapshots should be demoted
1492+
assert len(virtual_stage_prod_demote.promoted_snapshots) == 0
1493+
assert {s.name for s in virtual_stage_prod_demote.demoted_snapshots} == {'"full_model"'}
1494+
assert (
1495+
virtual_stage_prod_demote.demoted_environment_naming_info
1496+
== existing_environment.naming_info
1497+
)
1498+
1499+
1500+
def test_build_plan_stages_virtual_environment_mode_no_updates(
1501+
snapshot_a: Snapshot, make_snapshot, mocker: MockerFixture
1502+
) -> None:
1503+
# Create snapshot with DEV_ONLY mode
1504+
snapshot_dev_only = make_snapshot(
1505+
SqlModel(
1506+
name="dev_only_model",
1507+
query=parse_one("select 1, ds"),
1508+
kind=dict(name=ModelKindName.INCREMENTAL_BY_TIME_RANGE, time_column="ds"),
1509+
)
1510+
)
1511+
snapshot_dev_only.virtual_environment_mode = VirtualEnvironmentMode.DEV_ONLY
1512+
snapshot_dev_only.categorize_as(SnapshotChangeCategory.BREAKING)
1513+
1514+
# Mock state reader
1515+
state_reader = mocker.Mock(spec=StateReader)
1516+
state_reader.get_snapshots.return_value = {}
1517+
state_reader.get_environment.return_value = None
1518+
1519+
# Production environment with only DEV_ONLY snapshots
1520+
environment = Environment(
1521+
name="prod",
1522+
snapshots=[snapshot_dev_only.table_info],
1523+
start_at="2023-01-01",
1524+
end_at="2023-01-02",
1525+
plan_id="test_plan",
1526+
previous_plan_id=None,
1527+
promoted_snapshot_ids=[snapshot_dev_only.snapshot_id],
1528+
)
1529+
1530+
plan = EvaluatablePlan(
1531+
start="2023-01-01",
1532+
end="2023-01-02",
1533+
new_snapshots=[snapshot_dev_only],
1534+
environment=environment,
1535+
no_gaps=False,
1536+
skip_backfill=False,
1537+
empty_backfill=False,
1538+
restatements={},
1539+
is_dev=False,
1540+
allow_destructive_models=set(),
1541+
forward_only=False,
1542+
end_bounded=False,
1543+
ensure_finalized_snapshots=False,
1544+
directly_modified_snapshots=[snapshot_dev_only.snapshot_id],
1545+
indirectly_modified_snapshots={},
1546+
metadata_updated_snapshots=[],
1547+
removed_snapshots=[],
1548+
requires_backfill=True,
1549+
models_to_backfill=None,
1550+
execution_time="2023-01-02",
1551+
disabled_restatement_models=set(),
1552+
environment_statements=None,
1553+
user_provided_flags=None,
1554+
)
1555+
1556+
stages = build_plan_stages(plan, state_reader, None)
1557+
1558+
# No VirtualLayerUpdateStage should be created since all snapshots are filtered out
1559+
virtual_stages = [stage for stage in stages if isinstance(stage, VirtualLayerUpdateStage)]
1560+
assert len(virtual_stages) == 0

0 commit comments

Comments
 (0)