2929from sqlmesh .core .snapshot .definition import check_ready_intervals
3030from sqlmesh .core .snapshot .definition import (
3131 Interval ,
32+ SnapshotEvaluationTriggers ,
3233 expand_range ,
3334 parent_snapshots_by_name ,
3435)
@@ -221,6 +222,7 @@ def run(
221222 ignore_cron : bool = False ,
222223 end_bounded : bool = False ,
223224 selected_snapshots : t .Optional [t .Set [str ]] = None ,
225+ selected_snapshots_auto_upstream : t .Optional [t .Set [str ]] = None ,
224226 circuit_breaker : t .Optional [t .Callable [[], bool ]] = None ,
225227 deployability_index : t .Optional [DeployabilityIndex ] = None ,
226228 auto_restatement_enabled : bool = False ,
@@ -237,6 +239,7 @@ def run(
237239 ignore_cron = ignore_cron ,
238240 end_bounded = end_bounded ,
239241 selected_snapshots = selected_snapshots ,
242+ selected_snapshots_auto_upstream = selected_snapshots_auto_upstream ,
240243 circuit_breaker = circuit_breaker ,
241244 deployability_index = deployability_index ,
242245 auto_restatement_enabled = auto_restatement_enabled ,
@@ -469,7 +472,9 @@ def evaluate_node(node: SchedulingUnit) -> None:
469472 evaluation_duration_ms ,
470473 num_audits - num_audits_failed ,
471474 num_audits_failed ,
472- auto_restatement_triggers = auto_restatement_triggers .get (snapshot .snapshot_id ),
475+ snapshot_evaluation_triggers = snapshot_evaluation_triggers .get (
476+ snapshot .snapshot_id
477+ ),
473478 )
474479
475480 try :
@@ -580,6 +585,7 @@ def _run_or_audit(
580585 ignore_cron : bool = False ,
581586 end_bounded : bool = False ,
582587 selected_snapshots : t .Optional [t .Set [str ]] = None ,
588+ selected_snapshots_auto_upstream : t .Optional [t .Set [str ]] = None ,
583589 circuit_breaker : t .Optional [t .Callable [[], bool ]] = None ,
584590 deployability_index : t .Optional [DeployabilityIndex ] = None ,
585591 auto_restatement_enabled : bool = False ,
@@ -603,6 +609,7 @@ def _run_or_audit(
603609 end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback,
604610 allow_partials, and other attributes that could cause the intervals to exceed the target end date.
605611 selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
612+ selected_snapshots_auto_upstream: The set of selected_snapshots that were automatically added because they're upstream of a selected snapshot.
606613 circuit_breaker: An optional handler which checks if the run should be aborted.
607614 deployability_index: Determines snapshots that are deployable in the context of this render.
608615 auto_restatement_enabled: Whether to enable auto restatements.
@@ -658,6 +665,42 @@ def _run_or_audit(
658665 if not merged_intervals :
659666 return CompletionStatus .NOTHING_TO_DO
660667
668+ merged_intervals_snapshots = {
669+ snapshot .snapshot_id : snapshot for snapshot in merged_intervals .keys ()
670+ }
671+ select_snapshot_triggers : t .Dict [SnapshotId , t .List [SnapshotId ]] = {}
672+ if selected_snapshots and selected_snapshots_auto_upstream :
673+ # actually selected snapshots are their own triggers
674+ selected_snapshots_no_auto_upstream = (
675+ selected_snapshots - selected_snapshots_auto_upstream
676+ )
677+ select_snapshot_triggers = {
678+ s_id : [s_id ]
679+ for s_id in [
680+ snapshot_id
681+ for snapshot_id in merged_intervals_snapshots
682+ if snapshot_id .name in selected_snapshots_no_auto_upstream
683+ ]
684+ }
685+
686+ # trace upstream by reversing dag of all snapshots to evaluate
687+ reversed_intervals_dag = snapshots_to_dag (merged_intervals_snapshots .values ()).reversed
688+ for s_id in reversed_intervals_dag :
689+ if s_id not in select_snapshot_triggers :
690+ triggers = []
691+ for parent_s_id in merged_intervals_snapshots [s_id ].parents :
692+ triggers .extend (select_snapshot_triggers [parent_s_id ])
693+ select_snapshot_triggers [s_id ] = list (dict .fromkeys (triggers ))
694+
695+ all_snapshot_triggers : t .Dict [SnapshotId , SnapshotEvaluationTriggers ] = {
696+ s_id : SnapshotEvaluationTriggers (
697+ ignore_cron = ignore_cron ,
698+ auto_restatement_triggers = auto_restatement_triggers .get (s_id , []),
699+ select_snapshot_triggers = select_snapshot_triggers .get (s_id , []),
700+ )
701+ for s_id in merged_intervals_snapshots
702+ if ignore_cron or s_id in auto_restatement_triggers or s_id in select_snapshot_triggers
703+ }
661704 errors , _ = self .run_merged_intervals (
662705 merged_intervals = merged_intervals ,
663706 deployability_index = deployability_index ,
@@ -668,6 +711,7 @@ def _run_or_audit(
668711 end = end ,
669712 run_environment_statements = run_environment_statements ,
670713 audit_only = audit_only ,
714+ restatements = remove_intervals ,
671715 auto_restatement_triggers = auto_restatement_triggers ,
672716 )
673717
0 commit comments