Skip to content

Commit fce53b9

Browse files
committed
Added ackable message, instead of manual ack.
Signed-off-by: Pavel Kirilin <win10@list.ru>
1 parent a9cff86 commit fce53b9

5 files changed

Lines changed: 21 additions & 12 deletions

File tree

poetry.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ keywords = ["taskiq", "tasks", "distributed", "async", "nats", "result_backend"]
2121
[tool.poetry.dependencies]
2222
python = "^3.8.1"
2323
nats-py = "^2.2.0"
24-
taskiq = "^0"
24+
taskiq = ">=0.8,<1"
2525

2626

2727
[tool.poetry.group.dev.dependencies]

taskiq_nats/broker.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from nats.aio.client import Client
55
from nats.js import JetStreamContext
66
from nats.js.api import StreamConfig
7-
from taskiq import AsyncBroker, AsyncResultBackend, BrokerMessage
7+
from taskiq import AckableMessage, AsyncBroker, AsyncResultBackend, BrokerMessage
88

99
_T = TypeVar("_T") # noqa: WPS111 (Too short)
1010

@@ -140,16 +140,18 @@ async def kick(self, message: BrokerMessage) -> None:
140140
headers=message.labels,
141141
)
142142

143-
async def listen(self) -> AsyncGenerator[bytes, None]:
143+
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
144144
"""
145145
Start listen to new messages.
146146
147147
:yield: incoming messages.
148148
"""
149149
subscribe = await self.js.subscribe(self.subject, queue=self.queue or "")
150150
async for message in subscribe.messages:
151-
yield message.data
152-
await message.ack()
151+
yield AckableMessage(
152+
data=message.data,
153+
ack=message.ack,
154+
)
153155

154156
async def shutdown(self) -> None:
155157
"""Close connections to NATS."""

tests/test_jstream.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from typing import List
44

55
import pytest
6-
from taskiq import BrokerMessage
6+
from taskiq import AckableMessage, BrokerMessage
77

88
from taskiq_nats import JetStreamBroker
99
from tests.utils import read_message
@@ -33,5 +33,10 @@ async def test_success(nats_urls: List[str], nats_subject: str) -> None:
3333
labels={},
3434
)
3535
await broker.kick(sent_message)
36-
assert await asyncio.wait_for(read_message(broker), 0.5) == sent_message.message
36+
ackable_msg = await asyncio.wait_for(read_message(broker), 0.5)
37+
assert isinstance(ackable_msg, AckableMessage)
38+
assert ackable_msg.data == sent_message.message
39+
ack = ackable_msg.ack()
40+
if ack is not None:
41+
await ack
3742
await broker.shutdown()

tests/utils.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1-
from taskiq import AsyncBroker
1+
from taskiq import AckableMessage, AsyncBroker
22

33

4-
async def read_message(broker: AsyncBroker) -> bytes: # type: ignore
4+
async def read_message(broker: AsyncBroker) -> bytes | AckableMessage:
55
"""
66
Read signle message from the broker's listen method.
77
88
:param broker: current broker.
99
:return: firs message.
1010
"""
11+
msg: bytes | AckableMessage = b"error"
1112
async for message in broker.listen():
12-
assert isinstance(message, bytes) # noqa: S101
13-
return message
13+
msg = message
14+
break
15+
return msg

0 commit comments

Comments
 (0)