Skip to content

Commit 22f5961

Browse files
authored
Merge pull request #11 from taskiq-python/bugfix/ackable-msg
2 parents a9cff86 + 6bc5de7 commit 22f5961

5 files changed

Lines changed: 23 additions & 12 deletions

File tree

poetry.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ keywords = ["taskiq", "tasks", "distributed", "async", "nats", "result_backend"]
2121
[tool.poetry.dependencies]
2222
python = "^3.8.1"
2323
nats-py = "^2.2.0"
24-
taskiq = "^0"
24+
taskiq = ">=0.8,<1"
2525

2626

2727
[tool.poetry.group.dev.dependencies]

taskiq_nats/broker.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from nats.aio.client import Client
55
from nats.js import JetStreamContext
66
from nats.js.api import StreamConfig
7-
from taskiq import AsyncBroker, AsyncResultBackend, BrokerMessage
7+
from taskiq import AckableMessage, AsyncBroker, AsyncResultBackend, BrokerMessage
88

99
_T = TypeVar("_T") # noqa: WPS111 (Too short)
1010

@@ -140,16 +140,18 @@ async def kick(self, message: BrokerMessage) -> None:
140140
headers=message.labels,
141141
)
142142

143-
async def listen(self) -> AsyncGenerator[bytes, None]:
143+
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
144144
"""
145145
Start listen to new messages.
146146
147147
:yield: incoming messages.
148148
"""
149149
subscribe = await self.js.subscribe(self.subject, queue=self.queue or "")
150150
async for message in subscribe.messages:
151-
yield message.data
152-
await message.ack()
151+
yield AckableMessage(
152+
data=message.data,
153+
ack=message.ack,
154+
)
153155

154156
async def shutdown(self) -> None:
155157
"""Close connections to NATS."""

tests/test_jstream.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from typing import List
44

55
import pytest
6-
from taskiq import BrokerMessage
6+
from taskiq import AckableMessage, BrokerMessage
77

88
from taskiq_nats import JetStreamBroker
99
from tests.utils import read_message
@@ -33,5 +33,10 @@ async def test_success(nats_urls: List[str], nats_subject: str) -> None:
3333
labels={},
3434
)
3535
await broker.kick(sent_message)
36-
assert await asyncio.wait_for(read_message(broker), 0.5) == sent_message.message
36+
ackable_msg = await asyncio.wait_for(read_message(broker), 0.5)
37+
assert isinstance(ackable_msg, AckableMessage)
38+
assert ackable_msg.data == sent_message.message
39+
ack = ackable_msg.ack()
40+
if ack is not None:
41+
await ack
3742
await broker.shutdown()

tests/utils.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
1-
from taskiq import AsyncBroker
1+
from typing import Union
22

3+
from taskiq import AckableMessage, AsyncBroker
34

4-
async def read_message(broker: AsyncBroker) -> bytes: # type: ignore
5+
6+
async def read_message(broker: AsyncBroker) -> Union[bytes, AckableMessage]:
57
"""
68
Read signle message from the broker's listen method.
79
810
:param broker: current broker.
911
:return: firs message.
1012
"""
13+
msg: Union[bytes, AckableMessage] = b"error"
1114
async for message in broker.listen():
12-
assert isinstance(message, bytes) # noqa: S101
13-
return message
15+
msg = message
16+
break
17+
return msg

0 commit comments

Comments
 (0)