|
| 1 | +from typing import Optional, Any |
| 2 | +from control_msgs.msg import JointTrajectoryControllerState |
| 3 | +from arktypes import joint_group_command_t |
| 4 | +from ark.tools.ros_bridge.ark_ros2_bridge import ArkRos2Bridge |
| 5 | +import os |
| 6 | +import yaml |
| 7 | + |
| 8 | + |
| 9 | +class MoveIt2Bridge(ArkRos2Bridge): |
| 10 | + """Bridge ROS2 JointTrajectoryControllerState -> Ark joint_group_command_t.""" |
| 11 | + |
| 12 | + def __init__( |
| 13 | + self, |
| 14 | + ros_controller: str, |
| 15 | + ark_robot_name: str, |
| 16 | + mapping_table: Optional[dict[str, Any]] = None, |
| 17 | + global_config: Optional[dict[str, Any]] = None, |
| 18 | + ): |
| 19 | + sim = self.is_sim_enabled(global_config=global_config) |
| 20 | + |
| 21 | + # Build topic/channel names |
| 22 | + if sim: |
| 23 | + ros_topic = f"/{ros_controller}_controller/state" |
| 24 | + ark_channel = f"{ark_robot_name}/joint_group_command/sim" |
| 25 | + else: |
| 26 | + ros_topic = f"/{ros_controller}_controller/state" |
| 27 | + ark_channel = f"{ark_robot_name}/joint_group_command" |
| 28 | + |
| 29 | + # Base mapping (MoveIt2 state -> Ark command) |
| 30 | + moveit2_mapping_table = { |
| 31 | + "ros2_to_ark": [ |
| 32 | + { |
| 33 | + "ros2_channel": ros_topic, |
| 34 | + "ros2_type": JointTrajectoryControllerState, |
| 35 | + "ark_channel": ark_channel, |
| 36 | + "ark_type": joint_group_command_t, |
| 37 | + "translator_callback": self.moveit2_translator, |
| 38 | + } |
| 39 | + ], |
| 40 | + "ark_to_ros": [], |
| 41 | + } |
| 42 | + |
| 43 | + # Merge in extra mappings if provided |
| 44 | + if mapping_table: |
| 45 | + moveit2_mapping_table["ros2_to_ark"].extend(mapping_table.get("ros2_to_ark", [])) |
| 46 | + moveit2_mapping_table["ark_to_ros"].extend(mapping_table.get("ark_to_ros", [])) |
| 47 | + |
| 48 | + # Init parent with the final mapping |
| 49 | + super().__init__(mapping_table=moveit2_mapping_table, global_config=global_config) |
| 50 | + |
| 51 | + def moveit2_translator(self, ros_msg: JointTrajectoryControllerState, ros_channel: str, ros_type: type[JointTrajectoryControllerState], ark_channel: str, ark_type: type[joint_group_command_t]): |
| 52 | + """Convert joint state positions into Ark command.""" |
| 53 | + msg = joint_group_command_t() |
| 54 | + msg.name = "arm" |
| 55 | + msg.n = len(ros_msg.actual.positions) |
| 56 | + msg.cmd = list(ros_msg.actual.positions) |
| 57 | + return msg |
| 58 | + |
| 59 | + def is_sim_enabled(self, global_config: Any) -> Optional[bool]: |
| 60 | + """ |
| 61 | + Check if the key 'sim' is True or False in a dict or YAML file. |
| 62 | +
|
| 63 | + Args: |
| 64 | + global_config (Any): Global configuration dictionary or YAML file path. |
| 65 | +
|
| 66 | + Returns: |
| 67 | + bool | None: True/False if 'sim' key exists, None if missing. |
| 68 | + """ |
| 69 | + data = None |
| 70 | + |
| 71 | + if isinstance(global_config, dict): |
| 72 | + data = global_config |
| 73 | + elif isinstance(global_config, str) and os.path.isfile(global_config): |
| 74 | + with open(global_config, "r") as f: |
| 75 | + data = yaml.safe_load(f) |
| 76 | + else: |
| 77 | + raise ValueError("Source must be a dict or a valid YAML file path.") |
| 78 | + |
| 79 | + if not isinstance(data, dict): |
| 80 | + raise ValueError("YAML/Dict must represent a dictionary at top-level.") |
| 81 | + |
| 82 | + return data.get("sim", None) |
0 commit comments