@@ -2124,7 +2124,7 @@ def snapshots_to_dag(snapshots: t.Collection[Snapshot]) -> DAG[SnapshotId]:
21242124
21252125def apply_auto_restatements (
21262126 snapshots : t .Dict [SnapshotId , Snapshot ], execution_time : TimeLike
2127- ) -> t .List [SnapshotIntervals ]:
2127+ ) -> t .Tuple [ t . List [SnapshotIntervals ], t . Dict [ SnapshotId , SnapshotId ] ]:
21282128 """Applies auto restatements to the snapshots.
21292129
21302130 This operation results in the removal of intervals for snapshots that are ready to be restated based
@@ -2139,6 +2139,8 @@ def apply_auto_restatements(
21392139 A list of SnapshotIntervals with **new** intervals that need to be restated.
21402140 """
21412141 dag = snapshots_to_dag (snapshots .values ())
2142+ snapshots_with_auto_restatements : t .List [SnapshotId ] = []
2143+ auto_restatement_triggers : t .Dict [SnapshotId , SnapshotId ] = {}
21422144 auto_restated_intervals_per_snapshot : t .Dict [SnapshotId , Interval ] = {}
21432145 for s_id in dag :
21442146 if s_id not in snapshots :
@@ -2162,6 +2164,23 @@ def apply_auto_restatements(
21622164 )
21632165 auto_restated_intervals .append (next_auto_restated_interval )
21642166
2167+ # auto-restated snapshot is its own trigger
2168+ snapshots_with_auto_restatements .append (s_id )
2169+ auto_restatement_triggers [s_id ] = s_id
2170+ else :
2171+ for parent_s_id in snapshot .parents :
2172+ # first auto-restated parent is the trigger
2173+ if parent_s_id in snapshots_with_auto_restatements :
2174+ auto_restatement_triggers [s_id ] = parent_s_id
2175+ break
2176+ # if no trigger yet and parent has trigger, inherit their trigger
2177+ # - will be overwritten if a different parent is auto-restated
2178+ if (
2179+ parent_s_id in auto_restatement_triggers
2180+ and s_id not in auto_restatement_triggers
2181+ ):
2182+ auto_restatement_triggers [s_id ] = auto_restatement_triggers [parent_s_id ]
2183+
21652184 if auto_restated_intervals :
21662185 auto_restated_interval_start = sys .maxsize
21672186 auto_restated_interval_end = - sys .maxsize
@@ -2191,20 +2210,22 @@ def apply_auto_restatements(
21912210
21922211 snapshot .apply_pending_restatement_intervals ()
21932212 snapshot .update_next_auto_restatement_ts (execution_time )
2194-
2195- return [
2196- SnapshotIntervals (
2197- name = snapshots [s_id ].name ,
2198- identifier = None ,
2199- version = snapshots [s_id ].version ,
2200- dev_version = None ,
2201- intervals = [],
2202- dev_intervals = [],
2203- pending_restatement_intervals = [interval ],
2204- )
2205- for s_id , interval in auto_restated_intervals_per_snapshot .items ()
2206- if s_id in snapshots
2207- ]
2213+ return (
2214+ [
2215+ SnapshotIntervals (
2216+ name = snapshots [s_id ].name ,
2217+ identifier = None ,
2218+ version = snapshots [s_id ].version ,
2219+ dev_version = None ,
2220+ intervals = [],
2221+ dev_intervals = [],
2222+ pending_restatement_intervals = [interval ],
2223+ )
2224+ for s_id , interval in auto_restated_intervals_per_snapshot .items ()
2225+ if s_id in snapshots
2226+ ],
2227+ auto_restatement_triggers ,
2228+ )
22082229
22092230
22102231def parent_snapshots_by_name (
0 commit comments