Skip to content

Commit 47141d7

Browse files
committed
Print auto-restated trigger of model evaluation in debug console
1 parent e4ea4c8 commit 47141d7

5 files changed

Lines changed: 55 additions & 19 deletions

File tree

sqlmesh/core/console.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,7 @@ def update_snapshot_evaluation_progress(
428428
num_audits_passed: int,
429429
num_audits_failed: int,
430430
audit_only: bool = False,
431+
auto_restatement_trigger: t.Optional[SnapshotId] = None,
431432
) -> None:
432433
"""Updates the snapshot evaluation progress."""
433434

@@ -575,6 +576,7 @@ def update_snapshot_evaluation_progress(
575576
num_audits_passed: int,
576577
num_audits_failed: int,
577578
audit_only: bool = False,
579+
auto_restatement_trigger: t.Optional[SnapshotId] = None,
578580
) -> None:
579581
pass
580582

@@ -1056,6 +1058,7 @@ def update_snapshot_evaluation_progress(
10561058
num_audits_passed: int,
10571059
num_audits_failed: int,
10581060
audit_only: bool = False,
1061+
auto_restatement_trigger: t.Optional[SnapshotId] = None,
10591062
) -> None:
10601063
"""Update the snapshot evaluation progress."""
10611064
if (
@@ -3639,6 +3642,7 @@ def update_snapshot_evaluation_progress(
36393642
num_audits_passed: int,
36403643
num_audits_failed: int,
36413644
audit_only: bool = False,
3645+
auto_restatement_trigger: t.Optional[SnapshotId] = None,
36423646
) -> None:
36433647
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
36443648

@@ -3808,9 +3812,13 @@ def update_snapshot_evaluation_progress(
38083812
num_audits_passed: int,
38093813
num_audits_failed: int,
38103814
audit_only: bool = False,
3815+
auto_restatement_trigger: t.Optional[SnapshotId] = None,
38113816
) -> None:
38123817
message = f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
38133818

3819+
if auto_restatement_trigger:
3820+
message += f" | evaluation_triggered_by={auto_restatement_trigger.name}"
3821+
38143822
if audit_only:
38153823
message = f"Auditing {snapshot.name} duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
38163824

sqlmesh/core/scheduler.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,7 @@ def run_merged_intervals(
371371
end: t.Optional[TimeLike] = None,
372372
run_environment_statements: bool = False,
373373
audit_only: bool = False,
374+
auto_restatement_triggers: t.Dict[SnapshotId, SnapshotId] = {},
374375
) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]:
375376
"""Runs precomputed batches of missing intervals.
376377
@@ -468,6 +469,7 @@ def evaluate_node(node: SchedulingUnit) -> None:
468469
evaluation_duration_ms,
469470
num_audits - num_audits_failed,
470471
num_audits_failed,
472+
auto_restatement_trigger=auto_restatement_triggers.get(snapshot.snapshot_id),
471473
)
472474

473475
try:
@@ -631,8 +633,11 @@ def _run_or_audit(
631633
for s_id, interval in (remove_intervals or {}).items():
632634
self.snapshots[s_id].remove_interval(interval)
633635

636+
auto_restatement_triggers: t.Dict[SnapshotId, SnapshotId] = {}
634637
if auto_restatement_enabled:
635-
auto_restated_intervals = apply_auto_restatements(self.snapshots, execution_time)
638+
auto_restated_intervals, auto_restatement_triggers = apply_auto_restatements(
639+
self.snapshots, execution_time
640+
)
636641
self.state_sync.add_snapshots_intervals(auto_restated_intervals)
637642
self.state_sync.update_auto_restatements(
638643
{s.name_version: s.next_auto_restatement_ts for s in self.snapshots.values()}
@@ -663,6 +668,7 @@ def _run_or_audit(
663668
end=end,
664669
run_environment_statements=run_environment_statements,
665670
audit_only=audit_only,
671+
auto_restatement_triggers=auto_restatement_triggers,
666672
)
667673

668674
return CompletionStatus.FAILURE if errors else CompletionStatus.SUCCESS

sqlmesh/core/snapshot/definition.py

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2177,7 +2177,7 @@ def snapshots_to_dag(snapshots: t.Collection[Snapshot]) -> DAG[SnapshotId]:
21772177

21782178
def apply_auto_restatements(
21792179
snapshots: t.Dict[SnapshotId, Snapshot], execution_time: TimeLike
2180-
) -> t.List[SnapshotIntervals]:
2180+
) -> t.Tuple[t.List[SnapshotIntervals], t.Dict[SnapshotId, SnapshotId]]:
21812181
"""Applies auto restatements to the snapshots.
21822182
21832183
This operation results in the removal of intervals for snapshots that are ready to be restated based
@@ -2192,6 +2192,8 @@ def apply_auto_restatements(
21922192
A list of SnapshotIntervals with **new** intervals that need to be restated.
21932193
"""
21942194
dag = snapshots_to_dag(snapshots.values())
2195+
snapshots_with_auto_restatements: t.List[SnapshotId] = []
2196+
auto_restatement_triggers: t.Dict[SnapshotId, SnapshotId] = {}
21952197
auto_restated_intervals_per_snapshot: t.Dict[SnapshotId, Interval] = {}
21962198
for s_id in dag:
21972199
if s_id not in snapshots:
@@ -2215,6 +2217,23 @@ def apply_auto_restatements(
22152217
)
22162218
auto_restated_intervals.append(next_auto_restated_interval)
22172219

2220+
# auto-restated snapshot is its own trigger
2221+
snapshots_with_auto_restatements.append(s_id)
2222+
auto_restatement_triggers[s_id] = s_id
2223+
else:
2224+
for parent_s_id in snapshot.parents:
2225+
# first auto-restated parent is the trigger
2226+
if parent_s_id in snapshots_with_auto_restatements:
2227+
auto_restatement_triggers[s_id] = parent_s_id
2228+
break
2229+
# if no trigger yet and parent has trigger, inherit their trigger
2230+
# - will be overwritten if a different parent is auto-restated
2231+
if (
2232+
parent_s_id in auto_restatement_triggers
2233+
and s_id not in auto_restatement_triggers
2234+
):
2235+
auto_restatement_triggers[s_id] = auto_restatement_triggers[parent_s_id]
2236+
22182237
if auto_restated_intervals:
22192238
auto_restated_interval_start = sys.maxsize
22202239
auto_restated_interval_end = -sys.maxsize
@@ -2244,20 +2263,22 @@ def apply_auto_restatements(
22442263

22452264
snapshot.apply_pending_restatement_intervals()
22462265
snapshot.update_next_auto_restatement_ts(execution_time)
2247-
2248-
return [
2249-
SnapshotIntervals(
2250-
name=snapshots[s_id].name,
2251-
identifier=None,
2252-
version=snapshots[s_id].version,
2253-
dev_version=None,
2254-
intervals=[],
2255-
dev_intervals=[],
2256-
pending_restatement_intervals=[interval],
2257-
)
2258-
for s_id, interval in auto_restated_intervals_per_snapshot.items()
2259-
if s_id in snapshots
2260-
]
2266+
return (
2267+
[
2268+
SnapshotIntervals(
2269+
name=snapshots[s_id].name,
2270+
identifier=None,
2271+
version=snapshots[s_id].version,
2272+
dev_version=None,
2273+
intervals=[],
2274+
dev_intervals=[],
2275+
pending_restatement_intervals=[interval],
2276+
)
2277+
for s_id, interval in auto_restated_intervals_per_snapshot.items()
2278+
if s_id in snapshots
2279+
],
2280+
auto_restatement_triggers,
2281+
)
22612282

22622283

22632284
def parent_snapshots_by_name(

tests/core/test_snapshot.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3101,7 +3101,7 @@ def test_apply_auto_restatements(make_snapshot):
31013101
(to_timestamp("2020-01-01"), to_timestamp("2020-01-06")),
31023102
]
31033103

3104-
restated_intervals = apply_auto_restatements(
3104+
restated_intervals, _ = apply_auto_restatements(
31053105
{
31063106
snapshot_a.snapshot_id: snapshot_a,
31073107
snapshot_b.snapshot_id: snapshot_b,
@@ -3238,7 +3238,7 @@ def test_apply_auto_restatements_disable_restatement_downstream(make_snapshot):
32383238
snapshot_b.add_interval("2020-01-01", "2020-01-05")
32393239
assert snapshot_a.snapshot_id in snapshot_b.parents
32403240

3241-
restated_intervals = apply_auto_restatements(
3241+
restated_intervals, _ = apply_auto_restatements(
32423242
{
32433243
snapshot_a.snapshot_id: snapshot_a,
32443244
snapshot_b.snapshot_id: snapshot_b,

web/server/console.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from sqlmesh.core.console import TerminalConsole
1010
from sqlmesh.core.environment import EnvironmentNamingInfo
1111
from sqlmesh.core.plan.definition import EvaluatablePlan
12-
from sqlmesh.core.snapshot import Snapshot, SnapshotInfoLike, SnapshotTableInfo
12+
from sqlmesh.core.snapshot import Snapshot, SnapshotInfoLike, SnapshotTableInfo, SnapshotId
1313
from sqlmesh.core.test import ModelTest
1414
from sqlmesh.core.test.result import ModelTextTestResult
1515
from sqlmesh.utils.date import now_timestamp
@@ -142,6 +142,7 @@ def update_snapshot_evaluation_progress(
142142
num_audits_passed: int,
143143
num_audits_failed: int,
144144
audit_only: bool = False,
145+
auto_restatement_trigger: t.Optional[SnapshotId] = None,
145146
) -> None:
146147
if audit_only:
147148
return

0 commit comments

Comments
 (0)