|
| 1 | +from logging import getLogger |
1 | 2 | from typing import Any, AsyncGenerator, Callable, Optional, TypeVar, Union |
2 | | -from taskiq import AsyncBroker, AsyncResultBackend, BrokerMessage |
3 | 3 |
|
4 | 4 | from nats.aio.client import Client |
| 5 | +from taskiq import AsyncBroker, AsyncResultBackend, BrokerMessage |
| 6 | + |
| 7 | +_T = TypeVar("_T") # noqa: WPS111 (Too short) |
| 8 | + |
5 | 9 |
|
6 | | -_T = TypeVar("_T") |
| 10 | +logger = getLogger("taskiq_nats") |
7 | 11 |
|
8 | 12 |
|
9 | 13 | class NatsBroker(AsyncBroker): |
10 | | - def __init__( |
| 14 | + def __init__( # noqa: WPS211 (too many args) |
11 | 15 | self, |
12 | 16 | servers: Union[str, list[str]], |
13 | 17 | subject: str = "tasiq_tasks", |
@@ -35,13 +39,12 @@ async def kick(self, message: BrokerMessage) -> None: |
35 | 39 | ) |
36 | 40 |
|
37 | 41 | async def listen(self) -> AsyncGenerator[BrokerMessage, None]: |
38 | | - subscribe = await self.client.subscribe(self.subject, queue=self.queue) |
| 42 | + subscribe = await self.client.subscribe(self.subject, queue=self.queue or "") |
39 | 43 | async for message in subscribe.messages: |
40 | 44 | try: |
41 | | - message = BrokerMessage.parse_raw(message.data) |
42 | | - yield message |
| 45 | + yield BrokerMessage.parse_raw(message.data) |
43 | 46 | except ValueError: |
44 | | - continue |
| 47 | + logger.warning(f"Cannot parse message: {message.data.decode('utf-8')}") |
45 | 48 |
|
46 | 49 | async def shutdown(self) -> None: |
47 | 50 | await self.client.close() |
0 commit comments