Skip to content

Commit 393efcd

Browse files
committed
Fixed tests, and jetstream publish method.
1 parent 6f88324 commit 393efcd

11 files changed

Lines changed: 131 additions & 92 deletions

File tree

python/natsrpy/_natsrpy_rs/js/__init__.pyi

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,46 @@
11
from datetime import datetime, timedelta
2-
from typing import Any
2+
from typing import Any, Literal, overload
33

44
from .managers import KVManager, ObjectStoreManager, StreamsManager
55

6+
class Publication:
7+
stream: str
8+
sequence: int
9+
domain: str
10+
duplicate: bool
11+
value: str | None
12+
613
class JetStream:
14+
@overload
15+
async def publish(
16+
self,
17+
subject: str,
18+
payload: str | bytes | bytearray | memoryview,
19+
*,
20+
headers: dict[str, str] | None = None,
21+
err_on_disconnect: bool = False,
22+
wait: Literal[True],
23+
) -> Publication: ...
24+
@overload
725
async def publish(
826
self,
927
subject: str,
1028
payload: str | bytes | bytearray | memoryview,
1129
*,
1230
headers: dict[str, str] | None = None,
13-
reply: str | None = None,
1431
err_on_disconnect: bool = False,
32+
wait: Literal[False] = False,
1533
) -> None: ...
34+
@overload
35+
async def publish(
36+
self,
37+
subject: str,
38+
payload: str | bytes | bytearray | memoryview,
39+
*,
40+
headers: dict[str, str] | None = None,
41+
err_on_disconnect: bool = False,
42+
wait: bool = False,
43+
) -> Publication | None: ...
1644
@property
1745
def kv(self) -> KVManager: ...
1846
@property

python/natsrpy/_natsrpy_rs/js/stream.pyi

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,15 +228,22 @@ class StreamInfo:
228228
sources: list[SourceInfo]
229229

230230
class Stream:
231-
async def direct_get(self, sequence: int) -> StreamMessage:
231+
async def direct_get(
232+
self,
233+
sequence: int,
234+
timeout: float | datetime | None = None,
235+
) -> StreamMessage:
232236
"""
233237
Get direct message from the stream.
234238
235239
:param sequence: sequence number of the message to get.
236240
:return: Message.
237241
"""
238242

239-
async def get_info(self) -> StreamInfo:
243+
async def get_info(
244+
self,
245+
timeout: float | datetime | None = None,
246+
) -> StreamInfo:
240247
"""
241248
Get information about the stream.
242249
@@ -248,6 +255,7 @@ class Stream:
248255
filter: str | None = None,
249256
sequence: int | None = None,
250257
keep: int | None = None,
258+
timeout: float | datetime | None = None,
251259
) -> int:
252260
"""
253261
Purge current stream.

python/tests/conftest.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import pytest
55
from natsrpy import Nats
6+
from natsrpy.js import JetStream
67

78

89
@pytest.fixture(scope="session")
@@ -29,3 +30,8 @@ async def nats(nats_url: str) -> AsyncGenerator[Nats, None]:
2930
yield nats
3031

3132
await nats.shutdown()
33+
34+
35+
@pytest.fixture(scope="session")
36+
async def js(nats: Nats) -> JetStream:
37+
return await nats.jetstream()

python/tests/test_consumers.py

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
import uuid
22

3-
import pytest
4-
from natsrpy import Nats
53
from natsrpy.js import (
64
AckPolicy,
75
DeliverPolicy,
@@ -15,11 +13,6 @@
1513
)
1614

1715

18-
@pytest.fixture()
19-
async def js(nats: Nats) -> JetStream:
20-
return await nats.jetstream()
21-
22-
2316
async def test_pull_consumer_create(js: JetStream) -> None:
2417
stream_name = f"test-pcreate-{uuid.uuid4()}"
2518
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
@@ -40,7 +33,7 @@ async def test_pull_consumer_fetch_with_ack(js: JetStream) -> None:
4033
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
4134
stream = await js.streams.create(config)
4235
try:
43-
await js.publish(subj, b"ack-msg")
36+
await js.publish(subj, b"ack-msg", wait=True)
4437

4538
consumer_config = PullConsumerConfig(
4639
name=f"consumer-{uuid.uuid4()}",
@@ -60,7 +53,7 @@ async def test_pull_consumer_nack(js: JetStream) -> None:
6053
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
6154
stream = await js.streams.create(config)
6255
try:
63-
await js.publish(subj, b"nack-msg")
56+
await js.publish(subj, b"nack-msg", wait=True)
6457

6558
consumer_config = PullConsumerConfig(
6659
name=f"consumer-{uuid.uuid4()}",
@@ -80,7 +73,7 @@ async def test_pull_consumer_term(js: JetStream) -> None:
8073
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
8174
stream = await js.streams.create(config)
8275
try:
83-
await js.publish(subj, b"term-msg")
76+
await js.publish(subj, b"term-msg", wait=True)
8477

8578
consumer_config = PullConsumerConfig(
8679
name=f"consumer-{uuid.uuid4()}",
@@ -100,7 +93,7 @@ async def test_pull_consumer_progress(js: JetStream) -> None:
10093
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
10194
stream = await js.streams.create(config)
10295
try:
103-
await js.publish(subj, b"progress-msg")
96+
await js.publish(subj, b"progress-msg", wait=True)
10497

10598
consumer_config = PullConsumerConfig(
10699
name=f"consumer-{uuid.uuid4()}",
@@ -121,7 +114,7 @@ async def test_pull_consumer_message_properties(js: JetStream) -> None:
121114
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
122115
stream = await js.streams.create(config)
123116
try:
124-
await js.publish(subj, b"prop-msg")
117+
await js.publish(subj, b"prop-msg", wait=True)
125118

126119
consumer_name = f"consumer-{uuid.uuid4()}"
127120
consumer_config = PullConsumerConfig(name=consumer_name)
@@ -149,8 +142,8 @@ async def test_pull_consumer_with_filter_subject(js: JetStream) -> None:
149142
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
150143
stream = await js.streams.create(config)
151144
try:
152-
await js.publish(f"{stream_name}.a", b"msg-a")
153-
await js.publish(f"{stream_name}.b", b"msg-b")
145+
await js.publish(f"{stream_name}.a", b"msg-a", wait=True)
146+
await js.publish(f"{stream_name}.b", b"msg-b", wait=True)
154147

155148
consumer_config = PullConsumerConfig(
156149
name=f"consumer-{uuid.uuid4()}",
@@ -171,8 +164,8 @@ async def test_pull_consumer_deliver_policy(js: JetStream) -> None:
171164
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
172165
stream = await js.streams.create(config)
173166
try:
174-
await js.publish(subj, b"old-msg")
175-
await js.publish(subj, b"new-msg")
167+
await js.publish(subj, b"old-msg", wait=True)
168+
await js.publish(subj, b"new-msg", wait=True)
176169

177170
consumer_config = PullConsumerConfig(
178171
name=f"consumer-{uuid.uuid4()}",
@@ -243,7 +236,7 @@ async def test_pull_consumer_messages(js: JetStream) -> None:
243236
stream = await js.streams.create(config)
244237
try:
245238
for message in messages:
246-
await js.publish(subj, message)
239+
await js.publish(subj, message, wait=True)
247240
consumer_config = PullConsumerConfig(name=f"consumer-{uuid.uuid4()}")
248241
consumer = await stream.consumers.create(consumer_config)
249242
msgs_iter = await consumer.fetch(timeout=0.5)
@@ -261,7 +254,7 @@ async def test_push_consumer_messages(js: JetStream) -> None:
261254
stream = await js.streams.create(config)
262255
try:
263256
for message in messages:
264-
await js.publish(subj, message)
257+
await js.publish(subj, message, wait=True)
265258

266259
deliver_subj = uuid.uuid4().hex
267260
consumer_config = PushConsumerConfig(

python/tests/test_jetstream.py

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,6 @@
88
)
99

1010

11-
@pytest.fixture()
12-
async def js(nats: Nats) -> JetStream:
13-
return await nats.jetstream()
14-
15-
1611
async def test_jetstream_creation(nats: Nats) -> None:
1712
js = await nats.jetstream()
1813
assert isinstance(js, JetStream)
@@ -30,26 +25,14 @@ async def test_jetstream_has_object_store_manager(js: JetStream) -> None:
3025
assert js.object_store is not None
3126

3227

33-
async def test_jetstream_publish(js: JetStream) -> None:
34-
stream_name = f"test-js-pub-{uuid.uuid4().hex[:8]}"
35-
subj = f"{stream_name}.data"
36-
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
37-
stream = await js.streams.create(config)
38-
try:
39-
await js.publish(subj, b"jetstream-msg")
40-
info = await stream.get_info()
41-
assert info.state.messages >= 1
42-
finally:
43-
await js.streams.delete(stream_name)
44-
45-
46-
async def test_jetstream_publish_str(js: JetStream) -> None:
47-
stream_name = f"test-js-pubstr-{uuid.uuid4().hex[:8]}"
28+
@pytest.mark.parametrize("payload", [b"bytes-test", "str-test"])
29+
async def test_jetstream_publish(js: JetStream, payload: str | bytes) -> None:
30+
stream_name = f"test-js-pub-{uuid.uuid4().hex}"
4831
subj = f"{stream_name}.data"
4932
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
5033
stream = await js.streams.create(config)
5134
try:
52-
await js.publish(subj, "string-payload")
35+
await js.publish(subj, payload, wait=True)
5336
info = await stream.get_info()
5437
assert info.state.messages >= 1
5538
finally:
@@ -62,6 +45,6 @@ async def test_jetstream_publish_with_headers(js: JetStream) -> None:
6245
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
6346
await js.streams.create(config)
6447
try:
65-
await js.publish(subj, b"with-headers", headers={"x-test": "value"})
48+
await js.publish(subj, b"with-headers", headers={"x-test": "value"}, wait=True)
6649
finally:
6750
await js.streams.delete(stream_name)

python/tests/test_kv.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
import uuid
22

3-
import pytest
4-
from natsrpy import Nats
53
from natsrpy.js import (
64
JetStream,
75
KeyValue,
@@ -10,11 +8,6 @@
108
)
119

1210

13-
@pytest.fixture()
14-
async def js(nats: Nats) -> JetStream:
15-
return await nats.jetstream()
16-
17-
1811
async def test_kv_create(js: JetStream) -> None:
1912
bucket = f"test-kv-create-{uuid.uuid4().hex[:8]}"
2013
config = KVConfig(bucket=bucket)

python/tests/test_object_store.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
import io
22
import uuid
33

4-
import pytest
5-
from natsrpy import Nats
64
from natsrpy.js import (
75
JetStream,
86
ObjectStore,
@@ -11,11 +9,6 @@
119
)
1210

1311

14-
@pytest.fixture()
15-
async def js(nats: Nats) -> JetStream:
16-
return await nats.jetstream()
17-
18-
1912
async def test_object_store_create(js: JetStream) -> None:
2013
bucket = f"test-os-create-{uuid.uuid4().hex[:8]}"
2114
config = ObjectStoreConfig(bucket=bucket)

python/tests/test_streams.py

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
import uuid
22

3-
import pytest
4-
from natsrpy import Nats
53
from natsrpy.js import (
64
DiscardPolicy,
75
JetStream,
@@ -12,11 +10,6 @@
1210
)
1311

1412

15-
@pytest.fixture()
16-
async def js(nats: Nats) -> JetStream:
17-
return await nats.jetstream()
18-
19-
2013
async def test_stream_create(js: JetStream) -> None:
2114
name = f"test-create-{uuid.uuid4().hex[:8]}"
2215
config = StreamConfig(name=name, subjects=[f"{name}.>"])
@@ -94,9 +87,9 @@ async def test_stream_purge(js: JetStream) -> None:
9487
config = StreamConfig(name=name, subjects=[f"{name}.>"])
9588
stream = await js.streams.create(config)
9689
try:
97-
await js.publish(subj, b"msg-1")
98-
await js.publish(subj, b"msg-2")
99-
await js.publish(subj, b"msg-3")
90+
await js.publish(subj, b"msg-1", wait=True)
91+
await js.publish(subj, b"msg-2", wait=True)
92+
await js.publish(subj, b"msg-3", wait=True)
10093
info = await stream.get_info()
10194
assert info.state.messages == 3
10295
purged = await stream.purge()
@@ -112,8 +105,8 @@ async def test_stream_purge_with_filter(js: JetStream) -> None:
112105
config = StreamConfig(name=name, subjects=[f"{name}.>"])
113106
stream = await js.streams.create(config)
114107
try:
115-
await js.publish(f"{name}.a", b"a-msg")
116-
await js.publish(f"{name}.b", b"b-msg")
108+
await js.publish(f"{name}.a", b"a-msg", wait=True)
109+
await js.publish(f"{name}.b", b"b-msg", wait=True)
117110
purged = await stream.purge(filter=f"{name}.a")
118111
assert purged == 1
119112
info = await stream.get_info()
@@ -128,7 +121,7 @@ async def test_stream_direct_get(js: JetStream) -> None:
128121
config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True)
129122
stream = await js.streams.create(config)
130123
try:
131-
await js.publish(subj, b"direct-get-msg")
124+
await js.publish(subj, b"direct-get-msg", wait=True)
132125
msg = await stream.direct_get(sequence=1)
133126
assert msg.payload == b"direct-get-msg"
134127
assert msg.subject == subj
@@ -143,7 +136,7 @@ async def test_stream_message_repr(js: JetStream) -> None:
143136
config = StreamConfig(name=name, subjects=[f"{name}.>"], allow_direct=True)
144137
stream = await js.streams.create(config)
145138
try:
146-
await js.publish(subj, b"repr-test")
139+
await js.publish(subj, b"repr-test", wait=True)
147140
msg = await stream.direct_get(sequence=1)
148141
r = repr(msg)
149142
assert isinstance(r, str)
@@ -250,8 +243,8 @@ async def test_stream_state_after_publish(js: JetStream) -> None:
250243
config = StreamConfig(name=name, subjects=[f"{name}.>"])
251244
stream = await js.streams.create(config)
252245
try:
253-
await js.publish(subj, b"msg-1")
254-
await js.publish(subj, b"msg-2")
246+
await js.publish(subj, b"msg-1", wait=True)
247+
await js.publish(subj, b"msg-2", wait=True)
255248
info = await stream.get_info()
256249
assert info.state.messages == 2
257250
assert info.state.first_sequence == 1

src/exceptions/rust_err.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,19 @@ pub enum NatsrpyError {
4747
#[error(transparent)]
4848
CreateKeyValueError(#[from] async_nats::jetstream::context::CreateKeyValueError),
4949
#[error(transparent)]
50-
KVEntryError(#[from] async_nats::jetstream::kv::EntryError),
50+
CreateStreamError(#[from] async_nats::jetstream::context::CreateStreamError),
5151
#[error(transparent)]
52-
KVPutError(#[from] async_nats::jetstream::kv::PutError),
52+
GetStreamError(#[from] async_nats::jetstream::context::GetStreamError),
5353
#[error(transparent)]
5454
KVUpdateError(#[from] async_nats::jetstream::context::UpdateKeyValueError),
5555
#[error(transparent)]
56-
DeleteError(#[from] async_nats::jetstream::kv::DeleteError),
56+
JSPublishError(#[from] async_nats::jetstream::context::PublishError),
5757
#[error(transparent)]
58-
CreateStreamError(#[from] async_nats::jetstream::context::CreateStreamError),
58+
KVEntryError(#[from] async_nats::jetstream::kv::EntryError),
5959
#[error(transparent)]
60-
GetStreamError(#[from] async_nats::jetstream::context::GetStreamError),
60+
KVPutError(#[from] async_nats::jetstream::kv::PutError),
61+
#[error(transparent)]
62+
DeleteError(#[from] async_nats::jetstream::kv::DeleteError),
6163
#[error(transparent)]
6264
StreamDirectGetError(#[from] async_nats::jetstream::stream::DirectGetError),
6365
#[error(transparent)]

0 commit comments

Comments
 (0)