Skip to content

Commit 0880264

Browse files
committed
Updated broker's interface to work with raw bytes.
Signed-off-by: Pavel Kirilin <win10@list.ru>
1 parent 1d58bd9 commit 0880264

2 files changed

Lines changed: 8 additions & 12 deletions

File tree

taskiq_nats/broker.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,23 +58,19 @@ async def kick(self, message: BrokerMessage) -> None:
5858
"""
5959
await self.client.publish(
6060
self.subject,
61-
payload=message.json().encode(),
61+
payload=message.message,
6262
headers=message.labels,
6363
)
6464

65-
async def listen(self) -> AsyncGenerator[BrokerMessage, None]:
65+
async def listen(self) -> AsyncGenerator[bytes, None]:
6666
"""
6767
Start listen to new messages.
6868
6969
:yield: incoming messages.
7070
"""
7171
subscribe = await self.client.subscribe(self.subject, queue=self.queue or "")
7272
async for message in subscribe.messages:
73-
try:
74-
yield BrokerMessage.parse_raw(message.data)
75-
except ValueError:
76-
data = message.data.decode("utf-8")
77-
logger.warning(f"Cannot parse message: {data}")
73+
yield message.data
7874

7975
async def shutdown(self) -> None:
8076
"""Close connections to NATS."""

tests/test_broker.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from taskiq_nats import NatsBroker
99

1010

11-
async def read_message(broker: NatsBroker) -> BrokerMessage: # type: ignore
11+
async def read_message(broker: NatsBroker) -> bytes: # type: ignore
1212
"""
1313
Read signle message from the broker's listen method.
1414
@@ -31,12 +31,12 @@ async def test_success_broadcast(nats_urls: List[str], nats_subject: str) -> Non
3131
sent_message = BrokerMessage(
3232
task_id=uuid.uuid4().hex,
3333
task_name="meme",
34-
message="some",
34+
message=b"some",
3535
labels={},
3636
)
3737
asyncio.create_task(broker.kick(sent_message))
3838
for received_message in await asyncio.gather(*tasks):
39-
assert received_message == sent_message
39+
assert received_message == sent_message.message
4040

4141

4242
@pytest.mark.anyio
@@ -49,8 +49,8 @@ async def test_success_queued(nats_urls: List[str], nats_subject: str) -> None:
4949
sent_message = BrokerMessage(
5050
task_id=uuid.uuid4().hex,
5151
task_name="meme",
52-
message="some",
52+
message=b"some",
5353
labels={},
5454
)
5555
asyncio.create_task(broker.kick(sent_message))
56-
assert await reading_task == sent_message
56+
assert await reading_task == sent_message.message

0 commit comments

Comments
 (0)