55from enum import Enum , auto
66from typing import Annotated , Any , ClassVar , Literal , TypeAlias , cast
77
8- from greenlet import getcurrent , greenlet
98import gymnasium as gym
109import numpy as np
10+ from greenlet import getcurrent , greenlet
1111from rcs ._core .common import Hand , RobotPlatform
1212from rcs ._core .sim import SimRobot
1313from rcs .camera .interface import BaseCameraSet
2222 get_space ,
2323 get_space_keys ,
2424)
25+ from rcs .utils import SimpleFrameRate
2526
2627from rcs import common
2728from rcs import sim as simulation
28- from rcs .utils import SimpleFrameRate
2929
3030_logger = logging .getLogger (__name__ )
3131
@@ -200,6 +200,7 @@ def get_home_position(robot: common.Robot) -> np.ndarray:
200200 """Returns the home position of the robot."""
201201 return common .robots_meta_config (robot .get_config ().robot_type ).q_home
202202
203+
203204class BaseEnv (gym .Env ):
204205 PLATFORM : RobotPlatform
205206
@@ -213,7 +214,6 @@ def reset(
213214 return {}, {}
214215
215216
216-
217217class HardwareEnv (BaseEnv ):
218218 PLATFORM = RobotPlatform .HARDWARE
219219
@@ -247,7 +247,6 @@ def step_sim(self):
247247 def apply_sim_state (self ):
248248 self .sim .step (1 )
249249
250-
251250 def reset (
252251 self , * , seed : int | None = None , options : dict [str , Any ] | None = None
253252 ) -> tuple [dict [str , Any ], dict [str , Any ]]:
@@ -257,22 +256,24 @@ def reset(
257256 self .apply_sim_state ()
258257 return super ().reset (seed = seed , options = options )
259258
259+
260260class CoverWrapper (gym .Wrapper ):
261261 """The CoverWrapper must be the last wrapper on the stack
262-
262+
263263 Only strictly necessary for simulator environments, but also works for hardware environments.
264264 It takes care of resetting the simulator before any other wrapper resets its state, already assuming
265265 a fresh simulator state.
266266 """
267- def reset (self , * , seed : int | None = None , options : dict [str , Any ] | None = None ) -> tuple [dict [str , Any ], dict [str , Any ]]:
267+
268+ def reset (
269+ self , * , seed : int | None = None , options : dict [str , Any ] | None = None
270+ ) -> tuple [dict [str , Any ], dict [str , Any ]]:
268271 if self .env .get_wrapper_attr ("PLATFORM" ) == RobotPlatform .SIMULATION :
269272 sim = cast (simulation .Sim , self .get_wrapper_attr ("sim" ))
270273 sim .reset ()
271274 return super ().reset (seed = seed , options = options )
272275
273276
274-
275-
276277class RobotWrapper (ActObsInfoWrapper ):
277278 """Gym Wrapper for a single robot arm.
278279
@@ -333,7 +334,6 @@ def get_robot_obs(self) -> ArmObsType:
333334 xyzrpy = self .robot .get_cartesian_position ().xyzrpy (),
334335 )
335336
336-
337337 def action (self , action : dict [str , Any ]) -> dict [str , Any ]:
338338 if (
339339 self .get_base_control_mode () == ControlMode .CARTESIAN_TQuat
@@ -378,7 +378,6 @@ def observation(self, observation: dict, info: dict[str, Any]) -> tuple[dict[str
378378 observation .update (self .get_robot_obs ())
379379 return observation , info
380380
381-
382381 def reset (
383382 self , * , seed : int | None = None , options : dict [str , Any ] | None = None
384383 ) -> tuple [dict [str , Any ], dict [str , Any ]]:
@@ -396,7 +395,7 @@ def close(self):
396395class MultiRobotWrapper (gym .Env ):
397396 """Wraps a dictionary of single robot environments to allow for multi robot control.
398397
399- Env1 Env2 Env3
398+ Env1 Env2 Env3
400399 | | |
401400 ----------------
402401 MultiRobotWrapper
@@ -423,18 +422,21 @@ def __init__(
423422 self .PLATFORM = self .envs [env ].get_wrapper_attr ("PLATFORM" )
424423 self .lead_env = self .envs [env ].unwrapped
425424 else :
426- assert self .envs [env ].get_wrapper_attr ("PLATFORM" ) == self .PLATFORM , "all envs must have the same platform!"
425+ assert (
426+ self .envs [env ].get_wrapper_attr ("PLATFORM" ) == self .PLATFORM
427+ ), "all envs must have the same platform!"
427428 self ._runs_in_sim = self .PLATFORM == RobotPlatform .SIMULATION
428429 if self ._runs_in_sim :
429430 self ._inject_main_greenlet ()
430431 assert isinstance (self .lead_env , SimEnv ), "something is wrong with the env, the base should be type SimEnv"
431432 self .sim = self .lead_env .get_wrapper_attr ("sim" )
432433
433-
434434 def _inject_main_greenlet (self ):
435435 main_gr = getcurrent ()
436436 for env_item in self .envs .values ():
437- assert isinstance (env_item .unwrapped , SimEnv ), "something is wrong with the env, the base should be type SimEnv"
437+ assert isinstance (
438+ env_item .unwrapped , SimEnv
439+ ), "something is wrong with the env, the base should be type SimEnv"
438440 env_item .unwrapped .main_greenlet = main_gr
439441
440442 def _translate_pose (self , key , dic , to_world = True ):
@@ -472,7 +474,6 @@ def make_step_gr(env_to_step):
472474 assert isinstance (self .lead_env , SimEnv )
473475 self .lead_env .step_sim ()
474476
475-
476477 # follows gym env by combinding a dict of envs into a single env
477478 obs = {}
478479 reward = 0.0
@@ -481,7 +482,6 @@ def make_step_gr(env_to_step):
481482 info = {}
482483 for key , env in self .envs .items ():
483484
484-
485485 if self ._runs_in_sim :
486486 # SIM path: 3. UP: Gather observations
487487 # Resume robot greenlet. It returns the step results.
@@ -491,8 +491,6 @@ def make_step_gr(env_to_step):
491491 act = self ._translate_pose (key , action [key ], to_world = False )
492492 ob , r , t , tr , info [key ] = env .step (act )
493493
494-
495-
496494 obs [key ] = self ._translate_pose (key , ob , to_world = True )
497495 reward += float (r )
498496 terminated = terminated or t
@@ -510,11 +508,11 @@ def reset(
510508 seed_ = {key : seed for key in self .envs } if seed is not None else {key : None for key in self .envs }
511509 options_ = options if options is not None else {key : None for key in self .envs }
512510
513-
514511 reset_greenlets = {}
515512 if self ._runs_in_sim :
516513 # SIM path: 1. DOWN: Reset each robot
517514 for key , env in self .envs .items ():
515+
518516 def make_reset_gr (env_to_reset , s , o ):
519517 return greenlet (lambda : env_to_reset .reset (seed = s , options = o ))
520518
@@ -526,7 +524,6 @@ def make_reset_gr(env_to_reset, s, o):
526524 assert isinstance (self .lead_env , SimEnv )
527525 self .lead_env .apply_sim_state ()
528526
529-
530527 for key , env in self .envs .items ():
531528 if self ._runs_in_sim :
532529 # SIM path: 3. UP: Gather initial obs
@@ -538,7 +535,6 @@ def make_reset_gr(env_to_reset, s, o):
538535 obs [key ] = self ._translate_pose (key , ob , to_world = True )
539536 info [key ] = i
540537
541-
542538 return obs , info
543539
544540 def get_wrapper_attr (self , name : str ) -> Any :
0 commit comments