Skip to content

Commit b0cd6cc

Browse files
Merge pull request #234 from ezmsg-org/feature/process-control
Phase 2: Process Control, Settings, Topology, And Profiling APIs
2 parents 2393c08 + 71071b3 commit b0cd6cc

33 files changed

Lines changed: 7549 additions & 117 deletions

examples/ezmsg_toy.py

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,38 +24,64 @@ class LFOSettings(ez.Settings):
2424
update_rate: float = 2.0 # Hz, update rate
2525

2626

27+
class LFOState(ez.State):
28+
start_time: float
29+
cur_settings: LFOSettings
30+
31+
2732
class LFO(ez.Unit):
2833
SETTINGS = LFOSettings
34+
STATE = LFOState
2935

3036
OUTPUT = ez.OutputStream(float)
3137

38+
INPUT_SETTINGS = ez.InputStream(LFOSettings)
39+
3240
async def initialize(self) -> None:
33-
self.start_time = time.time()
41+
self.STATE.cur_settings = self.SETTINGS
42+
self.STATE.start_time = time.time()
3443

44+
@ez.subscriber(INPUT_SETTINGS)
45+
async def on_settings(self, msg: LFOSettings) -> None:
46+
self.STATE.cur_settings = msg
47+
3548
@ez.publisher(OUTPUT)
3649
async def generate(self) -> AsyncGenerator:
3750
while True:
38-
t = time.time() - self.start_time
39-
yield self.OUTPUT, math.sin(2.0 * math.pi * self.SETTINGS.freq * t)
40-
await asyncio.sleep(1.0 / self.SETTINGS.update_rate)
51+
t = time.time() - self.STATE.start_time
52+
yield self.OUTPUT, math.sin(2.0 * math.pi * self.STATE.cur_settings.freq * t)
53+
await asyncio.sleep(1.0 / self.STATE.cur_settings.update_rate)
4154

4255

4356
# MESSAGE GENERATOR
4457
class MessageGeneratorSettings(ez.Settings):
4558
message: str
4659

4760

61+
class MessageGeneratorState(ez.State):
62+
cur_settings: MessageGeneratorSettings
63+
64+
4865
class MessageGenerator(ez.Unit):
4966
SETTINGS = MessageGeneratorSettings
67+
STATE = MessageGeneratorState
5068

5169
OUTPUT = ez.OutputStream(str)
70+
INPUT_SETTINGS = ez.InputStream(MessageGeneratorSettings)
71+
72+
async def initialize(self) -> None:
73+
self.STATE.cur_settings = self.SETTINGS
74+
75+
@ez.subscriber(INPUT_SETTINGS)
76+
async def on_settings(self, msg: MessageGeneratorSettings) -> None:
77+
self.STATE.cur_settings = msg
5278

5379
@ez.publisher(OUTPUT)
5480
async def spawn_message(self) -> AsyncGenerator:
5581
while True:
5682
await asyncio.sleep(1.0)
57-
ez.logger.info(f"Spawning {self.SETTINGS.message}")
58-
yield self.OUTPUT, self.SETTINGS.message
83+
ez.logger.info(f"Spawning {self.STATE.cur_settings.message}")
84+
yield self.OUTPUT, self.STATE.cur_settings.message
5985

6086
@ez.publisher(OUTPUT)
6187
async def spawn_once(self) -> AsyncGenerator:
@@ -152,6 +178,8 @@ class TestSystemSettings(ez.Settings):
152178
class TestSystem(ez.Collection):
153179
SETTINGS = TestSystemSettings
154180

181+
OUTPUT_PING = ez.OutputTopic(str)
182+
155183
# Publishers
156184
PING = MessageGenerator()
157185
FOO = MessageGenerator()
@@ -173,6 +201,7 @@ def configure(self) -> None:
173201
# Define Connections
174202
def network(self) -> ez.NetworkDefinition:
175203
return (
204+
(self.PING.OUTPUT, self.OUTPUT_PING),
176205
(self.PING.OUTPUT, self.PINGSUB1.INPUT),
177206
(self.PING.OUTPUT, self.MODIFIER_COLLECTION.INPUT),
178207
(self.MODIFIER_COLLECTION.OUTPUT, self.PINGSUB2.INPUT),
@@ -193,7 +222,7 @@ def process_components(self):
193222
ez.run(
194223
SYSTEM=system,
195224
connections=[
196-
# Make PING.OUTPUT available on a topic ezmsg_attach.py
197-
(system.PING.OUTPUT, "GLOBAL_PING_TOPIC"),
225+
# Make a system output available on a topic ezmsg_attach.py
226+
(system.OUTPUT_PING, "GLOBAL_PING_TOPIC"),
198227
],
199228
)

0 commit comments

Comments
 (0)