1111
1212
1313class NatsBroker (AsyncBroker ):
14+ """
15+ NATS broker for taskiq.
16+
17+ By default this broker works
18+ broadcasting message to all connected workers.
19+
20+ If you want to make it work as queue,
21+ you need to supply name of the queue in
22+ queue argument.
23+
24+ Docs about queue:
25+ https://docs.nats.io/nats-concepts/core-nats/queue
26+ """
27+
1428 def __init__ ( # noqa: WPS211 (too many args)
1529 self ,
1630 servers : Union [str , list [str ]],
@@ -28,23 +42,40 @@ def __init__( # noqa: WPS211 (too many args)
2842 self .subject = subject
2943
3044 async def startup (self ) -> None :
45+ """
46+ Startup event handler.
47+
48+ It simply connects to NATS cluster.
49+ """
3150 await super ().startup ()
3251 await self .client .connect (self .servers , ** self .connection_kwargs )
3352
3453 async def kick (self , message : BrokerMessage ) -> None :
54+ """
55+ Send a message using NATS.
56+
57+ :param message: message to send.
58+ """
3559 await self .client .publish (
3660 self .subject ,
3761 payload = message .json ().encode (),
3862 headers = message .labels ,
3963 )
4064
4165 async def listen (self ) -> AsyncGenerator [BrokerMessage , None ]:
66+ """
67+ Start listen to new messages.
68+
69+ :yield: incoming messages.
70+ """
4271 subscribe = await self .client .subscribe (self .subject , queue = self .queue or "" )
4372 async for message in subscribe .messages :
4473 try :
4574 yield BrokerMessage .parse_raw (message .data )
4675 except ValueError :
47- logger .warning (f"Cannot parse message: { message .data .decode ('utf-8' )} " )
76+ data = message .data .decode ("utf-8" )
77+ logger .warning (f"Cannot parse message: { data } " )
4878
4979 async def shutdown (self ) -> None :
80+ """Close connections to NATS."""
5081 await self .client .close ()
0 commit comments