forked from livekit/agents
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpush_to_talk.py
More file actions
93 lines (71 loc) · 3.26 KB
/
push_to_talk.py
File metadata and controls
93 lines (71 loc) · 3.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import asyncio
import logging
from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import Agent, AgentServer, AgentSession, JobContext, JobRequest, cli, inference
from livekit.agents.llm import ChatContext, ChatMessage, StopResponse
# module-level logger for this example, named "push-to-talk", with its level set to INFO
logger = logging.getLogger("push-to-talk")
logger.setLevel(logging.INFO)
# runs at import time; presumably loads provider credentials from a local .env file — confirm
load_dotenv()
## This example demonstrates how to use push-to-talk in multi-participant
## conversations with a voice agent.
## It disables audio input by default and only enables it when a client explicitly
## triggers the `start_turn` RPC method.
class MyAgent(Agent):
    """Agent used by the push-to-talk example.

    Configures the STT, LLM and TTS models and refuses to reply to user
    turns that contain no text.
    """

    def __init__(self) -> None:
        """Build the agent with its instructions and inference models."""
        speech_to_text = inference.STT("deepgram/nova-3")
        language_model = inference.LLM("google/gemini-2.5-flash")
        text_to_speech = inference.TTS("cartesia/sonic-3")
        super().__init__(
            instructions="You are a helpful assistant.",
            stt=speech_to_text,
            llm=language_model,
            tts=text_to_speech,
        )

    async def on_user_turn_completed(self, turn_ctx: ChatContext, new_message: ChatMessage) -> None:
        """Callback run after a user turn is committed, before a reply is generated.

        Raises:
            StopResponse: when ``new_message`` has no text content, which
                stops the agent from generating a reply for that turn.
        """
        if new_message.text_content:
            # the turn carries text: nothing to veto, return normally
            return

        logger.info("ignore empty user turn")
        raise StopResponse()
async def handle_request(request: JobRequest) -> None:
    """Accept the job request with identity ``ptt-agent`` and a push-to-talk attribute."""
    # this attribute communicates to frontend that we support PTT
    ptt_attributes = {"push-to-talk": "1"}
    await request.accept(identity="ptt-agent", attributes=ptt_attributes)
# server instance that `entrypoint` is registered on via `rtc_session`; passed to `cli.run_app` in the `__main__` guard
server = AgentServer()
@server.rtc_session(on_request=handle_request)
async def entrypoint(ctx: JobContext):
    """Start a manual-turn agent session and register the push-to-talk RPC methods.

    Audio input is disabled right after the session starts. The
    ``start_turn``, ``end_turn`` and ``cancel_turn`` RPC methods registered
    on the room's local participant then enable and disable it per turn.
    """
    session = AgentSession(turn_detection="manual")
    await session.start(agent=MyAgent(), room=ctx.room)

    # input audio stays off until a client triggers `start_turn`
    session.input.set_audio_enabled(False)

    participant = ctx.room.local_participant

    @participant.register_rpc_method("start_turn")
    async def start_turn(data: rtc.RpcInvocationData):
        """Interrupt the session, reset the user turn and start listening to the caller."""
        session.interrupt()
        session.clear_user_turn()

        # with several users in the room, listen to whoever made the call
        session.room_io.set_participant(data.caller_identity)
        session.input.set_audio_enabled(True)

    @participant.register_rpc_method("end_turn")
    async def end_turn(data: rtc.RpcInvocationData):
        """Disable audio input, commit the user turn and log the transcript."""
        session.input.set_audio_enabled(False)
        try:
            transcript = await session.commit_user_turn(
                # how long to wait for the final transcript once the turn is
                # committed; increase it if the STT is slow to respond
                transcript_timeout=5.0,
                # duration of the silence appended to the STT input so that it
                # generates the final transcript
                stt_flush_duration=2.0,
            )
            logger.info(f"user transcript: {transcript}")
        except asyncio.CancelledError:
            logger.info("commit user turn cancelled")
        except Exception as exc:
            logger.error("error committing user turn", exc_info=exc)

    @participant.register_rpc_method("cancel_turn")
    async def cancel_turn(data: rtc.RpcInvocationData):
        """Disable audio input and discard the current user turn."""
        session.input.set_audio_enabled(False)
        session.clear_user_turn()
        logger.info("cancel turn")
# only hand the server to the CLI runner when this file is executed as a script
if __name__ == "__main__":
    cli.run_app(server)