@@ -2192,7 +2192,7 @@ def snapshots_to_dag(snapshots: t.Collection[Snapshot]) -> DAG[SnapshotId]:
21922192
21932193def apply_auto_restatements (
21942194 snapshots : t .Dict [SnapshotId , Snapshot ], execution_time : TimeLike
2195- ) -> t .List [SnapshotIntervals ]:
2195+ ) -> t .Tuple [ t . List [SnapshotIntervals ], t . Dict [ SnapshotId , SnapshotId ] ]:
21962196 """Applies auto restatements to the snapshots.
21972197
21982198 This operation results in the removal of intervals for snapshots that are ready to be restated based
@@ -2207,6 +2207,8 @@ def apply_auto_restatements(
22072207 A list of SnapshotIntervals with **new** intervals that need to be restated.
22082208 """
22092209 dag = snapshots_to_dag (snapshots .values ())
2210+ snapshots_with_auto_restatements : t .List [SnapshotId ] = []
2211+ auto_restatement_triggers : t .Dict [SnapshotId , SnapshotId ] = {}
22102212 auto_restated_intervals_per_snapshot : t .Dict [SnapshotId , Interval ] = {}
22112213 for s_id in dag :
22122214 if s_id not in snapshots :
@@ -2230,6 +2232,23 @@ def apply_auto_restatements(
22302232 )
22312233 auto_restated_intervals .append (next_auto_restated_interval )
22322234
2235+ # auto-restated snapshot is its own trigger
2236+ snapshots_with_auto_restatements .append (s_id )
2237+ auto_restatement_triggers [s_id ] = s_id
2238+ else :
2239+ for parent_s_id in snapshot .parents :
2240+ # first auto-restated parent is the trigger
2241+ if parent_s_id in snapshots_with_auto_restatements :
2242+ auto_restatement_triggers [s_id ] = parent_s_id
2243+ break
2244+ # if no trigger yet and parent has trigger, inherit their trigger
2245+ # - will be overwritten if a different parent is auto-restated
2246+ if (
2247+ parent_s_id in auto_restatement_triggers
2248+ and s_id not in auto_restatement_triggers
2249+ ):
2250+ auto_restatement_triggers [s_id ] = auto_restatement_triggers [parent_s_id ]
2251+
22332252 if auto_restated_intervals :
22342253 auto_restated_interval_start = sys .maxsize
22352254 auto_restated_interval_end = - sys .maxsize
@@ -2259,20 +2278,22 @@ def apply_auto_restatements(
22592278
22602279 snapshot .apply_pending_restatement_intervals ()
22612280 snapshot .update_next_auto_restatement_ts (execution_time )
2262-
2263- return [
2264- SnapshotIntervals (
2265- name = snapshots [s_id ].name ,
2266- identifier = None ,
2267- version = snapshots [s_id ].version ,
2268- dev_version = None ,
2269- intervals = [],
2270- dev_intervals = [],
2271- pending_restatement_intervals = [interval ],
2272- )
2273- for s_id , interval in auto_restated_intervals_per_snapshot .items ()
2274- if s_id in snapshots
2275- ]
2281+ return (
2282+ [
2283+ SnapshotIntervals (
2284+ name = snapshots [s_id ].name ,
2285+ identifier = None ,
2286+ version = snapshots [s_id ].version ,
2287+ dev_version = None ,
2288+ intervals = [],
2289+ dev_intervals = [],
2290+ pending_restatement_intervals = [interval ],
2291+ )
2292+ for s_id , interval in auto_restated_intervals_per_snapshot .items ()
2293+ if s_id in snapshots
2294+ ],
2295+ auto_restatement_triggers ,
2296+ )
22762297
22772298
22782299def parent_snapshots_by_name (
0 commit comments