Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions examples/voice_agents/async_tool_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
AgentSession,
JobContext,
RunContext,
ToolExecutionUpdatedEvent,
cli,
inference,
llm,
)
from livekit.agents.utils import aio
from livekit.plugins import silero

logger = logging.getLogger("async-travel-helper")
Expand Down Expand Up @@ -119,6 +121,9 @@ async def book_flight(self, ctx: RunContext, origin: str, destination: str, date
)
self._user_email = email.email_address
logger.info(f"User's email address: {self._user_email}")
await ctx.update(
"Thanks for providing your email address, we are confirming the booking now."
)

await asyncio.sleep(40)

Expand Down Expand Up @@ -213,6 +218,8 @@ async def _summarize(

@server.rtc_session()
async def entrypoint(ctx: JobContext):
await ctx.connect()

session = AgentSession(
stt=inference.STT("deepgram/nova-3"),
# llm=inference.LLM("openai/gpt-5.3-chat-latest"),
Expand All @@ -223,6 +230,25 @@ async def entrypoint(ctx: JobContext):
turn_handling={"interruption": {"mode": "vad"}},
)

# stream tool execution (calls, ctx.update progress, reply lifecycle) to the frontend
status_ch = aio.Chan[ToolExecutionUpdatedEvent]()

@session.on("tool_execution_updated")
def _on_tool_status(ev: ToolExecutionUpdatedEvent) -> None:
    """Queue a tool-execution event for the background publisher.

    Session event handlers are synchronous, so this only enqueues the event on
    ``status_ch``; the actual network send happens in ``_publish_status``.

    Args:
        ev: The tool lifecycle event emitted by the session.
    """
    try:
        status_ch.send_nowait(ev)
    except aio.ChanClosed:
        # The shutdown callback has already closed the channel. Events that
        # arrive afterwards (e.g. tool calls cancelled during teardown) have
        # nowhere to go, so drop them instead of raising inside the handler.
        pass

async def _publish_status() -> None:
    """Send queued tool-execution events to the room on the ``tool_status`` topic.

    Consumes ``status_ch`` and publishes each event as its JSON dump through the
    local participant. A failed publish is logged and skipped so that one bad
    send does not stop every later status update from being delivered.
    """
    async for ev in status_ch:
        try:
            await ctx.room.local_participant.publish_data(
                ev.model_dump_json(), topic="tool_status"
            )
        except Exception:
            # Task cancellation is a BaseException and still propagates, so the
            # shutdown path is unaffected by this handler.
            logger.exception("failed to publish tool status update")

publish_task = asyncio.create_task(_publish_status())

async def _close_status() -> None:
    """Shutdown hook for the tool-status stream.

    Closes the status channel first, then cancels the publisher task and waits
    for it to finish.
    """
    status_ch.close()
    await aio.cancel_and_wait(publish_task)

ctx.add_shutdown_callback(_close_status)

await session.start(agent=TravelAgent(), room=ctx.room)


Expand Down
10 changes: 10 additions & 0 deletions livekit-agents/livekit/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@
RunContext,
SessionUsageUpdatedEvent,
SpeechCreatedEvent,
ToolCallEnded,
ToolCallStarted,
ToolCallUpdated,
ToolExecutionUpdatedEvent,
ToolReplyUpdated,
UserInputTranscribedEvent,
UserStateChangedEvent,
UserTurnExceededEvent,
Expand Down Expand Up @@ -181,6 +186,11 @@ def __getattr__(name: str) -> typing.Any:
"UserInputTranscribedEvent",
"UserStateChangedEvent",
"SpeechCreatedEvent",
"ToolExecutionUpdatedEvent",
"ToolCallStarted",
"ToolCallUpdated",
"ToolCallEnded",
"ToolReplyUpdated",
"MetricsCollectedEvent",
"SessionUsageUpdatedEvent",
"FunctionToolsExecutedEvent",
Expand Down
10 changes: 10 additions & 0 deletions livekit-agents/livekit/agents/voice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
RunContext,
SessionUsageUpdatedEvent,
SpeechCreatedEvent,
ToolCallEnded,
ToolCallStarted,
ToolCallUpdated,
ToolExecutionUpdatedEvent,
ToolReplyUpdated,
UserInputTranscribedEvent,
UserStateChangedEvent,
UserTurnExceededEvent,
Expand Down Expand Up @@ -50,6 +55,11 @@
"FunctionToolsExecutedEvent",
"AgentFalseInterruptionEvent",
"RemoteSession",
"ToolExecutionUpdatedEvent",
"ToolCallStarted",
"ToolCallUpdated",
"ToolCallEnded",
"ToolReplyUpdated",
"UserTurnExceededEvent",
"TranscriptSynchronizer",
"io",
Expand Down
72 changes: 71 additions & 1 deletion livekit-agents/livekit/agents/voice/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ async def update(
for s in self._filler_schedulers:
s.reset_dwell()

# events carry the raw message, before the LLM-facing template wraps it
raw_message = message if isinstance(message, str) else str(message)

if isinstance(message, str):
if template is None:
if self._executor is not None:
Expand Down Expand Up @@ -213,7 +216,18 @@ async def update(
self._updates.append(pair)

if self._executor is None:
return # standalone — nothing else to do
return # standalone — no executor, so no tool lifecycle to report

self._session.emit(
"tool_execution_updated",
ToolExecutionUpdatedEvent(
update=ToolCallUpdated(
id=pair[0].call_id,
call_id=self.function_call.call_id,
message=raw_message,
)
),
)

assert self._first_update_fut is not None
if not self._first_update_fut.done():
Expand Down Expand Up @@ -288,6 +302,7 @@ def _make_update_pair(
"metrics_collected",
"session_usage_updated",
"speech_created",
"tool_execution_updated",
"error",
"close",
"debug_message",
Expand Down Expand Up @@ -420,6 +435,60 @@ class SpeechCreatedEvent(BaseModel):
created_at: float = Field(default_factory=time.time)


class ToolCallStarted(BaseModel):
    """Signals that a function tool call has been dispatched."""

    # tag used to discriminate the ``update`` union of ToolExecutionUpdatedEvent
    type: Literal["tool_call_started"] = "tool_call_started"
    # the function call that was dispatched
    function_call: FunctionCall


class ToolCallUpdated(BaseModel):
    """Progress report sent through ``ctx.update()`` while a tool call is still running."""

    # tag used to discriminate the ``update`` union of ToolExecutionUpdatedEvent
    type: Literal["tool_call_updated"] = "tool_call_updated"
    id: str
    """Entry id: ``call_id`` inline, ``{call_id}_update_N`` when deferred."""
    # call_id of the function call this update belongs to
    call_id: str
    # text of the progress update
    message: str


class ToolCallEnded(BaseModel):
    """The single terminal entry of a tool call."""

    # tag used to discriminate the ``update`` union of ToolExecutionUpdatedEvent
    type: Literal["tool_call_ended"] = "tool_call_ended"
    id: str
    """Entry id: ``call_id`` inline, ``{call_id}_final`` when deferred."""
    # call_id of the function call that ended
    call_id: str
    message: str | None = None
    """Result or error text; None when there is nothing to voice."""
    # how the call terminated
    status: Literal["done", "error", "cancelled"]


class ToolReplyUpdated(BaseModel):
    """Lifecycle of the deferred reply that voices buffered tool updates.

    The reply is ``scheduled`` when queued and then ends as ``completed``,
    ``interrupted`` or ``skipped``. A single reply may cover several calls; an
    inline first update never gets one.
    """

    # tag used to discriminate the ``update`` union of ToolExecutionUpdatedEvent
    type: Literal["tool_reply_updated"] = "tool_reply_updated"
    update_ids: list[str]
    """``ToolCallUpdated.id`` values this reply covers."""
    # current lifecycle stage of the reply
    status: Literal["scheduled", "completed", "interrupted", "skipped"]
    speech_id: str
    """Id of the reply speech; ``speech_created`` carries its handle."""


class ToolExecutionUpdatedEvent(BaseModel):
    """A single, flat tool-lifecycle update.

    Discriminate on ``update.type``: ``tool_call_started`` → ``tool_call_updated``
    → ``tool_call_ended`` → ``tool_reply_updated``.
    """

    type: Literal["tool_execution_updated"] = "tool_execution_updated"
    # tagged union: the concrete model is selected by the nested ``type`` field
    update: Annotated[
        ToolCallStarted | ToolCallUpdated | ToolCallEnded | ToolReplyUpdated,
        Field(discriminator="type"),
    ]
    # defaults to the wall-clock time (``time.time()``) at which the event object is built
    created_at: float = Field(default_factory=time.time)


class UserTurnExceededEvent(BaseModel):
type: Literal["user_turn_exceeded"] = "user_turn_exceeded"
transcript: str
Expand Down Expand Up @@ -484,6 +553,7 @@ class CloseEvent(BaseModel):
| ConversationItemAddedEvent
| FunctionToolsExecutedEvent
| SpeechCreatedEvent
| ToolExecutionUpdatedEvent
| ErrorEvent
| CloseEvent
| OverlappingSpeechEvent,
Expand Down
69 changes: 69 additions & 0 deletions livekit-agents/livekit/agents/voice/remote_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
ErrorEvent,
FunctionToolsExecutedEvent,
SessionUsageUpdatedEvent,
ToolCallEnded,
ToolCallStarted,
ToolCallUpdated,
ToolExecutionUpdatedEvent,
ToolReplyUpdated,
UserInputTranscribedEvent,
UserState,
UserStateChangedEvent,
Expand Down Expand Up @@ -249,6 +254,19 @@ async def __anext__(self) -> agent_pb.AgentSessionMessage:
"e2e_latency",
)

# Translates the ``ToolCallEnded.status`` literals into their protobuf
# ``ToolCallStatus`` enum values when events are forwarded to the remote side.
_TOOL_CALL_STATUS_MAP: dict[str, agent_pb.ToolCallStatus] = {
    "done": agent_pb.TC_DONE,
    "error": agent_pb.TC_ERROR,
    "cancelled": agent_pb.TC_CANCELLED,
}

# Translates the ``ToolReplyUpdated.status`` literals into their protobuf
# ``ToolReplyStatus`` enum values when events are forwarded to the remote side.
_TOOL_REPLY_STATUS_MAP: dict[str, agent_pb.ToolReplyStatus] = {
    "scheduled": agent_pb.TR_SCHEDULED,
    "completed": agent_pb.TR_COMPLETED,
    "interrupted": agent_pb.TR_INTERRUPTED,
    "skipped": agent_pb.TR_SKIPPED,
}

_AMD_CATEGORY_MAP: dict[AMDCategory, agent_pb.AmdCategory] = {
AMDCategory.HUMAN: agent_pb.AmdCategory.AMD_HUMAN,
AMDCategory.MACHINE_IVR: agent_pb.AmdCategory.AMD_MACHINE_IVR,
Expand Down Expand Up @@ -370,6 +388,7 @@ def register_session(self, session: AgentSession) -> None:
session.on("conversation_item_added", self._on_conversation_item_added)
session.on("user_input_transcribed", self._on_user_input_transcribed)
session.on("function_tools_executed", self._on_function_tools_executed)
session.on("tool_execution_updated", self._on_tool_execution_updated)
session.on("session_usage_updated", self._on_session_usage_updated)
session.on("overlapping_speech", self._on_overlapping_speech)
session.on("error", self._on_error)
Expand All @@ -394,6 +413,7 @@ async def aclose(self) -> None:
self._session.off("conversation_item_added", self._on_conversation_item_added)
self._session.off("user_input_transcribed", self._on_user_input_transcribed)
self._session.off("function_tools_executed", self._on_function_tools_executed)
self._session.off("tool_execution_updated", self._on_tool_execution_updated)
self._session.off("session_usage_updated", self._on_session_usage_updated)
self._session.off("overlapping_speech", self._on_overlapping_speech)
self._session.off("error", self._on_error)
Expand Down Expand Up @@ -513,6 +533,54 @@ def _on_function_tools_executed(self, event: FunctionToolsExecutedEvent) -> None
)
)

def _on_tool_execution_updated(self, event: ToolExecutionUpdatedEvent) -> None:
    """Convert a tool-execution event to its protobuf form and send it.

    The concrete type of ``event.update`` selects which field of the
    ``ToolExecutionUpdated`` message is populated (``started``, ``call_updated``,
    ``ended`` or ``reply_updated``). An update of any other type is ignored.

    Args:
        event: The tool lifecycle event emitted by the session.
    """
    tool_pb = agent_pb.AgentSessionEvent.ToolExecutionUpdated
    update = event.update
    payload: agent_pb.AgentSessionEvent.ToolExecutionUpdated

    if isinstance(update, ToolCallStarted):
        call = update.function_call
        function_call = agent_pb.FunctionCall(
            id=call.id,
            call_id=call.call_id,
            name=call.name,
            arguments=call.arguments,
        )
        payload = tool_pb(started=tool_pb.Started(function_call=function_call))
    elif isinstance(update, ToolCallUpdated):
        call_updated = tool_pb.CallUpdated(
            id=update.id,
            call_id=update.call_id,
            message=update.message,
        )
        payload = tool_pb(call_updated=call_updated)
    elif isinstance(update, ToolCallEnded):
        ended = tool_pb.Ended(
            id=update.id,
            call_id=update.call_id,
            status=_TOOL_CALL_STATUS_MAP[update.status],
        )
        # only set the message field when there actually is a message
        if update.message is not None:
            ended.message = update.message
        payload = tool_pb(ended=ended)
    elif isinstance(update, ToolReplyUpdated):
        reply_updated = tool_pb.ReplyUpdated(
            update_ids=update.update_ids,
            status=_TOOL_REPLY_STATUS_MAP[update.status],
            speech_id=update.speech_id,
        )
        payload = tool_pb(reply_updated=reply_updated)
    else:
        # unknown update variant: nothing to forward
        return

    self._send_event(
        agent_pb.AgentSessionEvent(tool_execution_updated=payload),
        created_at=event.created_at,
    )

def _on_overlapping_speech(self, event: OverlappingSpeechEvent) -> None:
detected_at = Timestamp()
detected_at.FromNanoseconds(int(event.detected_at * 1e9))
Expand Down Expand Up @@ -903,6 +971,7 @@ def _session_usage_to_proto(usage: AgentSessionUsage) -> agent_pb.AgentSessionUs
"conversation_item_added",
"user_input_transcribed",
"function_tools_executed",
"tool_execution_updated",
"session_usage_updated",
"error",
]
Expand Down
Loading