Skip to content

Commit f6dcba8

Browse files
authored
Add support for Event Stream. (#122)
* Add support for Event Stream. Co-authored-by: Jonathan Cross <>
1 parent e57f668 commit f6dcba8

8 files changed

Lines changed: 335 additions & 68 deletions

File tree

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ NOMAD_CLIENT_KEY=/path/to/tls/client.key
7070
|client|N|N|N|N
7171
|evaluation|Y|N|Y|N
7272
|evaluations|Y|Y|Y|Y
73+
|event|N|N|N|N
7374
|job|Y|N|Y|N
7475
|jobs|Y|Y|Y|Y
7576
|node|Y|N|Y|N
@@ -119,6 +120,7 @@ NOMAD_IP=127.0.0.1 NOMAD_VERSION=<SEMNATIC_VERSION> py.test --cov=nomad --cov-re
119120
- [x] Client [:link:](docs/api/client.md)
120121
- [x] Evaluation [:link:](docs/api/evaluation.md)
121122
- [x] Evaluations [:link:](docs/api/evaluations.md)
123+
- [x] Event [:link:](docs/api/event.md)
122124
- [x] Job [:link:](docs/api/job.md)
123125
- [x] Jobs [:link:](docs/api/jobs.md)
124126
- [x] Namespace [:link:](docs/api/namespace.md)

docs/api/event.md

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
## Event
2+
3+
## Event Stream
4+
5+
This will setup an event stream. To avoid blocking and having more control to the user it will return a
6+
tuple of (threading.Thread, threading.Event and queue.Queue). You can use your own `queue.Queue` if you want
7+
to use LIFO or SimpleQueue or simply extend upon that.
8+
9+
### Default
10+
Will listen to all topics
11+
12+
```
13+
import nomad
14+
n = nomad.Nomad()
15+
16+
stream, stream_exit_event, events = n.event.stream.get_stream()
17+
stream.start()
18+
19+
while True:
20+
event = events.get()
21+
print(event)
22+
events.task_done()
23+
```
24+
25+
### Set Index, Namespace and Topic(s) of Interest
26+
27+
```
28+
import nomad
29+
n = nomad.Nomad()
30+
31+
stream, stream_exit_event, events = n.event.stream.get_stream(index=0, topic={"Node": "*"}, namespace="not-default")
32+
stream.start()
33+
34+
while True:
35+
event = events.get()
36+
print(event)
37+
events.task_done()
38+
```
39+
40+
### Cancel thread/Optimistically exit
41+
We will use the `stream_exit_event` to get the thread to return/exit gracefully. This isn't immediate
42+
as we have to wait for an event or set an arbitrary timeout value to close/open the connection again.
43+
44+
In this example we will set `stream_exit_event` right before the timeout, knowing that it needs to re-establish
45+
the connection to the stream. Using a try/except with queue.Queue.get(timeout=<VALUE>) we will check if the thread
46+
is still alive; if it isn't we break the loop.
47+
48+
```
49+
import nomad
50+
import threading
51+
import time
52+
import queue
53+
54+
55+
def stop_stream(exit_event, timeout):
56+
print("start sleep")
57+
time.sleep(timeout)
58+
print("set exit event")
59+
exit_event.set()
60+
61+
62+
n = nomad.Nomad()
63+
64+
stream, stream_exit_event, events = n.event.stream.get_stream(index=0, topic={"Node": "*"}, timeout=3.2)
65+
stream.start()
66+
67+
stop = threading.Thread(target=stop_stream, args=(stream_exit_event, 3.0))
68+
stop.start()
69+
70+
while True:
71+
if not stream.is_alive():
72+
print("not alive")
73+
break
74+
75+
try:
76+
event = events.get(timeout=1.0)
77+
print(event)
78+
events.task_done()
79+
except queue.Empty:
80+
continue
81+
```

nomad/__init__.py

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
1-
21
import nomad.api as api
32
import os
43

4+
55
class Nomad(object):
66

77
def __init__(self,
8-
host='127.0.0.1',
9-
secure=False,
10-
port=4646,
11-
address=os.getenv('NOMAD_ADDR', None),
12-
namespace=os.getenv('NOMAD_NAMESPACE', None),
13-
token=os.getenv('NOMAD_TOKEN', None),
14-
timeout=5,
15-
region=os.getenv('NOMAD_REGION', None),
16-
version='v1',
17-
verify=False,
18-
cert=(os.getenv('NOMAD_CLIENT_CERT', None),
19-
os.getenv('NOMAD_CLIENT_KEY', None))):
8+
host='127.0.0.1',
9+
secure=False,
10+
port=4646,
11+
address=os.getenv('NOMAD_ADDR', None),
12+
namespace=os.getenv('NOMAD_NAMESPACE', None),
13+
token=os.getenv('NOMAD_TOKEN', None),
14+
timeout=5,
15+
region=os.getenv('NOMAD_REGION', None),
16+
version='v1',
17+
verify=False,
18+
cert=(os.getenv('NOMAD_CLIENT_CERT', None),
19+
os.getenv('NOMAD_CLIENT_KEY', None))):
2020
""" Nomad api client
2121
2222
https://github.com/jrxFive/python-nomad/
@@ -68,28 +68,29 @@ def __init__(self,
6868
"region": self.region
6969
}
7070

71-
self._jobs = api.Jobs(**self.requester_settings)
72-
self._job = api.Job(**self.requester_settings)
73-
self._nodes = api.Nodes(**self.requester_settings)
74-
self._node = api.Node(**self.requester_settings)
75-
self._allocations = api.Allocations(**self.requester_settings)
76-
self._allocation = api.Allocation(**self.requester_settings)
77-
self._evaluations = api.Evaluations(**self.requester_settings)
78-
self._evaluation = api.Evaluation(**self.requester_settings)
71+
self._acl = api.Acl(**self.requester_settings)
7972
self._agent = api.Agent(**self.requester_settings)
73+
self._allocation = api.Allocation(**self.requester_settings)
74+
self._allocations = api.Allocations(**self.requester_settings)
8075
self._client = api.Client(**self.requester_settings)
81-
self._deployments = api.Deployments(**self.requester_settings)
8276
self._deployment = api.Deployment(**self.requester_settings)
77+
self._deployments = api.Deployments(**self.requester_settings)
78+
self._evaluation = api.Evaluation(**self.requester_settings)
79+
self._evaluations = api.Evaluations(**self.requester_settings)
80+
self._event = api.Event(**self.requester_settings)
81+
self._job = api.Job(**self.requester_settings)
82+
self._jobs = api.Jobs(**self.requester_settings)
83+
self._metrics = api.Metrics(**self.requester_settings)
84+
self._namespace = api.Namespace(**self.requester_settings)
85+
self._namespaces = api.Namespaces(**self.requester_settings)
86+
self._node = api.Node(**self.requester_settings)
87+
self._nodes = api.Nodes(**self.requester_settings)
88+
self._operator = api.Operator(**self.requester_settings)
8389
self._regions = api.Regions(**self.requester_settings)
90+
self._sentinel = api.Sentinel(**self.requester_settings)
8491
self._status = api.Status(**self.requester_settings)
8592
self._system = api.System(**self.requester_settings)
86-
self._operator = api.Operator(**self.requester_settings)
8793
self._validate = api.Validate(**self.requester_settings)
88-
self._namespaces = api.Namespaces(**self.requester_settings)
89-
self._namespace = api.Namespace(**self.requester_settings)
90-
self._acl = api.Acl(**self.requester_settings)
91-
self._sentinel = api.Sentinel(**self.requester_settings)
92-
self._metrics = api.Metrics(**self.requester_settings)
9394

9495
def get_uri(self):
9596
if self.secure:
@@ -136,6 +137,10 @@ def evaluations(self):
136137
def evaluation(self):
137138
return self._evaluation
138139

140+
@property
141+
def event(self):
142+
return self._event
143+
139144
@property
140145
def agent(self):
141146
return self._agent

nomad/api/__init__.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,25 @@
11
import nomad.api.exceptions
2-
from nomad.api.base import Requester
3-
from nomad.api.jobs import Jobs
4-
from nomad.api.job import Job
5-
from nomad.api.nodes import Nodes
6-
from nomad.api.node import Node
2+
from nomad.api.acl import Acl
73
from nomad.api.agent import Agent
8-
from nomad.api.allocations import Allocations
94
from nomad.api.allocation import Allocation
10-
from nomad.api.evaluations import Evaluations
11-
from nomad.api.evaluation import Evaluation
5+
from nomad.api.allocations import Allocations
6+
from nomad.api.base import Requester
127
from nomad.api.client import Client
8+
from nomad.api.deployment import Deployment
9+
from nomad.api.deployments import Deployments
10+
from nomad.api.evaluation import Evaluation
11+
from nomad.api.evaluations import Evaluations
12+
from nomad.api.event import Event
13+
from nomad.api.job import Job
14+
from nomad.api.jobs import Jobs
15+
from nomad.api.metrics import Metrics
16+
from nomad.api.namespace import Namespace
17+
from nomad.api.namespaces import Namespaces
18+
from nomad.api.node import Node
19+
from nomad.api.nodes import Nodes
20+
from nomad.api.operator import Operator
1321
from nomad.api.regions import Regions
22+
from nomad.api.sentinel import Sentinel
1423
from nomad.api.status import Status
1524
from nomad.api.system import System
16-
from nomad.api.operator import Operator
1725
from nomad.api.validate import Validate
18-
from nomad.api.deployments import Deployments
19-
from nomad.api.deployment import Deployment
20-
from nomad.api.namespaces import Namespaces
21-
from nomad.api.namespace import Namespace
22-
from nomad.api.acl import Acl
23-
from nomad.api.sentinel import Sentinel
24-
from nomad.api.metrics import Metrics

nomad/api/base.py

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,14 @@ def request(self, *args, **kwargs):
8383
data=kwargs.get("data", None),
8484
json=kwargs.get("json", None),
8585
headers=kwargs.get("headers", None),
86-
allow_redirects=kwargs.get("allow_redirects", False)
86+
allow_redirects=kwargs.get("allow_redirects", False),
87+
timeout=kwargs.get("timeout", self.timeout),
88+
stream=kwargs.get("stream", False)
8789
)
8890

8991
return response
9092

91-
def _request(self, method, endpoint, params=None, data=None, json=None, headers=None, allow_redirects=None):
93+
def _request(self, method, endpoint, params=None, data=None, json=None, headers=None, allow_redirects=None, timeout=None, stream=False):
9294
url = self._url_builder(endpoint)
9395
qs = self._query_string_builder(endpoint=endpoint, params=params)
9496

@@ -109,46 +111,47 @@ def _request(self, method, endpoint, params=None, data=None, json=None, headers=
109111
method = method.lower()
110112
if method == "get":
111113
response = self.session.get(
112-
url=url,
113-
params=params,
114+
allow_redirects=allow_redirects,
115+
cert=self.cert,
114116
headers=headers,
115-
timeout=self.timeout,
117+
params=params,
118+
stream=stream,
119+
timeout=timeout,
120+
url=url,
116121
verify=self.verify,
117-
cert=self.cert,
118-
allow_redirects=allow_redirects
119122
)
120123

121124
elif method == "post":
122125
response = self.session.post(
123-
url=url,
124-
params=params,
125-
json=json,
126-
headers=headers,
126+
allow_redirects=allow_redirects,
127+
cert=self.cert,
127128
data=data,
128-
timeout=self.timeout,
129+
headers=headers,
130+
json=json,
131+
params=params,
132+
timeout=timeout,
133+
url=url,
129134
verify=self.verify,
130-
cert=self.cert,
131-
allow_redirects=allow_redirects
132135
)
133136
elif method == "put":
134137
response = self.session.put(
135-
url=url,
136-
params=params,
137-
json=json,
138-
headers=headers,
138+
cert=self.cert,
139139
data=data,
140+
headers=headers,
141+
json=json,
142+
params=params,
143+
timeout=timeout,
144+
url=url,
140145
verify=self.verify,
141-
cert=self.cert,
142-
timeout=self.timeout
143146
)
144147
elif method == "delete":
145148
response = self.session.delete(
146-
url=url,
147-
params=params,
149+
cert=self.cert,
148150
headers=headers,
151+
params=params,
152+
timeout=timeout,
153+
url=url,
149154
verify=self.verify,
150-
cert=self.cert,
151-
timeout=self.timeout
152155
)
153156

154157
if response.ok:
@@ -162,5 +165,11 @@ def _request(self, method, endpoint, params=None, data=None, json=None, headers=
162165
else:
163166
raise nomad.api.exceptions.BaseNomadException(response)
164167

168+
except requests.exceptions.ConnectionError as error:
169+
if all([stream, timeout]):
170+
raise nomad.api.exceptions.TimeoutNomadException(error)
171+
172+
raise nomad.api.exceptions.BaseNomadException(error)
173+
165174
except requests.RequestException as error:
166175
raise nomad.api.exceptions.BaseNomadException(error)

0 commit comments

Comments
 (0)