Skip to content

Commit 53785d0

Browse files
Ogban UgotOgban Ugot
authored andcommitted
v0.1.0
1 parent ca14d13 commit 53785d0

21 files changed

Lines changed: 460 additions & 399 deletions

README.md

Lines changed: 88 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,94 @@
11
# avenieca-python (WIP!)
2-
Python SDK for AveniECA
2+
Python SDK for publishing state signals to the AveniECA suite.
33

4-
## run
5-
```bash
6-
python consumer.py localhost:9092 testtopic
4+
## Usage
5+
6+
### Stream continuously to a topic
7+
```python
8+
import os
9+
import numpy as np
10+
from avenieca.utils import Config
11+
from avenieca.utils import Signal
12+
from avenieca.producers import Stream
13+
14+
# Define a handler that returns a signal dict like
15+
# the sample in avenieca.utils.signal
16+
17+
def handler():
18+
Signal["valence"] = 10
19+
Signal["state"] = np.array([0.2, 0.3, 0.8])
20+
return Signal
21+
22+
# Initialize Kafka configuration for the Stream
23+
Config["bootstrap_servers"] = os.environ["KAFKA_URL"]
24+
Config["topic"] = "left_wheel" #digital twin subscriber-topic
25+
26+
#Initialize the Stream object with a sync_rate
27+
# (the rate at which to publish signals).
28+
stream = Stream(config=Config, sync_rate=1)
29+
stream.publish(handler)
730
```
31+
32+
### Publish one signal as an event
33+
```python
34+
import os
35+
import numpy as np
36+
from avenieca.utils import Config
37+
from avenieca.utils import Signal
38+
from avenieca.producers import Event
39+
40+
# Initialize Kafka configuration for the Event
41+
Config["bootstrap_servers"] = os.environ["KAFKA_URL"]
42+
Config["topic"] = "left_wheel" #digital twin subscriber-topic
43+
44+
# Define the signal
45+
Signal["valence"] = 9
46+
Signal["state"] = np.array([0.2, 0.3, 0.8])
47+
48+
event = Event(config=Config)
49+
event.publish(Signal)
50+
```
51+
52+
### Consume from kafka topic
53+
```python
54+
import os
55+
import numpy as np
56+
from avenieca.utils import Config
57+
from avenieca.utils.signal import get_state_as_list, get_state_as_array
58+
from avenieca.consumer import Consumer
59+
60+
# Initialize Kafka configuration for the Event
61+
Config["bootstrap_servers"] = os.environ["KAFKA_URL"]
62+
Config["topic"] = "left_wheel" #digital twin subscriber-topic
63+
64+
# Define a handler to process incoming messages
65+
def handler(data):
66+
valence = data["valence"]
67+
state = data["state"]
68+
assert valence == 10
69+
assert state == "[0.2, 0.3, 0.8]"
70+
71+
client = Consumer(config=Config)
72+
client.consume(handler, True) # pass in handler
73+
74+
# You can use util functions in your handler to
75+
# convert the state signal from byte string to
76+
# np.ndarray or python list
77+
def handler(data):
78+
assert data["valence"] == 10
79+
assert data["state"] == "[0.2, 0.3, 0.8]"
80+
get_state_as_list(data)
81+
assert data["state"] == [0.2, 0.3, 0.8]
82+
83+
def handler(data):
84+
assert data["valence"] == 10
85+
assert data["state"] == "[0.2, 0.3, 0.8]"
86+
get_state_as_array(data)
87+
assert True, np.array_equal(data["state"], np.array([0.2, 0.3, 0.8]))
88+
```
89+
90+
## Tests
891
```bash
9-
python producer.py localhost:9092 testtwin1
92+
python -m pytest test/
1093
```
1194

avenieca/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from avenieca.producers import *
2+
from avenieca.consumer import *
3+
from avenieca.producer import *
4+
from avenieca.utils import *

avenieca/consumer.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import json
2+
from kafka import KafkaConsumer
3+
4+
5+
class Consumer:
6+
"""
7+
Base consumer for consuming messages (signals) from a digital twin.
8+
9+
:param config: configuration dictionary
10+
"""
11+
def __init__(self,
12+
config: dict,
13+
):
14+
self.config = config
15+
self.topic = config["topic"]
16+
self.client = KafkaConsumer(
17+
self.topic,
18+
bootstrap_servers=config["bootstrap_servers"],
19+
auto_offset_reset=config["auto_offset_reset"]
20+
)
21+
22+
def consume(self, func, sync_once=False):
23+
"""
24+
:param func: handler to process received messages
25+
:param sync_once: run consume loop once
26+
:return: none
27+
"""
28+
for msg in self.client:
29+
byte_val = msg.value
30+
data = json.loads(byte_val)
31+
func(data)
32+
if sync_once:
33+
break

avenieca/producer.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import json
2+
from kafka import KafkaProducer
3+
4+
5+
class Producer:
6+
"""
7+
Base producer for publishing messages.
8+
9+
:param config: configuration dictionary
10+
"""
11+
def __init__(self,
12+
config: dict,
13+
):
14+
self.config = config
15+
self.topic = config["topic"]
16+
self.client = KafkaProducer(bootstrap_servers=config["bootstrap_servers"])
17+
18+
def send(self, data: dict):
19+
"""
20+
:param data: serialized signal dictionary
21+
:return: FutureRecordMetadata
22+
"""
23+
json_object = json.dumps(data).encode("utf-8")
24+
result = self.client.send(self.topic, json_object)
25+
return result
26+

avenieca/producers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from avenieca.producers.stream import *
2+
from avenieca.producers.event import *

avenieca/producers/event.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from avenieca.producer import Producer
2+
from avenieca.utils.signal import verify_signal
3+
4+
5+
class Event(Producer):
6+
"""
7+
Event producer class for syncing from an event driven source.
8+
Use this class if you want to handle the outer syncing logic, then pass the
9+
signal to the publish method to publish.
10+
11+
:param config: configuration dictionary
12+
"""
13+
def __init__(self, config: dict):
14+
super().__init__(config)
15+
self.config = config
16+
self.sync = True
17+
18+
def publish(self, signal: dict):
19+
"""
20+
call this method with the signal dictionary to publish once to a digital twin
21+
22+
:param signal: signal data
23+
:return: None
24+
"""
25+
if self.sync:
26+
verify_signal(signal)
27+
return self.send(signal)
28+
29+
@property
30+
def config(self):
31+
return self.config
32+
33+
@config.setter
34+
def config(self, value):
35+
self._config = value

avenieca/producers/stream.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import time
2+
from avenieca.producer import Producer
3+
from avenieca.utils.signal import verify_signal
4+
5+
6+
class Stream(Producer):
7+
"""
8+
Stream producer class for continuous syncing at a sync_rate. Use this class if you want the
9+
library to handle the syncing logic for you. Provide a handler that returns the signal
10+
data for publishing.
11+
12+
:param config: config dictionary
13+
:param sync_rate: int (seconds) or float (sub-seconds)
14+
"""
15+
16+
def __init__(self, config: dict, sync_rate: [int, float]):
17+
super().__init__(config)
18+
self.config = config
19+
self.sync_rate = sync_rate
20+
self.sync = True
21+
22+
def publish(self, func, sync_once=False):
23+
"""
24+
Basic publish stream timed by the sync_rate
25+
26+
:param func: handler to return the signal (dict data) for publishing
27+
:param sync_once: run the sync loop once
28+
:return: none
29+
"""
30+
while self.sync:
31+
signal = func()
32+
verify_signal(signal)
33+
self.send(signal)
34+
if sync_once:
35+
break
36+
time.sleep(self.sync_rate)
37+
38+
@property
39+
def config(self):
40+
return self.config
41+
42+
@config.setter
43+
def config(self, value):
44+
self._config = value

avenieca/utils/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from avenieca.utils.config import *
2+
from avenieca.utils.signal import *

avenieca/utils/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Config = {
2+
"bootstrap_servers": str,
3+
"topic": str,
4+
"auto_offset_reset": "earliest"
5+
}

avenieca/utils/signal.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import json
2+
3+
import numpy as np
4+
5+
Signal = {
6+
"valence": None,
7+
"score": None,
8+
"state": None,
9+
}
10+
11+
DEFAULT_STATE_DIM = 1
12+
13+
14+
def get_state_as_array(signal, dtype=np.float64):
15+
state = signal["state"]
16+
if type(state) == str:
17+
state = json.loads(state)
18+
arr = np.array(state, dtype=dtype)
19+
signal["state"] = arr
20+
21+
22+
def get_state_as_list(signal, dtype=np.float64):
23+
state = signal["state"]
24+
if type(state) == str:
25+
state = json.loads(state)
26+
arr = np.array(state, dtype=dtype)
27+
signal["state"] = arr.tolist()
28+
29+
30+
def verify_signal(signal):
31+
assert type(signal) == dict
32+
assert len(signal) == 3
33+
34+
if signal["state"] is None:
35+
raise Exception("signal state cannot be None")
36+
37+
if type(signal["state"]) == str:
38+
verify_str_shape(signal["state"])
39+
40+
if type(signal["state"]) == list:
41+
arr_list = signal["state"]
42+
verify_list_shape(arr_list)
43+
if all(isinstance(item, (int, float)) for item in arr_list):
44+
signal["state"] = json.dumps(arr_list)
45+
return
46+
else:
47+
raise Exception("signal state values must be int or float")
48+
49+
if type(signal["state"]) == np.ndarray:
50+
try:
51+
arr_list = signal["state"].tolist()
52+
verify_list_shape(arr_list)
53+
signal["state"] = json.dumps(arr_list)
54+
return
55+
except Exception as e:
56+
raise Exception("error converting state signal from numpy array to byte string: {}".format(e))
57+
58+
59+
def verify_str_shape(state):
60+
if state == "":
61+
raise Exception("signal state cannot be empty")
62+
state = json.loads(state)
63+
verify_list_shape(state)
64+
65+
66+
def verify_list_shape(state, dtype=np.float64):
67+
arr = np.array(state, dtype=dtype)
68+
verify_np_shape(arr)
69+
70+
71+
def verify_np_shape(state: np.ndarray):
72+
if state.ndim > DEFAULT_STATE_DIM:
73+
raise Exception("state signal should be of dimension {} got {}".format(DEFAULT_STATE_DIM, state.ndim))

0 commit comments

Comments
 (0)