1+ import asyncio
2+ import logging
3+
4+ from aiortc .rtcdatachannel import RTCDataChannel
5+ from aiortc .sdp import candidate_from_sdp
6+ from aiortc import RTCPeerConnection , RTCSessionDescription
7+
8+ from janus_client import (
9+ JanusSession ,
10+ JanusPlugin ,
11+ )
12+ from janus_client .message_transaction import is_subset
13+
14+ logger = logging .getLogger (__name__ )
15+
16+
17+ class JanusTextRoomPlugin (JanusPlugin ):
18+ """Janus TextRoom plugin implementation"""
19+
20+ name = "janus.plugin.textroom"
21+ data_channel : RTCDataChannel
22+
23+ async def on_receive (self , response : dict ):
24+ print (f"Received message: { response } " )
25+
26+ if "janus" not in response :
27+ print ("Unexpected response" )
28+
29+ if is_subset (response , {"janus" : "trickle" , "candidate" : None }):
30+ candidate_data = response ["candidate" ]
31+
32+ if is_subset (candidate_data , {"completed" : True }):
33+ # await self._pc.addIceCandidate()
34+ print ("Trickle done" )
35+ return
36+
37+ if not is_subset (
38+ candidate_data , {"sdpMLineIndex" : None , "candidate" : None }
39+ ):
40+ raise Exception ("Invalid candidate data" )
41+
42+ print (self ._pc .iceConnectionState )
43+ print (self ._pc .iceGatheringState )
44+
45+ iceCandidate = candidate_from_sdp (
46+ response ["candidate" ]["candidate" ].split (":" , 1 )[1 ]
47+ )
48+ iceCandidate .sdpMid = str (response ["candidate" ]["sdpMLineIndex" ])
49+ print (iceCandidate )
50+
51+ await self ._pc .addIceCandidate (iceCandidate )
52+
53+ async def send_wrapper (self , message : dict , matcher : dict ) -> dict :
54+ def function_matcher (response : dict ):
55+ return (
56+ is_subset (
57+ response ,
58+ {
59+ "janus" : "success" ,
60+ "plugindata" : {
61+ "plugin" : self .name ,
62+ "data" : matcher ,
63+ },
64+ },
65+ )
66+ or is_subset (
67+ response ,
68+ {
69+ "janus" : "success" ,
70+ "plugindata" : {
71+ "plugin" : self .name ,
72+ "data" : {
73+ "textroom" : "event" ,
74+ },
75+ },
76+ },
77+ )
78+ or is_subset (response , {"janus" : "error" , "error" : {}})
79+ )
80+
81+ message_transaction = await self .send (
82+ message = {
83+ "janus" : "message" ,
84+ "body" : message ,
85+ },
86+ )
87+ message_response = await message_transaction .get (
88+ matcher = function_matcher , timeout = 15
89+ )
90+ await message_transaction .done ()
91+
92+ if is_subset (message_response , {"janus" : "error" , "error" : {}}):
93+ raise Exception (f"Janus error: { message_response } " )
94+
95+ async def list (
96+ self ,
97+ ) -> dict :
98+ """List available rooms."""
99+
100+ return await self .send_wrapper (
101+ message = {
102+ "request" : "list" ,
103+ },
104+ matcher = {
105+ "textroom" : "success" ,
106+ "list" : [],
107+ },
108+ )
109+
110+ async def get_participants_list (self , room : int ):
111+ """List participants in a specific room"""
112+
113+ return await self .send_wrapper (
114+ message = {
115+ "request" : "listparticipants" ,
116+ "room" : room ,
117+ },
118+ matcher = {
119+ "room" : room ,
120+ "participants" : [],
121+ },
122+ )
123+
124+ async def join_room (self , room : int ):
125+ return await self .send_wrapper (
126+ message = {
127+ "request" : "list" ,
128+ "textroom" : "join" ,
129+ "username" : "test_username" ,
130+ "room" : room ,
131+ },
132+ matcher = {
133+ "textroom" : "success" ,
134+ "participants" : [],
135+ },
136+ )
137+
138+ async def message (self , room : int , text : str , ack : bool = True ):
139+ return await self .send_wrapper (
140+ message = {
141+ "request" : "list" ,
142+ "textroom" : "message" ,
143+ "room" : room ,
144+ "text" : text ,
145+ "ack" : ack ,
146+ },
147+ matcher = {
148+ "textroom" : "success" ,
149+ },
150+ )
151+
152+ async def leave (self , room : int ):
153+ return await self .send_wrapper (
154+ message = {
155+ "request" : "list" ,
156+ "textroom" : "leave" ,
157+ "room" : room ,
158+ },
159+ matcher = {
160+ "textroom" : "success" ,
161+ },
162+ )
163+
164+ async def announcement (self , room : int , text : str ) -> dict :
165+ return await self .send_wrapper (
166+ message = {
167+ "request" : "list" ,
168+ "textroom" : "announcement" ,
169+ "room" : room ,
170+ "secret" : "adminpwd" ,
171+ "text" : text ,
172+ },
173+ matcher = {
174+ "textroom" : "success" ,
175+ },
176+ )
177+
178+ async def setup (self ) -> dict :
179+ def function_matcher (response : dict ):
180+ return is_subset (
181+ response ,
182+ {
183+ "janus" : "event" ,
184+ "plugindata" : {
185+ "plugin" : self .name ,
186+ "data" : {
187+ "textroom" : "event" ,
188+ "result" : "ok" ,
189+ },
190+ },
191+ },
192+ ) or is_subset (response , {"janus" : "error" , "error" : {}})
193+
194+ message_transaction = await self .send (
195+ message = {
196+ "janus" : "message" ,
197+ "body" : {
198+ "request" : "setup" ,
199+ },
200+ },
201+ )
202+ message_response = await message_transaction .get (
203+ matcher = function_matcher , timeout = 15
204+ )
205+ await message_transaction .done ()
206+
207+ if is_subset (message_response , {"janus" : "error" , "error" : {}}):
208+ raise Exception (f"Janus error: { message_response } " )
209+
210+ print ()
211+ print ("setup_response" )
212+ print (message_response )
213+ print ()
214+
215+ # We will get jsep offer from Janus after we call setup. Not sure if
216+ # I should reuse PC, but I think I shouldn't
217+ self ._pc = RTCPeerConnection ()
218+
219+ @self ._pc .on ("datachannel" )
220+ def on_datachannel (channel ):
221+ print (channel , "-" , "created by remote party" )
222+ self .data_channel = channel
223+
224+ @channel .on ("message" )
225+ def on_message (message ):
226+ print (channel , "<" , message )
227+
228+ if isinstance (message , str ) and message .startswith ("ping" ):
229+ # reply
230+ print (channel , "pong" + message [4 :])
231+
232+ await self ._pc .setRemoteDescription (
233+ RTCSessionDescription (
234+ sdp = message_response ["jsep" ]["sdp" ],
235+ type = message_response ["jsep" ]["type" ],
236+ )
237+ )
238+ # self.data_channel = self._pc.createDataChannel("JanusDataChannel")
239+ print (self ._pc .signalingState )
240+ print (self ._pc .connectionState )
241+ print (self ._pc .iceConnectionState )
242+ print (self ._pc .iceGatheringState )
243+
244+ print ("--- Wait for trickle ---" )
245+ await asyncio .sleep (5 )
246+ # for candidate in self.ice_candidates:
247+ # print(candidate)
248+ # await self._pc.addIceCandidate(candidate=candidate)
249+
250+ await self ._pc .setLocalDescription (await self ._pc .createAnswer ())
251+ print (self ._pc .signalingState )
252+ print (self ._pc .connectionState )
253+ print (self ._pc .iceConnectionState )
254+ print (self ._pc .iceGatheringState )
255+
256+ message_transaction = await self .send (
257+ message = {
258+ "janus" : "message" ,
259+ "body" : {
260+ "request" : "ack" ,
261+ },
262+ "jsep" : {
263+ "sdp" : self ._pc .localDescription .sdp ,
264+ "trickle" : True ,
265+ "type" : self ._pc .localDescription .type ,
266+ },
267+ },
268+ )
269+ message_response = await message_transaction .get (
270+ matcher = function_matcher , timeout = 15
271+ )
272+ await message_transaction .done ()
273+
274+ return message_response
275+
276+
277+ async def main ():
278+ session = JanusSession (
279+ base_url = "ws://janus.lmkprofile.com:8188" ,
280+ api_secret = "janusrocks" ,
281+ )
282+
283+ plugin_textroom = JanusTextRoomPlugin ()
284+
285+ await plugin_textroom .attach (session = session ),
286+
287+ response = await plugin_textroom .list ()
288+
289+ response = await plugin_textroom .get_participants_list (1234 )
290+
291+ response = await plugin_textroom .setup ()
292+
293+ response = await plugin_textroom .join_room (1234 )
294+
295+ print ("--- Wait for awhile ---" )
296+ print (plugin_textroom ._pc .signalingState )
297+ print (plugin_textroom ._pc .connectionState )
298+ print (plugin_textroom ._pc .iceConnectionState )
299+ print (plugin_textroom ._pc .iceGatheringState )
300+ await asyncio .sleep (30 )
301+
302+ response = await plugin_textroom .message (1234 , "test msg" )
303+
304+ response = await plugin_textroom .leave (1234 )
305+
306+ response = await plugin_textroom .announcement (1234 , "test announcement" )
307+
308+ print (response )
309+ print ("--- Everything done ---" )
310+
311+ await plugin_textroom .destroy ()
312+
313+ await session .destroy ()
314+
315+
316+ asyncio .run (main ())
0 commit comments