Replies: 2 comments
Hi @jinhu2659! I'm Dosu and I'm helping the llama_index team.

LlamaIndex does not automatically propagate cancellation from a parent workflow to nested sub-workflows. When you call cancel_run() on the parent's handle, the sub-workflow keeps running unless you cancel it as well.

Recommended approach:

@step
async def parent_step(self, ev: StartEvent) -> StopEvent:
    print("SupWorkflow: Starting parent step")
    # Keep the handle on the instance so it can be cancelled from outside the step
    self.sub_handle = self.sub_workflow.run()
    # Optionally, stream events from the sub-workflow here
    result = await self.sub_handle
    print(f"SupWorkflow: SubWorkflow result: {result}")
    return StopEvent(result="Parent workflow done")

Then, in your cancellation logic, make sure to cancel both handles:

parent_handle = parent_workflow.run()
await asyncio.sleep(3)  # by now parent_step has started the sub-workflow
# Reuse the handle of the running sub-workflow; calling run() again would start a second, unrelated run
sub_handle = parent_workflow.sub_handle
print("Attempting to cancel parent workflow and sub-workflow")
await parent_handle.cancel_run()
await sub_handle.cancel_run()

Alternatively, you can create an async task that monitors the parent workflow's cancellation and propagates it to the sub-workflow [source].

Note: For more details and examples, see the discussion here and this issue.
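To make the "monitoring task" idea above concrete, here is a minimal sketch in plain asyncio. It is not LlamaIndex code: `worker` and `propagate_cancel` are hypothetical names, and `Task.cancel()` stands in for calling `cancel_run()` on a workflow handle. The point is only the shape of the pattern: a small watcher waits for the parent to finish (or be cancelled) and then stops the child.

```python
import asyncio


async def worker(name: str, log: list) -> None:
    """Stand-in for a long-running workflow run (hypothetical)."""
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append(f"{name} cancelled")
        raise


async def propagate_cancel(parent_task: asyncio.Task, child_task: asyncio.Task) -> None:
    """Watcher: once the parent is done (including cancelled), cancel the child."""
    await asyncio.wait([parent_task])  # returns without raising when the parent is cancelled
    if not child_task.done():
        child_task.cancel()


async def main() -> list:
    log = []
    child_task = asyncio.create_task(worker("child", log))
    parent_task = asyncio.create_task(worker("parent", log))
    watcher = asyncio.create_task(propagate_cancel(parent_task, child_task))

    await asyncio.sleep(0.05)
    parent_task.cancel()  # only the parent is cancelled explicitly
    await watcher
    # Collect both tasks; return_exceptions keeps CancelledError from propagating here
    await asyncio.gather(parent_task, child_task, return_exceptions=True)
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With real workflows, the watcher would presumably await the parent handle and then call `cancel_run()` on the stored sub-workflow handle; that adaptation is an assumption and untested here.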
LlamaIndex doesn't propagate cancellation to nested workflows automatically, but you can implement this yourself. Here are two approaches:

Approach 1: Track and cancel sub-workflow handles

Store the sub-workflow handle and cancel it explicitly when the parent is cancelled:

from llama_index.core.workflow import Workflow, step, StartEvent, StopEvent
import asyncio

class SubWorkflow(Workflow):
    @step
    async def do_work(self, ev: StartEvent) -> StopEvent:
        for i in range(100):
            await asyncio.sleep(1)  # simulate work
            print(f"Sub-workflow step {i}")
        return StopEvent(result="done")

class SupWorkflow(Workflow):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.sub_workflow = SubWorkflow()
        self._sub_handle = None

    @step
    async def run_sub(self, ev: StartEvent) -> StopEvent:
        self._sub_handle = self.sub_workflow.run()
        result = await self._sub_handle
        return StopEvent(result=result)

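    async def cancel(self):
        """Cancel the sub-workflow; cancel the parent separately via cancel_run() on its handle."""
        if self._sub_handle is not None:
            # cancel_run() is the handle method used earlier in this thread
            await self._sub_handle.cancel_run()
        # There is no parent-class cancel() to delegate to here (to my knowledge),
        # so the caller still has to call cancel_run() on the parent's handle.

Approach 2: Use a shared cancellation event

More robust for deeply nested workflows:

import asyncio
from llama_index.core.workflow import Workflow, step, StartEvent, StopEvent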
from contextvars import ContextVar
cancel_event: ContextVar[asyncio.Event] = ContextVar("cancel_event")

class SubWorkflow(Workflow):
    @step
    async def do_work(self, ev: StartEvent) -> StopEvent:
        cancel = cancel_event.get()
        for i in range(100):
            if cancel.is_set():
                print("Sub-workflow cancelled")
                return StopEvent(result="cancelled")
            await asyncio.sleep(1)
        return StopEvent(result="done")

class SupWorkflow(Workflow):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.sub_workflow = SubWorkflow()
        self._cancel_event = asyncio.Event()

    @step
    async def run_sub(self, ev: StartEvent) -> StopEvent:
        token = cancel_event.set(self._cancel_event)
        try:
            handler = self.sub_workflow.run()
            result = await handler
            return StopEvent(result=result)
        finally:
            cancel_event.reset(token)

    async def cancel(self):
        self._cancel_event.set()  # signals all sub-workflows

Approach 2 scales better when you have multiple levels of nesting.

Hope this helps!
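For anyone wondering why the shared event reaches deeper levels without being passed around: asyncio tasks copy the current context when they are created, so a ContextVar set before a task is spawned is visible inside it and inside anything that task spawns. Below is a small sketch in plain asyncio only (no LlamaIndex); `leaf`, `middle` and `main` are hypothetical names used purely for illustration.

```python
import asyncio
from contextvars import ContextVar

cancel_event: ContextVar[asyncio.Event] = ContextVar("cancel_event")


async def leaf(log: list) -> str:
    """Innermost level: polls the shared event between units of work."""
    cancel = cancel_event.get()  # inherited from the context of the creating task
    for _ in range(100):
        if cancel.is_set():
            log.append("leaf stopped")
            return "cancelled"
        await asyncio.sleep(0.01)
    return "done"


async def middle(log: list) -> str:
    """Intermediate level: spawns the leaf without passing the event explicitly."""
    result = await asyncio.create_task(leaf(log))
    log.append("middle stopped")
    return result


async def main():
    log = []
    event = asyncio.Event()
    token = cancel_event.set(event)  # set before any task is created
    try:
        top = asyncio.create_task(middle(log))
        await asyncio.sleep(0.03)
        event.set()  # one signal, observed two levels down
        result = await top
    finally:
        cancel_event.reset(token)
    return result, log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that this is cooperative: the innermost loop only stops at its next check of the event, so long blocking sections between checks will delay the shutdown.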
Context:
I have a parent workflow (SupWorkflow) that initializes a sub-workflow (SubWorkflow) in its __init__ method. The sub-workflow is executed as part of a step in the parent workflow. When I call cancel_run() on the handle of the parent workflow, the sub-workflow continues running, which is unexpected. I would like guidance on how to properly cancel both the parent and nested workflows to ensure the sub-workflow stops execution as well.

Steps to Reproduce:
1. Create a parent workflow (SupWorkflow) that initializes a sub-workflow (SubWorkflow) in its __init__ method.
2. Run the parent workflow and call cancel_run() on its handle.

Expected Behavior:
When cancel_run() is called on the parent workflow's handle, both the parent workflow (SupWorkflow) and the nested sub-workflow (SubWorkflow) should stop execution immediately.

Actual Behavior:
The parent workflow stops, but the sub-workflow continues running, as evidenced by the continued printing of "SubWorkflow: Processing ..." messages after cancellation.

Question:
How can I ensure that calling cancel_run() on the parent workflow also cancels the nested sub-workflow?