Skip to content

Commit 7f6ca7b

Browse files
Merge pull request #215 from ezmsg-org/feature/sync-lowlevel-api
Feature: Synchronous Low‑Level API for ezmsg (ROS2‑style ergonomics)
2 parents 24e8a13 + 4cec249 commit 7f6ca7b

24 files changed

Lines changed: 1657 additions & 159 deletions

examples/lowlevel_api.py

Lines changed: 0 additions & 119 deletions
This file was deleted.

examples/simple_async_publisher.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import asyncio
2+
3+
import ezmsg.core as ez
4+
5+
TOPIC = "/TEST"
6+
7+
8+
async def main(host: str = "127.0.0.1", port: int = 12345) -> None:
9+
async with ez.GraphContext((host, port), auto_start=True) as ctx:
10+
pub = await ctx.publisher(TOPIC)
11+
try:
12+
print("Publisher Task Launched")
13+
count = 0
14+
while True:
15+
await pub.broadcast(f"{count=}")
16+
await asyncio.sleep(0.1)
17+
count += 1
18+
except asyncio.CancelledError:
19+
pass
20+
finally:
21+
print("Publisher Task Concluded")
22+
23+
24+
if __name__ == "__main__":
25+
from argparse import ArgumentParser
26+
27+
parser = ArgumentParser()
28+
parser.add_argument("--host", default="127.0.0.1", help="hostname for graphserver")
29+
parser.add_argument("--port", default=12345, type=int, help="port for graphserver")
30+
31+
args = parser.parse_args()
32+
33+
asyncio.run(main(host=args.host, port=args.port))
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import asyncio
2+
3+
import ezmsg.core as ez
4+
5+
TOPIC = "/TEST"
6+
7+
8+
async def main(host: str = "127.0.0.1", port: int = 12345) -> None:
9+
async with ez.GraphContext((host, port), auto_start=True) as ctx:
10+
sub = await ctx.subscriber(TOPIC)
11+
try:
12+
print("Subscriber Task Launched")
13+
while True:
14+
async with sub.recv_zero_copy() as msg:
15+
# Uncomment if you want to witness backpressure!
16+
# await asyncio.sleep(1.0)
17+
print(msg)
18+
except asyncio.CancelledError:
19+
pass
20+
finally:
21+
print("Subscriber Task Concluded")
22+
print("Detached")
23+
24+
25+
if __name__ == "__main__":
26+
from argparse import ArgumentParser
27+
28+
parser = ArgumentParser()
29+
parser.add_argument("--host", default="127.0.0.1", help="hostname for graphserver")
30+
parser.add_argument("--port", default=12345, type=int, help="port for graphserver")
31+
32+
args = parser.parse_args()
33+
34+
asyncio.run(main(host=args.host, port=args.port))

examples/simple_publisher.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import time
2+
3+
import ezmsg.core as ez
4+
5+
TOPIC = "/TEST"
6+
7+
def main(host: str = "127.0.0.1", port: int = 12345) -> None:
8+
with ez.sync.init((host, port), auto_start=True) as ctx:
9+
pub = ctx.create_publisher(TOPIC, force_tcp=True)
10+
11+
print("Publisher Task Launched")
12+
count = 0
13+
try:
14+
while True:
15+
output = f"{count=}"
16+
pub.publish(output)
17+
print(output)
18+
time.sleep(0.1)
19+
count += 1
20+
except KeyboardInterrupt:
21+
pass
22+
print("Publisher Task Concluded")
23+
24+
print("Done")
25+
26+
27+
if __name__ == "__main__":
28+
from argparse import ArgumentParser
29+
30+
parser = ArgumentParser()
31+
parser.add_argument("--host", default="127.0.0.1", help="hostname for graphserver")
32+
parser.add_argument("--port", default=12345, type=int, help="port for graphserver")
33+
args = parser.parse_args()
34+
35+
main(host=args.host, port=args.port)

examples/simple_subscriber.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import ezmsg.core as ez
2+
3+
TOPIC = "/TEST"
4+
5+
6+
def main(host: str = "127.0.0.1", port: int = 12345) -> None:
7+
with ez.sync.init((host, port), auto_start=True) as ctx:
8+
print("Subscriber Task Launched")
9+
10+
def on_message(msg: str) -> None:
11+
# Uncomment if you want to witness backpressure!
12+
# import time
13+
# time.sleep(1.0)
14+
print(msg)
15+
16+
ctx.create_subscription(TOPIC, callback=on_message)
17+
ez.sync.spin(ctx)
18+
19+
print("Subscriber Task Concluded")
20+
21+
22+
if __name__ == "__main__":
23+
from argparse import ArgumentParser
24+
25+
parser = ArgumentParser()
26+
parser.add_argument("--host", default="127.0.0.1", help="hostname for graphserver")
27+
parser.add_argument("--port", default=12345, type=int, help="port for graphserver")
28+
args = parser.parse_args()
29+
30+
main(host=args.host, port=args.port)

src/ezmsg/core/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
"NormalTermination",
2626
"GraphServer",
2727
"GraphContext",
28+
"sync",
29+
"SyncContext",
30+
"SyncPublisher",
31+
"SyncSubscriber",
2832
"run_command",
2933
"Publisher",
3034
"Subscriber",
@@ -45,6 +49,8 @@
4549
from .backendprocess import Complete, NormalTermination
4650
from .graphserver import GraphServer
4751
from .graphcontext import GraphContext
52+
from . import sync
53+
from .sync import SyncContext, SyncPublisher, SyncSubscriber
4854
from .command import run_command
4955
from .pubclient import Publisher
5056
from .subclient import Subscriber

src/ezmsg/core/backend.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,15 @@ def graph_address(self) -> AddressType | None:
222222
return self._graph_context.graph_address
223223
return self._graph_address
224224

225+
@property
226+
def strict_shutdown(self) -> bool:
227+
value = os.environ.get("EZMSG_STRICT_SHUTDOWN", "")
228+
return value.lower() in ("1", "true", "yes", "on")
229+
230+
@strict_shutdown.setter
231+
def strict_shutdown(self, value: bool) -> None:
232+
os.environ["EZMSG_STRICT_SHUTDOWN"] = "1" if value else "0"
233+
225234
@property
226235
def graph_server_spawned(self) -> bool:
227236
return self._graph_server_spawned

0 commit comments

Comments
 (0)