Skip to content

Commit 7edf955

Browse files
committed
Fixed formatting.
1 parent 3896b94 commit 7edf955

5 files changed

Lines changed: 78 additions & 60 deletions

File tree

examples/rpc_run_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import time
2+
23
from python.rcs.rpc.client import RcsClient
34

45
if __name__ == "__main__":

examples/rpc_run_server.py

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,26 @@
1+
from rcs.envs.base import ControlMode, RelativeTo
12
from rcs.envs.creators import SimEnvCreator
23
from rcs.envs.utils import (
3-
default_mujoco_cameraset_cfg,
4-
default_sim_gripper_cfg,
5-
default_sim_robot_cfg,
4+
default_mujoco_cameraset_cfg,
5+
default_sim_gripper_cfg,
6+
default_sim_robot_cfg,
67
)
7-
from rcs.envs.base import ControlMode, RelativeTo
88
from rcs.rpc.server import RcsServer
99

10+
1011
def run_server():
11-
env = SimEnvCreator()(
12-
control_mode=ControlMode.JOINTS,
13-
collision_guard=False,
14-
robot_cfg=default_sim_robot_cfg(),
15-
gripper_cfg=default_sim_gripper_cfg(),
16-
cameras=default_mujoco_cameraset_cfg(),
17-
max_relative_movement=0.1,
18-
relative_to=RelativeTo.LAST_STEP,
19-
)
20-
server = RcsServer(env, port=50051)
21-
server.start()
22-
12+
env = SimEnvCreator()(
13+
control_mode=ControlMode.JOINTS,
14+
collision_guard=False,
15+
robot_cfg=default_sim_robot_cfg(),
16+
gripper_cfg=default_sim_gripper_cfg(),
17+
cameras=default_mujoco_cameraset_cfg(),
18+
max_relative_movement=0.1,
19+
relative_to=RelativeTo.LAST_STEP,
20+
)
21+
server = RcsServer(env, port=50051)
22+
server.start()
23+
24+
2325
if __name__ == "__main__":
24-
run_server()
26+
run_server()

python/rcs/rpc/client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
import rpyc
33
from rpyc.utils.classic import obtain
44

5+
56
class RcsClient(gym.Env):
6-
def __init__(self, host='localhost', port=50051):
7+
def __init__(self, host="localhost", port=50051):
78
super().__init__()
89
self.conn = rpyc.connect(host, port)
910
self.server = self.conn.root
@@ -23,7 +24,7 @@ def get_obs(self):
2324
@property
2425
def unwrapped(self):
2526
return self.server.unwrapped()
26-
27+
2728
@property
2829
def action_space(self):
2930
return obtain(self.server.action_space())

python/rcs/rpc/server.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1-
21
# import wrapper
3-
from gymnasium import Wrapper
42
import rpyc
3+
from gymnasium import Wrapper
54
from rpyc.utils.server import ThreadedServer
6-
rpyc.core.protocol.DEFAULT_CONFIG['allow_pickle'] = True
5+
6+
rpyc.core.protocol.DEFAULT_CONFIG["allow_pickle"] = True
7+
78

89
@rpyc.service
910
class RcsServer(Wrapper, rpyc.Service):
10-
def __init__(self, env, host='localhost', port=50051):
11+
def __init__(self, env, host="localhost", port=50051):
1112
super().__init__(env)
1213
self.host = host
1314
self.port = port
@@ -25,25 +26,24 @@ def reset(self, **kwargs):
2526
@rpyc.exposed
2627
def get_obs(self):
2728
"""Get the current observation using the Wrapper base class if available."""
28-
if hasattr(super(), 'get_obs'):
29+
if hasattr(super(), "get_obs"):
2930
return super().get_obs()
30-
elif hasattr(self.env, 'get_obs'):
31+
if hasattr(self.env, "get_obs"):
3132
return self.env.get_obs()
32-
else:
33-
raise NotImplementedError("The environment does not have a get_obs method.")
33+
error = "The environment does not have a get_obs method."
34+
raise NotImplementedError(error)
3435

3536
@rpyc.exposed
3637
def unwrapped(self):
3738
"""Return the unwrapped environment using the Wrapper base class."""
3839
return super().unwrapped
39-
40+
4041
@rpyc.exposed
4142
def action_space(self):
4243
"""Return the action space using the Wrapper base class."""
4344
return super().action_space
4445

4546
def start(self):
46-
import time
4747
print(f"Starting RcsServer RPC (looped OneShotServer) on {self.host}:{self.port}")
4848
t = ThreadedServer(self, port=self.port)
49-
t.start()
49+
t.start()

python/tests/test_rpc.py

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,35 @@
11
import multiprocessing
2-
import time
2+
import os
33
import socket
44
import sys
5+
import time
56
import traceback
6-
import os
7+
from contextlib import suppress
8+
from multiprocessing.context import ForkServerContext, SpawnContext
9+
from typing import Optional, Type, Union # Add Type and Union here
10+
711
import pytest
8-
from typing import Optional # Add this import at the top
9-
from rcs.envs.creators import SimEnvCreator
10-
from rcs.envs.utils import (
11-
default_mujoco_cameraset_cfg,
12-
default_sim_gripper_cfg,
13-
default_sim_robot_cfg,
14-
)
1512
from rcs.envs.base import ControlMode, RelativeTo
16-
from rcs.rpc.server import RcsServer
13+
from rcs.envs.creators import SimEnvCreator
14+
from rcs.envs.utils import default_sim_gripper_cfg, default_sim_robot_cfg
1715
from rcs.rpc.client import RcsClient
16+
from rcs.rpc.server import RcsServer
1817

1918
HOST = "127.0.0.1"
2019

20+
2121
def get_free_port() -> int:
2222
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
2323
s.bind((HOST, 0))
2424
return s.getsockname()[1]
2525

26+
2627
def wait_for_port(
2728
host: str,
2829
port: int,
2930
timeout: float,
3031
server_proc: Optional[multiprocessing.Process] = None,
31-
err_q: Optional[multiprocessing.Queue] = None
32+
err_q: Optional[multiprocessing.Queue] = None,
3233
) -> None:
3334
start = time.time()
3435
last_exc = None
@@ -44,21 +45,17 @@ def wait_for_port(
4445
if server_proc is not None and not server_proc.is_alive():
4546
server_err = None
4647
if err_q is not None:
47-
try:
48+
with suppress(Exception):
4849
server_err = err_q.get_nowait()
49-
except Exception:
50-
pass
5150
msg = f"Server process exited early (exitcode={server_proc.exitcode})."
5251
if server_err:
5352
msg += f"\nServer traceback:\n{server_err}"
5453
raise RuntimeError(msg)
5554
time.sleep(0.2)
5655
server_err = None
5756
if err_q is not None:
58-
try:
57+
with suppress(Exception):
5958
server_err = err_q.get_nowait()
60-
except Exception:
61-
pass
6259
msg = f"Timed out waiting for {host}:{port} to open."
6360
if last_exc:
6461
msg += f" Last socket error: {last_exc}"
@@ -68,6 +65,7 @@ def wait_for_port(
6865
msg += f"\nServer traceback:\n{server_err}"
6966
raise TimeoutError(msg)
7067

68+
7169
def run_server(host: str, port: int, err_q: multiprocessing.Queue) -> None:
7270
try:
7371
env = SimEnvCreator()(
@@ -76,7 +74,7 @@ def run_server(host: str, port: int, err_q: multiprocessing.Queue) -> None:
7674
robot_cfg=default_sim_robot_cfg(),
7775
gripper_cfg=default_sim_gripper_cfg(),
7876
# Disabled to avoid rendering problem in python subprocess.
79-
#cameras=default_mujoco_cameraset_cfg(),
77+
# cameras=default_mujoco_cameraset_cfg(),
8078
max_relative_movement=0.1,
8179
relative_to=RelativeTo.LAST_STEP,
8280
)
@@ -90,20 +88,22 @@ def run_server(host: str, port: int, err_q: multiprocessing.Queue) -> None:
9088
time.sleep(1)
9189
except Exception:
9290
tb = "".join(traceback.format_exception(*sys.exc_info()))
93-
try:
91+
with suppress(Exception):
9492
err_q.put(tb)
95-
except Exception:
96-
pass
9793
sys.exit(1)
9894

99-
def _mp_context() -> multiprocessing.context.BaseContext:
95+
96+
def _mp_context() -> Union[SpawnContext, ForkServerContext]:
10097
# Prefer spawn to avoid fork-related issues with GL/MuJoCo/threaded libs
10198
methods = multiprocessing.get_all_start_methods()
10299
if "spawn" in methods:
103100
return multiprocessing.get_context("spawn")
104101
if "forkserver" in methods:
105102
return multiprocessing.get_context("forkserver")
106-
return multiprocessing.get_context(methods[0])
103+
104+
msg = "No suitable multiprocessing context found."
105+
raise RuntimeError(msg)
106+
107107

108108
def _external_server_from_env() -> tuple[str, int] | None:
109109
# Set RCS_TEST_HOST and RCS_TEST_PORT to reuse an already running server.
@@ -119,6 +119,7 @@ def _external_server_from_env() -> tuple[str, int] | None:
119119
return HOST, 50055
120120
return None
121121

122+
122123
def test_run_server_starts_and_stops():
123124
# Skip if reusing an external server
124125
ext = _external_server_from_env()
@@ -130,17 +131,24 @@ def test_run_server_starts_and_stops():
130131
server_proc = ctx.Process(target=run_server, args=(HOST, port, err_q))
131132
server_proc.start()
132133
try:
133-
wait_for_port(HOST, port, timeout=120.0, server_proc=server_proc, err_q=err_q)
134+
wait_for_port(HOST, port, timeout=120.0, server_proc=server_proc, err_q=err_q) # type: ignore
134135
assert server_proc.is_alive(), "Server process did not start as expected."
135136
finally:
136137
if server_proc.is_alive():
137138
server_proc.terminate()
138139
server_proc.join(timeout=5)
139140
assert not server_proc.is_alive(), "Server process did not terminate as expected."
140141

142+
141143
class TestRcsClientServer:
144+
client: RcsClient
145+
host: str = HOST
146+
port: int = 0
147+
server_proc = None
148+
err_q: Optional[multiprocessing.Queue] = None
149+
142150
@classmethod
143-
def setup_class(cls):
151+
def setup_class(cls: Type["TestRcsClientServer"]):
144152
ext = _external_server_from_env()
145153
if ext:
146154
cls.host, cls.port = ext
@@ -156,11 +164,11 @@ def setup_class(cls):
156164
cls.server_proc = ctx.Process(target=run_server, args=(cls.host, cls.port, cls.err_q))
157165
cls.server_proc.start()
158166
# Wait until the server is actually listening or fail early if it crashed
159-
wait_for_port(cls.host, cls.port, timeout=180.0, server_proc=cls.server_proc, err_q=cls.err_q)
167+
wait_for_port(cls.host, cls.port, timeout=180.0, server_proc=cls.server_proc, err_q=cls.err_q) # type: ignore
160168
cls.client = RcsClient(host=cls.host, port=cls.port)
161169

162170
@classmethod
163-
def teardown_class(cls):
171+
def teardown_class(cls: Type["TestRcsClientServer"]):
164172
try:
165173
if getattr(cls, "client", None):
166174
cls.client.close()
@@ -188,8 +196,14 @@ def test_unwrapped(self):
188196
_ = self.client.unwrapped
189197

190198
def test_close(self):
191-
self.client.close()
199+
if self.client is not None:
200+
self.client.close()
192201
# Reconnect for further tests
193-
wait_for_port(self.__class__.host, self.__class__.port, timeout=15.0,
194-
server_proc=self.__class__.server_proc, err_q=self.__class__.err_q)
202+
wait_for_port(
203+
self.__class__.host,
204+
self.__class__.port,
205+
timeout=15.0,
206+
server_proc=self.__class__.server_proc, # type: ignore
207+
err_q=self.__class__.err_q,
208+
)
195209
self.__class__.client = RcsClient(host=self.__class__.host, port=self.__class__.port)

0 commit comments

Comments
 (0)