33import threading
44import time
55from dataclasses import dataclass , field
6- from typing import Any , Dict , List , Optional , Sequence , Tuple , TypedDict , Iterator
6+ from typing import Any , Dict , Iterator , List , Optional , Sequence , Tuple , TypedDict
77
88import numpy as np
99
1313 from dynamixel_sdk .packet_handler import PacketHandler
1414 from dynamixel_sdk .port_handler import PortHandler
1515 from dynamixel_sdk .robotis_def import COMM_SUCCESS
16+
1617 HAS_DYNAMIXEL_SDK = True
1718except ImportError :
1819 HAS_DYNAMIXEL_SDK = False
1920
2021try :
2122 from pynput import keyboard
23+
2224 HAS_PYNPUT = True
2325except ImportError :
2426 HAS_PYNPUT = False
2527
26- from rcs .envs .base import ArmWithGripper , ControlMode , RelativeTo , JointsDictType , GripperDictType
28+ from rcs .envs .base import (
29+ ArmWithGripper ,
30+ ControlMode ,
31+ GripperDictType ,
32+ JointsDictType ,
33+ RelativeTo ,
34+ )
2735from rcs .operator .interface import BaseOperator , BaseOperatorConfig , TeleopCommands
2836from rcs .sim .sim import Sim
2937from rcs .utils import SimpleFrameRate
@@ -95,7 +103,7 @@ def __init__(
95103 def write_value_by_name (self , name : str , values : Sequence [int | None ]):
96104 if len (values ) != len (self ._ids ):
97105 raise ValueError (f"The length of { name } must match the number of servos" )
98-
106+
99107 handler = self ._groupSyncWriteHandlers [name ]
100108 value_len = XL330_CONTROL_TABLE [name ]["len" ]
101109
@@ -105,7 +113,7 @@ def write_value_by_name(self, name: str, values: Sequence[int | None]):
105113 continue
106114 param = [(value >> (8 * i )) & 0xFF for i in range (value_len )]
107115 handler .addParam (dxl_id , param )
108-
116+
109117 comm_result = handler .txPacket ()
110118 if comm_result != COMM_SUCCESS :
111119 handler .clearParam ()
@@ -121,7 +129,7 @@ def read_value_by_name(self, name: str) -> List[int]:
121129 comm_result = handler .txRxPacket ()
122130 if comm_result != COMM_SUCCESS :
123131 raise RuntimeError (f"Failed to sync read { name } : { self ._packetHandler .getTxRxResult (comm_result )} " )
124-
132+
125133 values = []
126134 for dxl_id in self ._ids :
127135 if handler .isAvailable (dxl_id , addr , value_len ):
@@ -177,34 +185,24 @@ def close(self):
177185
178186# --- Gello Hardware Interface Logic ---
179187
188+
180189@dataclass
181190class GelloArmConfig :
182191 com_port : str = "/dev/ttyUSB0"
183192 num_arm_joints : int = 7
184193 joint_signs : List [int ] = field (default_factory = lambda : [1 , - 1 , 1 , - 1 , 1 , 1 , 1 ])
185194 gripper : bool = True
186195 gripper_range_rad : List [float ] = field (default_factory = lambda : [2.23 , 3.22 ])
187- assembly_offsets : List [float ] = field (
188- default_factory = lambda : [0.000 , 0.000 , 3.142 , 3.142 , 3.142 , 4.712 , 0.000 ]
189- )
190- dynamixel_kp_p : List [int ] = field (
191- default_factory = lambda : [30 , 60 , 0 , 30 , 0 , 0 , 0 , 50 ]
192- )
193- dynamixel_kp_i : List [int ] = field (
194- default_factory = lambda : [0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ]
195- )
196- dynamixel_kp_d : List [int ] = field (
197- default_factory = lambda : [250 , 100 , 80 , 60 , 30 , 10 , 5 , 0 ]
198- )
199- dynamixel_torque_enable : List [int ] = field (
200- default_factory = lambda : [0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ]
201- )
196+ assembly_offsets : List [float ] = field (default_factory = lambda : [0.000 , 0.000 , 3.142 , 3.142 , 3.142 , 4.712 , 0.000 ])
197+ dynamixel_kp_p : List [int ] = field (default_factory = lambda : [30 , 60 , 0 , 30 , 0 , 0 , 0 , 50 ])
198+ dynamixel_kp_i : List [int ] = field (default_factory = lambda : [0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ])
199+ dynamixel_kp_d : List [int ] = field (default_factory = lambda : [250 , 100 , 80 , 60 , 30 , 10 , 5 , 0 ])
200+ dynamixel_torque_enable : List [int ] = field (default_factory = lambda : [0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ])
202201 dynamixel_goal_position : List [float ] = field (
203202 default_factory = lambda : [0.0 , 0.0 , 0.0 , - 1.571 , 0.0 , 1.571 , 0.0 , 3.509 ]
204203 )
205204
206205
207-
208206@dataclass
209207class DynamixelControlConfig :
210208 kp_p : List [int ] = field (default_factory = list )
@@ -289,8 +287,11 @@ def __init__(self, config: GelloArmConfig):
289287
290288 @staticmethod
291289 def normalize_joint_positions (raw , offsets , signs ):
292- return (np .mod ((raw - offsets ) * signs - GelloHardware .MID_JOINT_POSITIONS , 2 * np .pi )
293- - np .pi + GelloHardware .MID_JOINT_POSITIONS )
290+ return (
291+ np .mod ((raw - offsets ) * signs - GelloHardware .MID_JOINT_POSITIONS , 2 * np .pi )
292+ - np .pi
293+ + GelloHardware .MID_JOINT_POSITIONS
294+ )
294295
295296 def _initialize_parameters (self ):
296297 for name , value in self ._dynamixel_control_config :
@@ -300,25 +301,29 @@ def _initialize_parameters(self):
300301 def get_joint_and_gripper_positions (self ) -> Tuple [np .ndarray , float ]:
301302 joints_raw = self ._driver .get_joints ()
302303 arm_joints_raw = joints_raw [: self ._num_arm_joints ]
303-
304+
304305 arm_joints_delta = (arm_joints_raw - self ._prev_arm_joints_raw ) * self ._joint_signs
305306 arm_joints = self ._prev_arm_joints + arm_joints_delta
306307 self ._prev_arm_joints = arm_joints .copy ()
307308 self ._prev_arm_joints_raw = arm_joints_raw .copy ()
308309
309310 arm_joints_clipped = np .clip (arm_joints , self .JOINT_POSITION_LIMITS [:, 0 ], self .JOINT_POSITION_LIMITS [:, 1 ])
310-
311+
311312 gripper_pos = 0.0
312313 if self ._gripper :
313314 raw_grp = joints_raw [- 1 ]
314- gripper_pos = (raw_grp - self ._gripper_range_rad [0 ]) / (self ._gripper_range_rad [1 ] - self ._gripper_range_rad [0 ])
315+ gripper_pos = (raw_grp - self ._gripper_range_rad [0 ]) / (
316+ self ._gripper_range_rad [1 ] - self ._gripper_range_rad [0 ]
317+ )
315318 gripper_pos = max (0.0 , min (1.0 , gripper_pos ))
316319
317320 return arm_joints_clipped , gripper_pos
318321
319322 def _goal_position_to_pulses (self , goals ):
320323 arm_goals = np .array (goals [: self ._num_arm_joints ])
321- initial_rotations = np .floor_divide (self ._initial_arm_joints_raw - self ._assembly_offsets - self .MID_JOINT_POSITIONS , 2 * np .pi )
324+ initial_rotations = np .floor_divide (
325+ self ._initial_arm_joints_raw - self ._assembly_offsets - self .MID_JOINT_POSITIONS , 2 * np .pi
326+ )
322327 arm_goals_raw = (initial_rotations * 2 * np .pi + arm_goals + self ._assembly_offsets ) * self ._joint_signs + np .pi
323328 goals_raw = np .append (arm_goals_raw , goals [- 1 ]) if self ._gripper else arm_goals_raw
324329 return [self ._driver ._rad_to_pulses (rad ) for rad in goals_raw ]
@@ -334,8 +339,6 @@ def close(self):
334339# --- RCS Operator Implementation ---
335340
336341
337-
338-
339342class GelloOperator (BaseOperator ):
340343 control_mode = (ControlMode .JOINTS , RelativeTo .NONE )
341344
@@ -353,7 +356,7 @@ def __init__(self, config: GelloConfig, sim: Sim | None = None):
353356 self ._last_joints = {name : None for name in self .controller_names }
354357 self ._last_gripper = {name : 1.0 for name in self .controller_names }
355358 self ._hws : Dict [str , GelloHardware ] = {}
356-
359+
357360 if HAS_PYNPUT :
358361 self ._listener = keyboard .Listener (on_press = self ._on_press )
359362 self ._listener .start ()
@@ -428,8 +431,9 @@ def close(self):
428431 if self .is_alive () and threading .current_thread () != self :
429432 self .join (timeout = 1.0 )
430433
434+
431435@dataclass (kw_only = True )
432436class GelloConfig (BaseOperatorConfig ):
433437 operator_class = GelloOperator
434438 # Dictionary for multi-arm setups: {"left": GelloArmConfig(...), "right": GelloArmConfig(...)}
435- arms : Dict [str , GelloArmConfig ] = field (default_factory = lambda : {"right" : GelloArmConfig ()})
439+ arms : Dict [str , GelloArmConfig ] = field (default_factory = lambda : {"right" : GelloArmConfig ()})
0 commit comments