Skip to content

Commit 51cbdbd

Browse files
fix: time travel when going back to interrupt node (#7498)
# Fix: Create fork checkpoint on subgraph time travel ## Problem When time-traveling to a subgraph checkpoint that has an interrupt, and then resuming, the resume would load the **wrong state** — it would pick up the original execution's latest checkpoint instead of the time-traveled one. This happened because replaying from a subgraph checkpoint never created a new parent checkpoint. If the replay hit an interrupt before `after_tick()` ran, no checkpoint was written at all, so the parent's "latest" checkpoint was still the old one from the original execution. ## Fix When the loop detects a time-travel replay (not an `update_state` fork), it now **eagerly writes a fork checkpoint** at the start of the tick. This ensures: 1. The parent thread's latest checkpoint points to the replayed state 2. Subsequent `Command(resume=...)` calls find the correct checkpoint 3. Stale `INTERRUPT` pending writes from the old checkpoint are cleared (they reference old task IDs) Additionally, the subgraph replay logic now uses the **parent checkpoint ID** (from `prev_checkpoint_config`) when resolving subgraph checkpoints during time-travel, matching the existing behavior for `update_state` forks. ## Checkpoint flow diagrams ### Before fix: time travel leaves no fork ``` Original execution: C0 (start) --> C1 (step_a) --> C2 (ask_1 interrupt) --> C3 (resume) --> C4 (ask_2 interrupt) --> C5 (done) Time travel to C2 (subgraph config): Replay runs... hits interrupt... no new checkpoint written. Parent "latest" is still C5. Command(resume="new_answer"): Loads C5 (wrong!) instead of the replayed C2 state. ``` ### After fix: time travel creates a fork ``` Original execution: C0 --> C1 --> C2 --> C3 --> C4 --> C5 (done) Time travel to C2 (subgraph config): C0 --> C1 --> C2 --> C3 --> C4 --> C5 \ F1 (fork, source="fork") <-- new latest Command(resume="new_answer"): Loads F1 (correct!) --> resumes from the right state. After full resume: C0 --> C1 --> C2 --> C3 --> C4 --> C5 \ F1 --> F2 (ask_1 result) --> F3 (ask_2 interrupt) --> F4 (done) ``` ### Manual fork via `update_state` (unchanged) ``` C0 --> C1 --> C2 --> C3 \ U1 (source="update") <-- created by update_state() This path already worked. The fix skips update/fork sources so existing behavior is preserved. ``` ## Changes - **`libs/langgraph/langgraph/pregel/_loop.py`**: - Extract `is_time_traveling` flag from the existing replay detection logic for reuse - Write a fork checkpoint (`source="fork"`) eagerly at the start of a time-travel tick, before execution begins - Clear stale `INTERRUPT` pending writes when creating the fork (they reference old task IDs that won't match the new checkpoint) - Unify subgraph replay ID resolution: check `source in ("update", "fork")` instead of a separate `is_time_traveling` condition, since the new fork checkpoint now has `source="fork"` - **`libs/langgraph/tests/test_time_travel.py`** and **`test_time_travel_async.py`**: Added 4 new test cases (sync + async): - `test_replay_from_before_interrupt_then_resume` — replays from a checkpoint before an interrupt, resumes with a new answer, and verifies the full checkpoint history (source, next, values) at each stage - `test_subgraph_time_travel_resume_from_first_interrupt` — time-travels to a subgraph's first interrupt, resumes both interrupts with new answers, and verifies the fork creates a new branch while preserving the original - `test_subgraph_time_travel_resume_from_second_interrupt` — time-travels to a subgraph's second interrupt, resumes with a new answer, and verifies the first interrupt's original answer is preserved - `test_subgraph_time_travel_checkpoint_pattern` — verifies the fork checkpoint branches from the correct replay point and that the full checkpoint tree is correct after resume - **`libs/langgraph/tests/test_pregel.py`** / **`test_pregel_async.py`**: Updated existing `test_weather_subgraph_state` to account for the new fork checkpoint appearing in history (history length increases by 1)
1 parent 4d64227 commit 51cbdbd

5 files changed

Lines changed: 1171 additions & 24 deletions

File tree

libs/langgraph/langgraph/pregel/_loop.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,7 @@ def _first(
692692
# writes so that interrupt() calls re-fire instead of returning
693693
# stale values. But if we're actively resuming, keep them —
694694
# multi-interrupt scenarios need previously resolved values preserved.
695-
if self.is_replaying and (
695+
is_time_traveling = self.is_replaying and (
696696
# Time-travel to a subgraph checkpoint: the parent sets
697697
# RESUMING=True (it can't distinguish time-travel from resume),
698698
# so we check if this subgraph's own ns is in checkpoint_map.
@@ -710,7 +710,8 @@ def _first(
710710
# (subgraph input is a Send arg, not a Command)
711711
or configurable.get(CONFIG_KEY_RESUMING, False)
712712
)
713-
):
713+
)
714+
if is_time_traveling:
714715
self.checkpoint_pending_writes = [
715716
w for w in self.checkpoint_pending_writes if w[1] != RESUME
716717
]
@@ -765,6 +766,26 @@ def _first(
765766
if k in self.checkpoint["channel_versions"]:
766767
version = self.checkpoint["channel_versions"][k]
767768
self.checkpoint["versions_seen"][INTERRUPT][k] = version
769+
# When time-traveling (replaying from a specific checkpoint),
770+
# save a fork checkpoint so the replayed execution creates a
771+
# new branch. Without this, if the execution hits an interrupt
772+
# before after_tick() runs, no new checkpoint is created —
773+
# the parent's latest checkpoint remains the old one and
774+
# subsequent resumes load the wrong state.
775+
# Skip for update_state forks (source=update/fork) since they
776+
# already have their own fork checkpoint.
777+
if is_time_traveling and self.checkpoint_metadata.get("source") not in (
778+
"update",
779+
"fork",
780+
):
781+
# Clear old INTERRUPT writes from the loaded checkpoint.
782+
# The fork will have a new checkpoint_id which changes
783+
# task IDs — stale interrupt writes would accumulate and
784+
# confuse the multiple-interrupt check in future resumes.
785+
self.checkpoint_pending_writes = [
786+
w for w in self.checkpoint_pending_writes if w[1] != INTERRUPT
787+
]
788+
self._put_checkpoint({"source": "fork"})
768789
# produce values output
769790
self._emit(
770791
"values", map_output_values, self.output_keys, True, self.channels
@@ -807,14 +828,18 @@ def _first(
807828
if not self.is_nested:
808829
# Pass the resolved before-bound checkpoint ID so subgraphs can
809830
# find their corresponding checkpoint without re-fetching the
810-
# parent. For forks (source=update), use the fork's parent
831+
# parent. For forks (source=update/fork), use the fork's parent
811832
# checkpoint ID since the fork was created after the subgraph's
812833
# checkpoints from the original execution.
813834
replay_state: ReplayState | None = None
814835
if self.is_replaying:
815836
replay_checkpoint_id = self.checkpoint["id"]
816837
if (
817-
self.checkpoint_metadata.get("source") == "update"
838+
self.checkpoint_metadata.get("source")
839+
in (
840+
"update",
841+
"fork",
842+
)
818843
and self.prev_checkpoint_config
819844
):
820845
replay_checkpoint_id = self.prev_checkpoint_config[CONF].get(

libs/langgraph/tests/test_pregel.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -615,16 +615,19 @@ def _edge(st: MyState) -> Literal["__end__", "node_one", "node_two"]:
615615
)
616616
]
617617

618-
assert len(new_history) == len(history) + 1
619-
for original, new in zip(history, new_history[1:]):
618+
# +2: one fork checkpoint from time travel, one from the new execution
619+
assert len(new_history) == len(history) + 2
620+
# new_history[0] is the new execution result, new_history[1] is the fork
621+
assert new_history[1].metadata["source"] == "fork"
622+
for original, new in zip(history, new_history[2:]):
620623
assert original.values == new.values
621624
assert original.next == new.next
622625
assert original.metadata["step"] == new.metadata["step"]
623626

624627
def _get_tasks(hist: list, start: int):
625628
return [h.tasks for h in hist[start:]]
626629

627-
assert _get_tasks(new_history, 1) == _get_tasks(history, 0)
630+
assert _get_tasks(new_history, 2) == _get_tasks(history, 0)
628631

629632

630633
def test_batch_two_processes_in_out() -> None:

libs/langgraph/tests/test_pregel_async.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2086,16 +2086,19 @@ def _edge(st: MyState) -> Literal["__end__", "node_one", "node_two"]:
20862086
)
20872087
]
20882088

2089-
assert len(new_history) == len(history) + 1
2090-
for original, new in zip(history, new_history[1:]):
2089+
# +2: one fork checkpoint from time travel, one from the new execution
2090+
assert len(new_history) == len(history) + 2
2091+
# new_history[0] is the new execution result, new_history[1] is the fork
2092+
assert new_history[1].metadata["source"] == "fork"
2093+
for original, new in zip(history, new_history[2:]):
20912094
assert original.values == new.values
20922095
assert original.next == new.next
20932096
assert original.metadata["step"] == new.metadata["step"]
20942097

20952098
def _get_tasks(hist: list, start: int):
20962099
return [h.tasks for h in hist[start:]]
20972100

2098-
assert _get_tasks(new_history, 1) == _get_tasks(history, 0)
2101+
assert _get_tasks(new_history, 2) == _get_tasks(history, 0)
20992102

21002103

21012104
async def test_cond_edge_after_send() -> None:

0 commit comments

Comments
 (0)