-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathconsumer.rs
More file actions
108 lines (94 loc) · 2.96 KB
/
consumer.rs
File metadata and controls
108 lines (94 loc) · 2.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use std::sync::Arc;
use futures_util::StreamExt;
use pyo3::{Bound, PyAny, PyRef, Python};
use crate::{
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
js::pymod::JetStreamMessage,
utils::{futures::natsrpy_future_with_timeout, natsrpy_future, py_types::TimeValue},
};
type NatsPushConsumer =
async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::push::Config>;
#[pyo3::pyclass(from_py_object)]
#[derive(Debug, Clone)]
pub struct PushConsumer {
#[pyo3(get)]
name: String,
#[pyo3(get)]
stream_name: String,
consumer: Arc<NatsPushConsumer>,
}
impl PushConsumer {
#[must_use]
pub fn new(consumer: NatsPushConsumer) -> Self {
let info = consumer.cached_info();
Self {
name: info.name.clone(),
stream_name: info.stream_name.clone(),
consumer: Arc::new(consumer),
}
}
}
#[pyo3::pyclass]
pub struct MessagesIterator {
messages: Option<Arc<tokio::sync::Mutex<async_nats::jetstream::consumer::push::Messages>>>,
}
impl From<async_nats::jetstream::consumer::push::Messages> for MessagesIterator {
fn from(value: async_nats::jetstream::consumer::push::Messages) -> Self {
Self {
messages: Some(Arc::new(tokio::sync::Mutex::new(value))),
}
}
}
#[pyo3::pymethods]
impl PushConsumer {
pub fn messages<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
let consumer = self.consumer.clone();
natsrpy_future(py, async move {
Ok(MessagesIterator::from(consumer.messages().await?))
})
}
#[must_use]
pub fn __repr__(&self) -> String {
format!(
"PushConsumer<name={name:?}, stream_name={stream_name:?}>",
name = self.name,
stream_name = self.stream_name
)
}
}
#[pyo3::pymethods]
impl MessagesIterator {
#[must_use]
pub const fn __aiter__(slf: PyRef<Self>) -> PyRef<Self> {
slf
}
#[pyo3(signature=(timeout=None))]
pub fn next<'py>(
&self,
py: Python<'py>,
timeout: Option<TimeValue>,
) -> NatsrpyResult<Bound<'py, PyAny>> {
let Some(messages_guard) = self.messages.clone() else {
unreachable!("Message is always Some in runtime.")
};
#[allow(clippy::significant_drop_tightening)]
natsrpy_future_with_timeout(py, timeout, async move {
let mut messages = messages_guard.lock().await;
let Some(message) = messages.next().await else {
return Err(NatsrpyError::AsyncStopIteration);
};
let message = message?;
JetStreamMessage::try_from(message)
})
}
pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
self.next(py, None)
}
}
impl Drop for MessagesIterator {
fn drop(&mut self) {
pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
self.messages = None;
});
}
}