From e44abedefe73c48e4ad0d4763d208fa00082cd91 Mon Sep 17 00:00:00 2001 From: Xinjie Yao Date: Wed, 20 May 2026 17:53:58 -0700 Subject: [PATCH 1/8] wip --- isaaclab_arena/environments/env_graph_spec.py | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 isaaclab_arena/environments/env_graph_spec.py diff --git a/isaaclab_arena/environments/env_graph_spec.py b/isaaclab_arena/environments/env_graph_spec.py new file mode 100644 index 000000000..fee3a6a9f --- /dev/null +++ b/isaaclab_arena/environments/env_graph_spec.py @@ -0,0 +1,4 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 From b564cdf7e66f5410bd3b6a528091111d4023296e Mon Sep 17 00:00:00 2001 From: Xinjie Yao Date: Wed, 20 May 2026 18:27:46 -0700 Subject: [PATCH 2/8] update --- isaaclab_arena/environments/__init__.py | 18 ++ isaaclab_arena/environments/env_graph_spec.py | 176 ++++++++++++++++++ .../pick_and_place_maple_table_env_graph.yaml | 119 ++++++++++++ isaaclab_arena/tests/test_env_graph_spec.py | 58 ++++++ 4 files changed, 371 insertions(+) create mode 100644 isaaclab_arena/tests/test_data/pick_and_place_maple_table_env_graph.yaml create mode 100644 isaaclab_arena/tests/test_env_graph_spec.py diff --git a/isaaclab_arena/environments/__init__.py b/isaaclab_arena/environments/__init__.py index fee3a6a9f..1883e8cba 100644 --- a/isaaclab_arena/environments/__init__.py +++ b/isaaclab_arena/environments/__init__.py @@ -2,3 +2,21 @@ # All rights reserved. # # SPDX-License-Identifier: Apache-2.0 + +from isaaclab_arena.environments.env_graph_spec import ( + EnvGraphConstraintSpec, + EnvGraphEdgesSpec, + EnvGraphNodeSpec, + EnvGraphSpec, + EnvGraphStateSpec, + EnvGraphTaskSpec, +) + +__all__ = [ + "EnvGraphConstraintSpec", + "EnvGraphEdgesSpec", + "EnvGraphNodeSpec", + "EnvGraphSpec", + "EnvGraphStateSpec", + "EnvGraphTaskSpec", +] diff --git a/isaaclab_arena/environments/env_graph_spec.py b/isaaclab_arena/environments/env_graph_spec.py index fee3a6a9f..146da1f3c 100644 --- a/isaaclab_arena/environments/env_graph_spec.py +++ b/isaaclab_arena/environments/env_graph_spec.py @@ -2,3 +2,179 @@ # All rights reserved. # # SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import yaml +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + + +@dataclass(frozen=True) +class EnvGraphNodeSpec: + """Node in an environment graph. Could be an object, an object reference, an embodiment, a background, etc.""" + + id: str + name: str + type: str + parent: str | None = None + prim_path: str | None = None + object_type: str | None = None + params: dict[str, Any] = field(default_factory=dict) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> EnvGraphNodeSpec: + assert isinstance(data, dict), f"Node spec must be a dict, got {type(data).__name__}" + return cls( + id=_required_str(data, "id"), + name=_required_str(data, "name"), + type=_required_str(data, "type"), + parent=_optional_str(data, "parent"), + prim_path=_optional_str(data, "prim_path"), + object_type=_optional_str(data, "object_type"), + params=_optional_dict(data, "params"), + ) + + +@dataclass(frozen=True) +class EnvGraphConstraintSpec: + """Constraint edge in an environment graph state spec. It defines a spatial or task constraint between two nodes.""" + + id: str + type: str + parent: str | None = None + child: str | None = None + params: dict[str, Any] = field(default_factory=dict) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> EnvGraphConstraintSpec: + assert isinstance(data, dict), f"Constraint spec must be a dict, got {type(data).__name__}" + return cls( + id=_required_str(data, "id"), + type=_required_str(data, "type"), + parent=_optional_str(data, "parent"), + child=_optional_str(data, "child"), + params=_optional_dict(data, "params"), + ) + + +@dataclass(frozen=True) +class EnvGraphEdgesSpec: + """Grouped spatial and task constraints.""" + + spatial_constraints: list[EnvGraphConstraintSpec] = field(default_factory=list) + task_constraints: list[EnvGraphConstraintSpec] = field(default_factory=list) + + @classmethod + def from_dict(cls, data: dict[str, Any] | None) -> EnvGraphEdgesSpec: + data = data or {} + assert isinstance(data, dict), f"Edges spec must be a dict, got {type(data).__name__}" + return cls( + spatial_constraints=[ + EnvGraphConstraintSpec.from_dict(edge) for edge in data.get("spatial_constraints", []) + ], + task_constraints=[EnvGraphConstraintSpec.from_dict(edge) for edge in data.get("task_constraints", [])], + ) + + +@dataclass(frozen=True) +class EnvGraphStateSpec: + """Snapshots of the environment state in the graph. Could be an initial, or intermediate, or final state.""" + + id: str + name: str + edges: EnvGraphEdgesSpec = field(default_factory=EnvGraphEdgesSpec) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> EnvGraphStateSpec: + assert isinstance(data, dict), f"State spec must be a dict, got {type(data).__name__}" + return cls( + id=_required_str(data, "id"), + name=_required_str(data, "name"), + edges=EnvGraphEdgesSpec.from_dict(data.get("edges")), + ) + + +@dataclass(frozen=True) +class EnvGraphTaskSpec: + """Task entry in an environment graph. It defines the task to be completed in the environment.""" + + id: str + name: str + type: str + state_specs: dict[str, str] = field(default_factory=dict) + task_args: dict[str, Any] = field(default_factory=dict) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> EnvGraphTaskSpec: + assert isinstance(data, dict), f"Task spec must be a dict, got {type(data).__name__}" + state_specs = data.get("state_specs", {}) + assert isinstance(state_specs, dict), "Task state_specs must be a dict" + return cls( + id=_required_str(data, "id"), + name=_required_str(data, "name"), + type=_required_str(data, "type"), + state_specs={str(k): str(v) for k, v in state_specs.items()}, + task_args=_optional_dict(data, "task_args"), + ) + + +@dataclass(frozen=True) +class EnvGraphSpec: + """Typed representation of an environment graph YAML file. It defines the nodes, tasks, and states of the environment graph.""" + + name: str + nodes: list[EnvGraphNodeSpec] = field(default_factory=list) + tasks: list[EnvGraphTaskSpec] = field(default_factory=list) + state_specs: list[EnvGraphStateSpec] = field(default_factory=list) + + @classmethod + def from_yaml(cls, path: str | Path) -> EnvGraphSpec: + """Load an environment graph spec from a YAML file.""" + with Path(path).open("r", encoding="utf-8") as f: + return cls.from_dict(yaml.safe_load(f)) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> EnvGraphSpec: + """Build an environment graph spec from parsed YAML data.""" + assert isinstance(data, dict), f"Env graph spec must be a dict, got {type(data).__name__}" + return cls( + name=_required_str(data, "name"), + nodes=[EnvGraphNodeSpec.from_dict(node) for node in data.get("nodes", [])], + tasks=[EnvGraphTaskSpec.from_dict(task) for task in data.get("tasks", [])], + state_specs=[EnvGraphStateSpec.from_dict(state_spec) for state_spec in data.get("state_specs", [])], + ) + + @property + def nodes_by_id(self) -> dict[str, EnvGraphNodeSpec]: + """Return nodes keyed by id.""" + return {node.id: node for node in self.nodes} + + @property + def tasks_by_id(self) -> dict[str, EnvGraphTaskSpec]: + """Return tasks keyed by id.""" + return {task.id: task for task in self.tasks} + + @property + def state_specs_by_id(self) -> dict[str, EnvGraphStateSpec]: + """Return state specs keyed by id.""" + return {state_spec.id: state_spec for state_spec in self.state_specs} + + +def _required_str(data: dict[str, Any], key: str) -> str: + value = data.get(key) + assert isinstance(value, str) and value, f"Missing required string field '{key}'" + return value + + +def _optional_str(data: dict[str, Any], key: str) -> str | None: + value = data.get(key) + assert value is None or isinstance(value, str), f"Optional field '{key}' must be a string when set" + return value + + +def _optional_dict(data: dict[str, Any], key: str) -> dict[str, Any]: + value = data.get(key, {}) + assert value is None or isinstance(value, dict), f"Optional field '{key}' must be a dict when set" + return dict(value or {}) diff --git a/isaaclab_arena/tests/test_data/pick_and_place_maple_table_env_graph.yaml b/isaaclab_arena/tests/test_data/pick_and_place_maple_table_env_graph.yaml new file mode 100644 index 000000000..8d1aa8369 --- /dev/null +++ b/isaaclab_arena/tests/test_data/pick_and_place_maple_table_env_graph.yaml @@ -0,0 +1,119 @@ +# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +name: pick_and_place_maple_table_default + +nodes: + - id: droid_abs_joint_pos + name: droid_abs_joint_pos + type: embodiment + + - id: maple_table_robolab + name: maple_table_robolab + type: background + + - id: maple_table_robolab_table + name: table + type: object_reference + parent: maple_table_robolab + prim_path: "{ENV_REGEX_NS}/maple_table_robolab/table" + object_type: rigid + + - id: rubiks_cube_hot3d_robolab + name: rubiks_cube_hot3d_robolab + type: rigid_object + + - id: bowl_ycb_robolab + name: bowl_ycb_robolab + type: rigid_object + +tasks: + - id: pick_and_place_0 + name: pick_and_place_0 + type: pick_and_place + state_specs: + initial: state_spec_0 + final: state_spec_1 + task_args: + object: rubiks_cube_hot3d_robolab + destination: bowl_ycb_robolab + background: maple_table_robolab + episode_length_s: 20.0 + +state_specs: + - id: state_spec_0 + name: state_spec_0 + edges: + spatial_constraints: + - id: maple_table_robolab_table_is_anchor + type: is_anchor + parent: maple_table_robolab_table + + - id: rubiks_cube_hot3d_robolab_on_maple_table_robolab_table + type: "on" + parent: maple_table_robolab_table + child: rubiks_cube_hot3d_robolab + + - id: rubiks_cube_hot3d_robolab_position_limits + type: position_limits + child: rubiks_cube_hot3d_robolab + params: + x_min: 0.55 + x_max: 0.70 + y_min: -0.40 + y_max: -0.10 + + - id: bowl_ycb_robolab_on_maple_table_robolab_table + type: "on" + parent: maple_table_robolab_table + child: bowl_ycb_robolab + + - id: bowl_ycb_robolab_position_limits + type: position_limits + child: bowl_ycb_robolab + params: + x_min: 0.55 + x_max: 0.70 + y_min: -0.40 + y_max: -0.10 + + task_constraints: + - id: droid_reach_rubiks_cube_hot3d_robolab + type: "reach" + parent: droid_abs_joint_pos + child: rubiks_cube_hot3d_robolab + + - id: state_spec_1 + name: state_spec_1 + edges: + spatial_constraints: + - id: maple_table_robolab_table_is_anchor + type: is_anchor + parent: maple_table_robolab_table + + - id: bowl_ycb_robolab_on_maple_table_robolab_table + type: "on" + parent: maple_table_robolab_table + child: bowl_ycb_robolab + + - id: bowl_ycb_robolab_position_limits + type: position_limits + child: bowl_ycb_robolab + params: + x_min: 0.55 + x_max: 0.70 + y_min: -0.40 + y_max: -0.10 + + - id: rubiks_cube_hot3d_robolab_in_bowl_ycb_robolab + type: "in" + parent: bowl_ycb_robolab + child: rubiks_cube_hot3d_robolab + + task_constraints: + - id: droid_reach_bowl_ycb_robolab + type: "reach" + parent: droid_abs_joint_pos + child: bowl_ycb_robolab diff --git a/isaaclab_arena/tests/test_env_graph_spec.py b/isaaclab_arena/tests/test_env_graph_spec.py new file mode 100644 index 000000000..bc3f582bb --- /dev/null +++ b/isaaclab_arena/tests/test_env_graph_spec.py @@ -0,0 +1,58 @@ +# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from pathlib import Path + +from isaaclab_arena.environments.env_graph_spec import EnvGraphSpec, EnvGraphStateSpec + +TEST_DATA_DIR = Path(__file__).parent / "test_data" + + +def test_env_graph_spec_loads_pick_and_place_yaml(): + spec = EnvGraphSpec.from_yaml(TEST_DATA_DIR / "pick_and_place_maple_table_env_graph.yaml") + + assert spec.name == "pick_and_place_maple_table_default" + assert len(spec.nodes) == 5 + assert len(spec.tasks) == 1 + assert len(spec.state_specs) == 2 + + table = spec.nodes_by_id["maple_table_robolab_table"] + assert table.type == "object_reference" + assert table.parent == "maple_table_robolab" + assert table.prim_path == "{ENV_REGEX_NS}/maple_table_robolab/table" + assert table.object_type == "rigid" + + task = spec.tasks_by_id["pick_and_place_0"] + assert task.state_specs == {"initial": "state_spec_0", "final": "state_spec_1"} + assert task.task_args["object"] == "rubiks_cube_hot3d_robolab" + assert task.task_args["destination"] == "bowl_ycb_robolab" + assert task.task_args["episode_length_s"] == 20.0 + + initial_state = spec.state_specs_by_id["state_spec_0"] + assert isinstance(initial_state, EnvGraphStateSpec) + assert len(initial_state.edges.spatial_constraints) == 5 + assert len(initial_state.edges.task_constraints) == 1 + + cube_limits = initial_state.edges.spatial_constraints[2] + assert cube_limits.type == "position_limits" + assert cube_limits.child == "rubiks_cube_hot3d_robolab" + assert cube_limits.params == { + "x_min": 0.55, + "x_max": 0.70, + "y_min": -0.40, + "y_max": -0.10, + } + + final_state = spec.state_specs_by_id["state_spec_1"] + assert isinstance(final_state, EnvGraphStateSpec) + in_constraint = final_state.edges.spatial_constraints[3] + assert in_constraint.type == "in" + assert in_constraint.parent == "bowl_ycb_robolab" + assert in_constraint.child == "rubiks_cube_hot3d_robolab" + + reach_constraint = final_state.edges.task_constraints[0] + assert reach_constraint.type == "reach" + assert reach_constraint.parent == "droid_abs_joint_pos" + assert reach_constraint.child == "bowl_ycb_robolab" From 9f6825aeedc137e8e6e700cce3c5bbfc84350f06 Mon Sep 17 00:00:00 2001 From: Xinjie Yao Date: Wed, 20 May 2026 22:55:23 -0700 Subject: [PATCH 3/8] address autoreview --- isaaclab_arena/environments/env_graph_spec.py | 217 +++++++++++------- isaaclab_arena/tests/test_env_graph_spec.py | 67 ++++++ 2 files changed, 207 insertions(+), 77 deletions(-) diff --git a/isaaclab_arena/environments/env_graph_spec.py b/isaaclab_arena/environments/env_graph_spec.py index 146da1f3c..63fc76f72 100644 --- a/isaaclab_arena/environments/env_graph_spec.py +++ b/isaaclab_arena/environments/env_graph_spec.py @@ -3,17 +3,21 @@ # # SPDX-License-Identifier: Apache-2.0 -from __future__ import annotations - -import yaml from dataclasses import dataclass, field from pathlib import Path -from typing import Any +from typing import Any, Callable, TypeVar + +import yaml + +T = TypeVar("T") @dataclass(frozen=True) class EnvGraphNodeSpec: - """Node in an environment graph. Could be an object, an object reference, an embodiment, a background, etc.""" + """Node in an environment graph. + + Could be an object, an object reference, an embodiment, a background, etc. + """ id: str name: str @@ -23,23 +27,13 @@ class EnvGraphNodeSpec: object_type: str | None = None params: dict[str, Any] = field(default_factory=dict) - @classmethod - def from_dict(cls, data: dict[str, Any]) -> EnvGraphNodeSpec: - assert isinstance(data, dict), f"Node spec must be a dict, got {type(data).__name__}" - return cls( - id=_required_str(data, "id"), - name=_required_str(data, "name"), - type=_required_str(data, "type"), - parent=_optional_str(data, "parent"), - prim_path=_optional_str(data, "prim_path"), - object_type=_optional_str(data, "object_type"), - params=_optional_dict(data, "params"), - ) - @dataclass(frozen=True) class EnvGraphConstraintSpec: - """Constraint edge in an environment graph state spec. It defines a spatial or task constraint between two nodes.""" + """Constraint edge in an environment graph state spec. + + It defines a spatial or task constraint between two nodes. + """ id: str type: str @@ -47,17 +41,6 @@ class EnvGraphConstraintSpec: child: str | None = None params: dict[str, Any] = field(default_factory=dict) - @classmethod - def from_dict(cls, data: dict[str, Any]) -> EnvGraphConstraintSpec: - assert isinstance(data, dict), f"Constraint spec must be a dict, got {type(data).__name__}" - return cls( - id=_required_str(data, "id"), - type=_required_str(data, "type"), - parent=_optional_str(data, "parent"), - child=_optional_str(data, "child"), - params=_optional_dict(data, "params"), - ) - @dataclass(frozen=True) class EnvGraphEdgesSpec: @@ -66,39 +49,22 @@ class EnvGraphEdgesSpec: spatial_constraints: list[EnvGraphConstraintSpec] = field(default_factory=list) task_constraints: list[EnvGraphConstraintSpec] = field(default_factory=list) - @classmethod - def from_dict(cls, data: dict[str, Any] | None) -> EnvGraphEdgesSpec: - data = data or {} - assert isinstance(data, dict), f"Edges spec must be a dict, got {type(data).__name__}" - return cls( - spatial_constraints=[ - EnvGraphConstraintSpec.from_dict(edge) for edge in data.get("spatial_constraints", []) - ], - task_constraints=[EnvGraphConstraintSpec.from_dict(edge) for edge in data.get("task_constraints", [])], - ) - @dataclass(frozen=True) class EnvGraphStateSpec: - """Snapshots of the environment state in the graph. Could be an initial, or intermediate, or final state.""" + """Snapshot of the environment state in the graph. + + Could be an initial, intermediate, or final state. + """ id: str name: str edges: EnvGraphEdgesSpec = field(default_factory=EnvGraphEdgesSpec) - @classmethod - def from_dict(cls, data: dict[str, Any]) -> EnvGraphStateSpec: - assert isinstance(data, dict), f"State spec must be a dict, got {type(data).__name__}" - return cls( - id=_required_str(data, "id"), - name=_required_str(data, "name"), - edges=EnvGraphEdgesSpec.from_dict(data.get("edges")), - ) - @dataclass(frozen=True) class EnvGraphTaskSpec: - """Task entry in an environment graph. It defines the task to be completed in the environment.""" + """Task entry in an environment graph.""" id: str name: str @@ -106,23 +72,10 @@ class EnvGraphTaskSpec: state_specs: dict[str, str] = field(default_factory=dict) task_args: dict[str, Any] = field(default_factory=dict) - @classmethod - def from_dict(cls, data: dict[str, Any]) -> EnvGraphTaskSpec: - assert isinstance(data, dict), f"Task spec must be a dict, got {type(data).__name__}" - state_specs = data.get("state_specs", {}) - assert isinstance(state_specs, dict), "Task state_specs must be a dict" - return cls( - id=_required_str(data, "id"), - name=_required_str(data, "name"), - type=_required_str(data, "type"), - state_specs={str(k): str(v) for k, v in state_specs.items()}, - task_args=_optional_dict(data, "task_args"), - ) - @dataclass(frozen=True) class EnvGraphSpec: - """Typed representation of an environment graph YAML file. It defines the nodes, tasks, and states of the environment graph.""" + """Typed representation of an environment graph YAML file.""" name: str nodes: list[EnvGraphNodeSpec] = field(default_factory=list) @@ -130,38 +83,107 @@ class EnvGraphSpec: state_specs: list[EnvGraphStateSpec] = field(default_factory=list) @classmethod - def from_yaml(cls, path: str | Path) -> EnvGraphSpec: - """Load an environment graph spec from a YAML file.""" + def from_yaml(cls, path: str | Path) -> "EnvGraphSpec": with Path(path).open("r", encoding="utf-8") as f: return cls.from_dict(yaml.safe_load(f)) @classmethod - def from_dict(cls, data: dict[str, Any]) -> EnvGraphSpec: - """Build an environment graph spec from parsed YAML data.""" - assert isinstance(data, dict), f"Env graph spec must be a dict, got {type(data).__name__}" + def from_dict(cls, data: dict[str, Any]) -> "EnvGraphSpec": + data = _as_dict(data, "Env graph spec") + nodes = _parse_list(data, "nodes", _parse_node) + tasks = _parse_list(data, "tasks", _parse_task) + state_specs = _parse_list(data, "state_specs", _parse_state_spec) + + _assert_unique_ids(nodes, "node") + _assert_unique_ids(tasks, "task") + _assert_unique_ids(state_specs, "state spec") + _assert_references_exist(nodes, tasks, state_specs) + return cls( name=_required_str(data, "name"), - nodes=[EnvGraphNodeSpec.from_dict(node) for node in data.get("nodes", [])], - tasks=[EnvGraphTaskSpec.from_dict(task) for task in data.get("tasks", [])], - state_specs=[EnvGraphStateSpec.from_dict(state_spec) for state_spec in data.get("state_specs", [])], + nodes=nodes, + tasks=tasks, + state_specs=state_specs, ) @property def nodes_by_id(self) -> dict[str, EnvGraphNodeSpec]: - """Return nodes keyed by id.""" return {node.id: node for node in self.nodes} @property def tasks_by_id(self) -> dict[str, EnvGraphTaskSpec]: - """Return tasks keyed by id.""" return {task.id: task for task in self.tasks} @property def state_specs_by_id(self) -> dict[str, EnvGraphStateSpec]: - """Return state specs keyed by id.""" return {state_spec.id: state_spec for state_spec in self.state_specs} +def _parse_node(data: Any) -> EnvGraphNodeSpec: + data = _as_dict(data, "Node spec") + return EnvGraphNodeSpec( + id=_required_str(data, "id"), + name=_required_str(data, "name"), + type=_required_str(data, "type"), + parent=_optional_str(data, "parent"), + prim_path=_optional_str(data, "prim_path"), + object_type=_optional_str(data, "object_type"), + params=_optional_dict(data, "params"), + ) + + +def _parse_constraint(data: Any) -> EnvGraphConstraintSpec: + data = _as_dict(data, "Constraint spec") + return EnvGraphConstraintSpec( + id=_required_str(data, "id"), + type=_required_str(data, "type"), + parent=_optional_str(data, "parent"), + child=_optional_str(data, "child"), + params=_optional_dict(data, "params"), + ) + + +def _parse_edges(data: dict[str, Any] | None) -> EnvGraphEdgesSpec: + if data is None: + data = {} + data = _as_dict(data, "Edges spec") + return EnvGraphEdgesSpec( + spatial_constraints=_parse_list(data, "spatial_constraints", _parse_constraint), + task_constraints=_parse_list(data, "task_constraints", _parse_constraint), + ) + + +def _parse_state_spec(data: Any) -> EnvGraphStateSpec: + data = _as_dict(data, "State spec") + return EnvGraphStateSpec( + id=_required_str(data, "id"), + name=_required_str(data, "name"), + edges=_parse_edges(data.get("edges")), + ) + + +def _parse_task(data: Any) -> EnvGraphTaskSpec: + data = _as_dict(data, "Task spec") + return EnvGraphTaskSpec( + id=_required_str(data, "id"), + name=_required_str(data, "name"), + type=_required_str(data, "type"), + state_specs=_optional_str_map(data, "state_specs"), + task_args=_optional_dict(data, "task_args"), + ) + + +def _as_dict(data: Any, spec_name: str) -> dict[str, Any]: + assert isinstance(data, dict), f"{spec_name} must be a dict, got {type(data).__name__}" + return data + + +def _parse_list(data: dict[str, Any], key: str, parser: Callable[[Any], T]) -> list[T]: + values = data.get(key, []) + assert isinstance(values, list), f"Field '{key}' must be a list" + return [parser(value) for value in values] + + def _required_str(data: dict[str, Any], key: str) -> str: value = data.get(key) assert isinstance(value, str) and value, f"Missing required string field '{key}'" @@ -178,3 +200,44 @@ def _optional_dict(data: dict[str, Any], key: str) -> dict[str, Any]: value = data.get(key, {}) assert value is None or isinstance(value, dict), f"Optional field '{key}' must be a dict when set" return dict(value or {}) + + +def _optional_str_map(data: dict[str, Any], key: str) -> dict[str, str]: + return {str(k): str(v) for k, v in _optional_dict(data, key).items()} + + +def _assert_unique_ids(specs: list[Any], spec_name: str) -> None: + seen: set[str] = set() + duplicates: set[str] = set() + for spec in specs: + if spec.id in seen: + duplicates.add(spec.id) + seen.add(spec.id) + assert not duplicates, f"Duplicate {spec_name} ids found: {sorted(duplicates)}" + + +def _assert_references_exist( + nodes: list[EnvGraphNodeSpec], + tasks: list[EnvGraphTaskSpec], + state_specs: list[EnvGraphStateSpec], +) -> None: + node_ids = {node.id for node in nodes} + state_spec_ids = {state_spec.id for state_spec in state_specs} + + for task in tasks: + for label, state_spec_id in task.state_specs.items(): + assert state_spec_id in state_spec_ids, ( + f"Task '{task.id}' references unknown state spec '{state_spec_id}' for '{label}'" + ) + + for state_spec in state_specs: + constraints = state_spec.edges.spatial_constraints + state_spec.edges.task_constraints + for constraint in constraints: + if constraint.parent is not None: + assert constraint.parent in node_ids, ( + f"Constraint '{constraint.id}' references unknown parent node '{constraint.parent}'" + ) + if constraint.child is not None: + assert constraint.child in node_ids, ( + f"Constraint '{constraint.id}' references unknown child node '{constraint.child}'" + ) diff --git a/isaaclab_arena/tests/test_env_graph_spec.py b/isaaclab_arena/tests/test_env_graph_spec.py index bc3f582bb..80236718a 100644 --- a/isaaclab_arena/tests/test_env_graph_spec.py +++ b/isaaclab_arena/tests/test_env_graph_spec.py @@ -3,8 +3,11 @@ # # SPDX-License-Identifier: Apache-2.0 +from copy import deepcopy from pathlib import Path +import pytest + from isaaclab_arena.environments.env_graph_spec import EnvGraphSpec, EnvGraphStateSpec TEST_DATA_DIR = Path(__file__).parent / "test_data" @@ -56,3 +59,67 @@ def test_env_graph_spec_loads_pick_and_place_yaml(): assert reach_constraint.type == "reach" assert reach_constraint.parent == "droid_abs_joint_pos" assert reach_constraint.child == "bowl_ycb_robolab" + + +def test_env_graph_spec_rejects_duplicate_ids(): + data = _minimal_env_graph_data() + data["nodes"].append({"id": "table", "name": "duplicate_table", "type": "object_reference"}) + + with pytest.raises(AssertionError, match="Duplicate node ids"): + EnvGraphSpec.from_dict(data) + + +def test_env_graph_spec_rejects_missing_task_state_reference(): + data = _minimal_env_graph_data() + data["tasks"][0]["state_specs"]["initial"] = "missing_state" + + with pytest.raises(AssertionError, match="unknown state spec 'missing_state'"): + EnvGraphSpec.from_dict(data) + + +def test_env_graph_spec_rejects_missing_constraint_node_reference(): + data = _minimal_env_graph_data() + data["state_specs"][0]["edges"]["task_constraints"][0]["child"] = "missing_cube" + + with pytest.raises(AssertionError, match="unknown child node 'missing_cube'"): + EnvGraphSpec.from_dict(data) + + +def _minimal_env_graph_data(): + return deepcopy( + { + "name": "minimal_env_graph", + "nodes": [ + {"id": "robot", "name": "robot", "type": "embodiment"}, + {"id": "table", "name": "table", "type": "object_reference"}, + {"id": "cube", "name": "cube", "type": "rigid_object"}, + ], + "tasks": [ + { + "id": "task_0", + "name": "task_0", + "type": "pick_and_place", + "state_specs": {"initial": "state_0"}, + } + ], + "state_specs": [ + { + "id": "state_0", + "name": "state_0", + "edges": { + "spatial_constraints": [ + {"id": "table_is_anchor", "type": "is_anchor", "parent": "table"} + ], + "task_constraints": [ + { + "id": "robot_reach_cube", + "type": "reach", + "parent": "robot", + "child": "cube", + } + ], + }, + } + ], + } + ) From 1dfd197d8f486f580a23fa97feb0aa56327bfcdb Mon Sep 17 00:00:00 2001 From: Xinjie Yao Date: Thu, 21 May 2026 13:37:17 -0700 Subject: [PATCH 4/8] address comments --- isaaclab_arena/environments/__init__.py | 22 ------- isaaclab_arena/environments/env_graph_spec.py | 36 +++++------ isaaclab_arena/tests/test_env_graph_spec.py | 64 ++++++++----------- 3 files changed, 45 insertions(+), 77 deletions(-) delete mode 100644 isaaclab_arena/environments/__init__.py diff --git a/isaaclab_arena/environments/__init__.py b/isaaclab_arena/environments/__init__.py deleted file mode 100644 index 1883e8cba..000000000 --- a/isaaclab_arena/environments/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). -# All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -from isaaclab_arena.environments.env_graph_spec import ( - EnvGraphConstraintSpec, - EnvGraphEdgesSpec, - EnvGraphNodeSpec, - EnvGraphSpec, - EnvGraphStateSpec, - EnvGraphTaskSpec, -) - -__all__ = [ - "EnvGraphConstraintSpec", - "EnvGraphEdgesSpec", - "EnvGraphNodeSpec", - "EnvGraphSpec", - "EnvGraphStateSpec", - "EnvGraphTaskSpec", -] diff --git a/isaaclab_arena/environments/env_graph_spec.py b/isaaclab_arena/environments/env_graph_spec.py index 63fc76f72..8452662b2 100644 --- a/isaaclab_arena/environments/env_graph_spec.py +++ b/isaaclab_arena/environments/env_graph_spec.py @@ -3,16 +3,16 @@ # # SPDX-License-Identifier: Apache-2.0 +import yaml +from collections.abc import Callable from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Callable, TypeVar - -import yaml +from typing import Any, TypeVar T = TypeVar("T") -@dataclass(frozen=True) +@dataclass class EnvGraphNodeSpec: """Node in an environment graph. @@ -28,7 +28,7 @@ class EnvGraphNodeSpec: params: dict[str, Any] = field(default_factory=dict) -@dataclass(frozen=True) +@dataclass class EnvGraphConstraintSpec: """Constraint edge in an environment graph state spec. @@ -42,7 +42,7 @@ class EnvGraphConstraintSpec: params: dict[str, Any] = field(default_factory=dict) -@dataclass(frozen=True) +@dataclass class EnvGraphEdgesSpec: """Grouped spatial and task constraints.""" @@ -50,7 +50,7 @@ class EnvGraphEdgesSpec: task_constraints: list[EnvGraphConstraintSpec] = field(default_factory=list) -@dataclass(frozen=True) +@dataclass class EnvGraphStateSpec: """Snapshot of the environment state in the graph. @@ -62,7 +62,7 @@ class EnvGraphStateSpec: edges: EnvGraphEdgesSpec = field(default_factory=EnvGraphEdgesSpec) -@dataclass(frozen=True) +@dataclass class EnvGraphTaskSpec: """Task entry in an environment graph.""" @@ -73,7 +73,7 @@ class EnvGraphTaskSpec: task_args: dict[str, Any] = field(default_factory=dict) -@dataclass(frozen=True) +@dataclass class EnvGraphSpec: """Typed representation of an environment graph YAML file.""" @@ -226,18 +226,18 @@ def _assert_references_exist( for task in tasks: for label, state_spec_id in task.state_specs.items(): - assert state_spec_id in state_spec_ids, ( - f"Task '{task.id}' references unknown state spec '{state_spec_id}' for '{label}'" - ) + assert ( + state_spec_id in state_spec_ids + ), f"Task '{task.id}' references unknown state spec '{state_spec_id}' for '{label}'" for state_spec in state_specs: constraints = state_spec.edges.spatial_constraints + state_spec.edges.task_constraints for constraint in constraints: if constraint.parent is not None: - assert constraint.parent in node_ids, ( - f"Constraint '{constraint.id}' references unknown parent node '{constraint.parent}'" - ) + assert ( + constraint.parent in node_ids + ), f"Constraint '{constraint.id}' references unknown parent node '{constraint.parent}'" if constraint.child is not None: - assert constraint.child in node_ids, ( - f"Constraint '{constraint.id}' references unknown child node '{constraint.child}'" - ) + assert ( + constraint.child in node_ids + ), f"Constraint '{constraint.id}' references unknown child node '{constraint.child}'" diff --git a/isaaclab_arena/tests/test_env_graph_spec.py b/isaaclab_arena/tests/test_env_graph_spec.py index 80236718a..c90464f7f 100644 --- a/isaaclab_arena/tests/test_env_graph_spec.py +++ b/isaaclab_arena/tests/test_env_graph_spec.py @@ -86,40 +86,30 @@ def test_env_graph_spec_rejects_missing_constraint_node_reference(): def _minimal_env_graph_data(): - return deepcopy( - { - "name": "minimal_env_graph", - "nodes": [ - {"id": "robot", "name": "robot", "type": "embodiment"}, - {"id": "table", "name": "table", "type": "object_reference"}, - {"id": "cube", "name": "cube", "type": "rigid_object"}, - ], - "tasks": [ - { - "id": "task_0", - "name": "task_0", - "type": "pick_and_place", - "state_specs": {"initial": "state_0"}, - } - ], - "state_specs": [ - { - "id": "state_0", - "name": "state_0", - "edges": { - "spatial_constraints": [ - {"id": "table_is_anchor", "type": "is_anchor", "parent": "table"} - ], - "task_constraints": [ - { - "id": "robot_reach_cube", - "type": "reach", - "parent": "robot", - "child": "cube", - } - ], - }, - } - ], - } - ) + return deepcopy({ + "name": "minimal_env_graph", + "nodes": [ + {"id": "robot", "name": "robot", "type": "embodiment"}, + {"id": "table", "name": "table", "type": "object_reference"}, + {"id": "cube", "name": "cube", "type": "rigid_object"}, + ], + "tasks": [{ + "id": "task_0", + "name": "task_0", + "type": "pick_and_place", + "state_specs": {"initial": "state_0"}, + }], + "state_specs": [{ + "id": "state_0", + "name": "state_0", + "edges": { + "spatial_constraints": [{"id": "table_is_anchor", "type": "is_anchor", "parent": "table"}], + "task_constraints": [{ + "id": "robot_reach_cube", + "type": "reach", + "parent": "robot", + "child": "cube", + }], + }, + }], + }) From 2666aaaf21e90e3d8484598bb7ab3a510539a34e Mon Sep 17 00:00:00 2001 From: Xinjie Yao Date: Thu, 21 May 2026 15:03:39 -0700 Subject: [PATCH 5/8] refactor --- isaaclab_arena/environments/env_graph_spec.py | 119 ++++++++++++++---- .../pick_and_place_maple_table_env_graph.yaml | 12 +- isaaclab_arena/tests/test_env_graph_spec.py | 68 ++++++++-- 3 files changed, 163 insertions(+), 36 deletions(-) diff --git a/isaaclab_arena/environments/env_graph_spec.py b/isaaclab_arena/environments/env_graph_spec.py index 8452662b2..2ec070d2c 100644 --- a/isaaclab_arena/environments/env_graph_spec.py +++ b/isaaclab_arena/environments/env_graph_spec.py @@ -6,10 +6,30 @@ import yaml from collections.abc import Callable from dataclasses import dataclass, field +from enum import Enum from pathlib import Path -from typing import Any, TypeVar +from typing import Any -T = TypeVar("T") +from isaaclab_arena.assets.object_base import ObjectType + + +class EnvGraphNodeType(Enum): + EMBODIMENT = "embodiment" + BACKGROUND = "background" + OBJECT = "object" + OBJECT_REFERENCE = "objectReference" + LIGHTING = "lighting" + + +class EnvGraphSpatialConstraintType(Enum): + IS_ANCHOR = "is_anchor" + NEXT_TO = "next_to" + ON = "on" + AT_POSITION = "at_position" + POSITION_LIMITS = "position_limits" + RANDOM_AROUND_SOLUTION = "random_around_solution" + ROTATE_AROUND_SOLUTION = "rotate_around_solution" + IN = "in" @dataclass @@ -21,20 +41,31 @@ class EnvGraphNodeSpec: id: str name: str - type: str - parent: str | None = None - prim_path: str | None = None - object_type: str | None = None + type: EnvGraphNodeType + parent: str | None = None # Optional, only need for object references + prim_path: str | None = None # Optional, only need for object references + object_type: ObjectType | None = None # Optional, only need for type=object params: dict[str, Any] = field(default_factory=dict) @dataclass -class EnvGraphConstraintSpec: - """Constraint edge in an environment graph state spec. +class EnvGraphSpatialConstraintSpec: + """Spatial constraint edge in an environment graph state spec. - It defines a spatial or task constraint between two nodes. + It defines a relation between two nodes. """ + id: str + type: EnvGraphSpatialConstraintType + parent: str + child: str | None = None # Optional, e.g. is_anchor constraint does not have a child + params: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class EnvGraphTaskConstraintSpec: + """Task-dependent constraint edge in an environment graph state spec.""" + id: str type: str parent: str | None = None @@ -46,8 +77,8 @@ class EnvGraphConstraintSpec: class EnvGraphEdgesSpec: """Grouped spatial and task constraints.""" - spatial_constraints: list[EnvGraphConstraintSpec] = field(default_factory=list) - task_constraints: list[EnvGraphConstraintSpec] = field(default_factory=list) + spatial_constraints: list[EnvGraphSpatialConstraintSpec] = field(default_factory=list) + task_constraints: list[EnvGraphTaskConstraintSpec] = field(default_factory=list) @dataclass @@ -124,17 +155,28 @@ def _parse_node(data: Any) -> EnvGraphNodeSpec: return EnvGraphNodeSpec( id=_required_str(data, "id"), name=_required_str(data, "name"), - type=_required_str(data, "type"), + type=_required_enum(data, "type", EnvGraphNodeType), parent=_optional_str(data, "parent"), prim_path=_optional_str(data, "prim_path"), - object_type=_optional_str(data, "object_type"), + object_type=_optional_enum(data, "object_type", ObjectType), params=_optional_dict(data, "params"), ) -def _parse_constraint(data: Any) -> EnvGraphConstraintSpec: - data = _as_dict(data, "Constraint spec") - return EnvGraphConstraintSpec( +def _parse_spatial_constraint(data: Any) -> EnvGraphSpatialConstraintSpec: + data = _as_dict(data, "Spatial constraint spec") + return EnvGraphSpatialConstraintSpec( + id=_required_str(data, "id"), + type=_required_enum(data, "type", EnvGraphSpatialConstraintType), + parent=_required_str(data, "parent"), + child=_optional_str(data, "child"), + params=_optional_dict(data, "params"), + ) + + +def _parse_task_constraint(data: Any) -> EnvGraphTaskConstraintSpec: + data = _as_dict(data, "Task constraint spec") + return EnvGraphTaskConstraintSpec( id=_required_str(data, "id"), type=_required_str(data, "type"), parent=_optional_str(data, "parent"), @@ -148,8 +190,8 @@ def _parse_edges(data: dict[str, Any] | None) -> EnvGraphEdgesSpec: data = {} data = _as_dict(data, "Edges spec") return EnvGraphEdgesSpec( - spatial_constraints=_parse_list(data, "spatial_constraints", _parse_constraint), - task_constraints=_parse_list(data, "task_constraints", _parse_constraint), + spatial_constraints=_parse_list(data, "spatial_constraints", _parse_spatial_constraint), + task_constraints=_parse_list(data, "task_constraints", _parse_task_constraint), ) @@ -178,7 +220,7 @@ def _as_dict(data: Any, spec_name: str) -> dict[str, Any]: return data -def _parse_list(data: dict[str, Any], key: str, parser: Callable[[Any], T]) -> list[T]: +def _parse_list(data: dict[str, Any], key: str, parser: Callable[[Any], Any]) -> list[Any]: values = data.get(key, []) assert isinstance(values, list), f"Field '{key}' must be a list" return [parser(value) for value in values] @@ -206,6 +248,29 @@ def _optional_str_map(data: dict[str, Any], key: str) -> dict[str, str]: return {str(k): str(v) for k, v in _optional_dict(data, key).items()} +def _required_enum(data: dict[str, Any], key: str, enum_type: type[Enum]) -> Enum: + value = data.get(key) + assert value is not None, f"Missing required field '{key}'" + parsed = _parse_enum(value, key, enum_type) + assert parsed is not None + return parsed + + +def _optional_enum(data: dict[str, Any], key: str, enum_type: type[Enum]) -> Enum | None: + return _parse_enum(data.get(key), key, enum_type) + + +def _parse_enum(value: Any, key: str, enum_type: type[Enum]) -> Enum | None: + if value is None or isinstance(value, enum_type): + return value + assert isinstance(value, str), f"Field '{key}' must be a string when set" + try: + return enum_type(value) + except ValueError: + valid_values = [enum_value.value for enum_value in enum_type] + raise AssertionError(f"Unknown {key} '{value}'. Expected one of {valid_values}") from None + + def _assert_unique_ids(specs: list[Any], spec_name: str) -> None: seen: set[str] = set() duplicates: set[str] = set() @@ -224,6 +289,10 @@ def _assert_references_exist( node_ids = {node.id for node in nodes} state_spec_ids = {state_spec.id for state_spec in state_specs} + for node in nodes: + if node.parent is not None: + assert node.parent in node_ids, f"Node '{node.id}' references unknown parent '{node.parent}'" + for task in tasks: for label, state_spec_id in task.state_specs.items(): assert ( @@ -231,8 +300,16 @@ def _assert_references_exist( ), f"Task '{task.id}' references unknown state spec '{state_spec_id}' for '{label}'" for state_spec in state_specs: - constraints = state_spec.edges.spatial_constraints + state_spec.edges.task_constraints - for constraint in constraints: + for constraint in state_spec.edges.spatial_constraints: + assert ( + constraint.parent in node_ids + ), f"Constraint '{constraint.id}' references unknown parent node '{constraint.parent}'" + if constraint.child is not None: + assert ( + constraint.child in node_ids + ), f"Constraint '{constraint.id}' references unknown child node '{constraint.child}'" + + for constraint in state_spec.edges.task_constraints: if constraint.parent is not None: assert ( constraint.parent in node_ids diff --git a/isaaclab_arena/tests/test_data/pick_and_place_maple_table_env_graph.yaml b/isaaclab_arena/tests/test_data/pick_and_place_maple_table_env_graph.yaml index 8d1aa8369..84a932768 100644 --- a/isaaclab_arena/tests/test_data/pick_and_place_maple_table_env_graph.yaml +++ b/isaaclab_arena/tests/test_data/pick_and_place_maple_table_env_graph.yaml @@ -16,18 +16,18 @@ nodes: - id: maple_table_robolab_table name: table - type: object_reference + type: objectReference parent: maple_table_robolab prim_path: "{ENV_REGEX_NS}/maple_table_robolab/table" object_type: rigid - id: rubiks_cube_hot3d_robolab name: rubiks_cube_hot3d_robolab - type: rigid_object + type: object - id: bowl_ycb_robolab name: bowl_ycb_robolab - type: rigid_object + type: object tasks: - id: pick_and_place_0 @@ -58,7 +58,7 @@ state_specs: - id: rubiks_cube_hot3d_robolab_position_limits type: position_limits - child: rubiks_cube_hot3d_robolab + parent: rubiks_cube_hot3d_robolab params: x_min: 0.55 x_max: 0.70 @@ -72,7 +72,7 @@ state_specs: - id: bowl_ycb_robolab_position_limits type: position_limits - child: bowl_ycb_robolab + parent: bowl_ycb_robolab params: x_min: 0.55 x_max: 0.70 @@ -100,7 +100,7 @@ state_specs: - id: bowl_ycb_robolab_position_limits type: position_limits - child: bowl_ycb_robolab + parent: bowl_ycb_robolab params: x_min: 0.55 x_max: 0.70 diff --git a/isaaclab_arena/tests/test_env_graph_spec.py b/isaaclab_arena/tests/test_env_graph_spec.py index c90464f7f..861d6ec15 100644 --- a/isaaclab_arena/tests/test_env_graph_spec.py +++ b/isaaclab_arena/tests/test_env_graph_spec.py @@ -8,7 +8,13 @@ import pytest -from isaaclab_arena.environments.env_graph_spec import EnvGraphSpec, EnvGraphStateSpec +from isaaclab_arena.assets.object_base import ObjectType +from isaaclab_arena.environments.env_graph_spec import ( + EnvGraphNodeType, + EnvGraphSpatialConstraintType, + EnvGraphSpec, + EnvGraphStateSpec, +) TEST_DATA_DIR = Path(__file__).parent / "test_data" @@ -22,10 +28,13 @@ def test_env_graph_spec_loads_pick_and_place_yaml(): assert len(spec.state_specs) == 2 table = spec.nodes_by_id["maple_table_robolab_table"] - assert table.type == "object_reference" + assert table.type == EnvGraphNodeType.OBJECT_REFERENCE assert table.parent == "maple_table_robolab" assert table.prim_path == "{ENV_REGEX_NS}/maple_table_robolab/table" - assert table.object_type == "rigid" + assert table.object_type == ObjectType.RIGID + + assert spec.nodes_by_id["rubiks_cube_hot3d_robolab"].type == EnvGraphNodeType.OBJECT + assert spec.nodes_by_id["bowl_ycb_robolab"].type == EnvGraphNodeType.OBJECT task = spec.tasks_by_id["pick_and_place_0"] assert task.state_specs == {"initial": "state_spec_0", "final": "state_spec_1"} @@ -39,8 +48,9 @@ def test_env_graph_spec_loads_pick_and_place_yaml(): assert len(initial_state.edges.task_constraints) == 1 cube_limits = initial_state.edges.spatial_constraints[2] - assert cube_limits.type == "position_limits" - assert cube_limits.child == "rubiks_cube_hot3d_robolab" + assert cube_limits.type == EnvGraphSpatialConstraintType.POSITION_LIMITS + assert cube_limits.parent == "rubiks_cube_hot3d_robolab" + assert cube_limits.child is None assert cube_limits.params == { "x_min": 0.55, "x_max": 0.70, @@ -51,7 +61,7 @@ def test_env_graph_spec_loads_pick_and_place_yaml(): final_state = spec.state_specs_by_id["state_spec_1"] assert isinstance(final_state, EnvGraphStateSpec) in_constraint = final_state.edges.spatial_constraints[3] - assert in_constraint.type == "in" + assert in_constraint.type == EnvGraphSpatialConstraintType.IN assert in_constraint.parent == "bowl_ycb_robolab" assert in_constraint.child == "rubiks_cube_hot3d_robolab" @@ -63,7 +73,7 @@ def test_env_graph_spec_loads_pick_and_place_yaml(): def test_env_graph_spec_rejects_duplicate_ids(): data = _minimal_env_graph_data() - data["nodes"].append({"id": "table", "name": "duplicate_table", "type": "object_reference"}) + data["nodes"].append({"id": "table", "name": "duplicate_table", "type": "objectReference"}) with pytest.raises(AssertionError, match="Duplicate node ids"): EnvGraphSpec.from_dict(data) @@ -85,13 +95,53 @@ def test_env_graph_spec_rejects_missing_constraint_node_reference(): EnvGraphSpec.from_dict(data) +def test_env_graph_spec_rejects_missing_spatial_constraint_parent(): + data = _minimal_env_graph_data() + del data["state_specs"][0]["edges"]["spatial_constraints"][0]["parent"] + + with pytest.raises(AssertionError, match="Missing required string field 'parent'"): + EnvGraphSpec.from_dict(data) + + +def test_env_graph_spec_rejects_missing_node_parent_reference(): + data = _minimal_env_graph_data() + data["nodes"][1]["parent"] = "missing_background" + + with pytest.raises(AssertionError, match="unknown parent 'missing_background'"): + EnvGraphSpec.from_dict(data) + + +def test_env_graph_spec_rejects_unknown_object_type(): + data = _minimal_env_graph_data() + data["nodes"][1]["object_type"] = "unknown" + + with pytest.raises(AssertionError, match="Unknown object_type 'unknown'"): + EnvGraphSpec.from_dict(data) + + +def test_env_graph_spec_rejects_unknown_node_type(): + data = _minimal_env_graph_data() + data["nodes"][0]["type"] = "unknown" + + with pytest.raises(AssertionError, match="Unknown type 'unknown'"): + EnvGraphSpec.from_dict(data) + + +def test_env_graph_spec_rejects_unknown_spatial_constraint_type(): + data = _minimal_env_graph_data() + data["state_specs"][0]["edges"]["spatial_constraints"][0]["type"] = "unknown" + + with pytest.raises(AssertionError, match="Unknown type 'unknown'"): + EnvGraphSpec.from_dict(data) + + def _minimal_env_graph_data(): return deepcopy({ "name": "minimal_env_graph", "nodes": [ {"id": "robot", "name": "robot", "type": "embodiment"}, - {"id": "table", "name": "table", "type": "object_reference"}, - {"id": "cube", "name": "cube", "type": "rigid_object"}, + {"id": "table", "name": "table", "type": "objectReference"}, + {"id": "cube", "name": "cube", "type": "object"}, ], "tasks": [{ "id": "task_0", From 9c119193b4f504e8ceed3e27a5ab2b66b8fdd20e Mon Sep 17 00:00:00 2001 From: Xinjie Yao Date: Thu, 21 May 2026 16:15:57 -0700 Subject: [PATCH 6/8] refactor --- .../environments/arena_env_graph_spec.py | 224 ++++++++++++ isaaclab_arena/environments/env_graph_spec.py | 320 ------------------ isaaclab_arena/environments/utils.py | 130 +++++++ .../tests/test_arena_env_graph_spec.py | 216 ++++++++++++ .../pick_and_place_maple_table_env_graph.yaml | 143 ++++---- isaaclab_arena/tests/test_env_graph_spec.py | 165 --------- 6 files changed, 640 insertions(+), 558 deletions(-) create mode 100644 isaaclab_arena/environments/arena_env_graph_spec.py delete mode 100644 isaaclab_arena/environments/env_graph_spec.py create mode 100644 isaaclab_arena/environments/utils.py create mode 100644 isaaclab_arena/tests/test_arena_env_graph_spec.py delete mode 100644 isaaclab_arena/tests/test_env_graph_spec.py diff --git a/isaaclab_arena/environments/arena_env_graph_spec.py b/isaaclab_arena/environments/arena_env_graph_spec.py new file mode 100644 index 000000000..792ec6df3 --- /dev/null +++ b/isaaclab_arena/environments/arena_env_graph_spec.py @@ -0,0 +1,224 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path +from typing import Any + +import yaml + +from isaaclab_arena.assets.object_base import ObjectType +from isaaclab_arena.environments.utils import ( + as_dict, + assert_env_graph_references_exist, + assert_env_graph_universal_ids, + optional_dict, + optional_enum, + optional_str, + parse_list, + required_enum, + required_number_sequence, + required_str, +) + + +class ArenaEnvGraphNodeType(Enum): + EMBODIMENT = "embodiment" + BACKGROUND = "background" + OBJECT = "object" + OBJECT_REFERENCE = "objectReference" + LIGHTING = "lighting" + + +class ArenaEnvGraphSpatialConstraintType(Enum): + IS_ANCHOR = "is_anchor" + NEXT_TO = "next_to" + ON = "on" + AT_POSE = "at_pose" # through set_initial_pose() + AT_POSITION = "at_position" # through object relation solver: AtPosition + POSITION_LIMITS = "position_limits" + RANDOM_AROUND_SOLUTION = "random_around_solution" + ROTATE_AROUND_SOLUTION = "rotate_around_solution" + # TODO(xinjieyao, 2026-05-21): Support "in" in solver + IN = "in" + + +@dataclass +class ArenaEnvGraphNodeSpec: + """Node in an environment graph. + + Could be an object, an object reference, an embodiment, a background, etc. + """ + + id: str + name: str + type: ArenaEnvGraphNodeType + parent: str | None = None # Optional, only need for object references + prim_path: str | None = None # Optional, only need for object references + object_type: ObjectType | None = None # Optional, only need for type=object + params: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class ArenaEnvGraphSpatialConstraintSpec: + """Spatial constraint edge in an environment graph state spec. + + It defines a relation between two nodes. + """ + + id: str + type: ArenaEnvGraphSpatialConstraintType + parent: str + child: str | None = None # Optional, e.g. is_anchor constraint does not have a child + params: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class ArenaEnvGraphTaskConstraintSpec: + """Task-dependent constraint edge in an environment graph state spec.""" + + id: str + type: str + parent: str + child: str | None = None # Optional, could be a robot keeps gripper open or closed, or a single object + params: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class ArenaEnvGraphStateSpec: + """Snapshot of the environment state in the graph. + + Could be an initial, intermediate, or final state. + """ + + id: str + name: str + spatial_constraints: list[ArenaEnvGraphSpatialConstraintSpec] = field(default_factory=list) + task_constraints: list[ArenaEnvGraphTaskConstraintSpec] = field(default_factory=list) + + +@dataclass +class ArenaEnvGraphTaskSpec: + """Task entry in an environment graph.""" + + id: str + name: str + type: str # Task class name, could be a custom task class or a built-in task class + initial_state_spec_id: str + success_state_spec_id: str + task_args: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class ArenaEnvGraphSpec: + """Typed representation of an environment graph YAML file. + It defines the nodes, tasks, and state specs of the environment graph. + """ + + env_name: str + nodes: list[ArenaEnvGraphNodeSpec] = field(default_factory=list) + tasks: list[ArenaEnvGraphTaskSpec] = field(default_factory=list) + state_specs: list[ArenaEnvGraphStateSpec] = field(default_factory=list) + + @classmethod + def from_yaml(cls, path: str | Path) -> "ArenaEnvGraphSpec": + with Path(path).open("r", encoding="utf-8") as f: + return cls.from_dict(yaml.safe_load(f)) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "ArenaEnvGraphSpec": + data = as_dict(data, "Env graph spec") + nodes = parse_list(data, "nodes", _parse_node) + tasks = parse_list(data, "tasks", _parse_task) + state_specs = parse_list(data, "state_specs", _parse_state_spec) + + assert_env_graph_universal_ids(nodes, tasks, state_specs) + assert_env_graph_references_exist(nodes, tasks, state_specs) + + return cls( + env_name=required_str(data, "env_name"), + nodes=nodes, + tasks=tasks, + state_specs=state_specs, + ) + + @property + def nodes_by_id(self) -> dict[str, ArenaEnvGraphNodeSpec]: + return {node.id: node for node in self.nodes} + + @property + def tasks_by_id(self) -> dict[str, ArenaEnvGraphTaskSpec]: + return {task.id: task for task in self.tasks} + + @property + def state_specs_by_id(self) -> dict[str, ArenaEnvGraphStateSpec]: + return {state_spec.id: state_spec for state_spec in self.state_specs} + + +def _parse_node(data: Any) -> ArenaEnvGraphNodeSpec: + data = as_dict(data, "Node spec") + return ArenaEnvGraphNodeSpec( + id=required_str(data, "id"), + name=required_str(data, "name"), + type=required_enum(data, "type", ArenaEnvGraphNodeType), + parent=optional_str(data, "parent"), + prim_path=optional_str(data, "prim_path"), + object_type=optional_enum(data, "object_type", ObjectType), + params=optional_dict(data, "params"), + ) + + +def _parse_spatial_constraint(data: Any) -> ArenaEnvGraphSpatialConstraintSpec: + data = as_dict(data, "Spatial constraint spec") + constraint_type = required_enum(data, "type", ArenaEnvGraphSpatialConstraintType) + params = optional_dict(data, "params") + if constraint_type == ArenaEnvGraphSpatialConstraintType.AT_POSE: + params["position_xyz"] = required_number_sequence(params, "position_xyz", 3) + params["rotation_xyzw"] = required_number_sequence(params, "rotation_xyzw", 4) + + return ArenaEnvGraphSpatialConstraintSpec( + id=required_str(data, "id"), + type=constraint_type, + parent=required_str(data, "parent"), + child=optional_str(data, "child"), + params=params, + ) + + +def _parse_task_constraint(data: Any) -> ArenaEnvGraphTaskConstraintSpec: + data = as_dict(data, "Task constraint spec") + return ArenaEnvGraphTaskConstraintSpec( + id=required_str(data, "id"), + type=required_str(data, "type"), + parent=optional_str(data, "parent"), + child=optional_str(data, "child"), + params=optional_dict(data, "params"), + ) + + +def _parse_state_spec(data: Any) -> ArenaEnvGraphStateSpec: + data = as_dict(data, "State spec") + assert "edges" not in data, "State spec must define spatial_constraints and task_constraints directly" + return ArenaEnvGraphStateSpec( + id=required_str(data, "id"), + name=required_str(data, "name"), + spatial_constraints=parse_list(data, "spatial_constraints", _parse_spatial_constraint), + task_constraints=parse_list(data, "task_constraints", _parse_task_constraint), + ) + + +def _parse_task(data: Any) -> ArenaEnvGraphTaskSpec: + data = as_dict(data, "Task spec") + for old_key in ("state_specs", "initial_state_spec", "success_state_spec"): + assert old_key not in data, "Task spec must use initial_state_spec_id and success_state_spec_id" + return ArenaEnvGraphTaskSpec( + id=required_str(data, "id"), + name=required_str(data, "name"), + type=required_str(data, "type"), + initial_state_spec_id=required_str(data, "initial_state_spec_id"), + success_state_spec_id=required_str(data, "success_state_spec_id"), + task_args=optional_dict(data, "task_args"), + ) diff --git a/isaaclab_arena/environments/env_graph_spec.py b/isaaclab_arena/environments/env_graph_spec.py deleted file mode 100644 index 2ec070d2c..000000000 --- a/isaaclab_arena/environments/env_graph_spec.py +++ /dev/null @@ -1,320 +0,0 @@ -# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). -# All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import yaml -from collections.abc import Callable -from dataclasses import dataclass, field -from enum import Enum -from pathlib import Path -from typing import Any - -from isaaclab_arena.assets.object_base import ObjectType - - -class EnvGraphNodeType(Enum): - EMBODIMENT = "embodiment" - BACKGROUND = "background" - OBJECT = "object" - OBJECT_REFERENCE = "objectReference" - LIGHTING = "lighting" - - -class EnvGraphSpatialConstraintType(Enum): - IS_ANCHOR = "is_anchor" - NEXT_TO = "next_to" - ON = "on" - AT_POSITION = "at_position" - POSITION_LIMITS = "position_limits" - RANDOM_AROUND_SOLUTION = "random_around_solution" - ROTATE_AROUND_SOLUTION = "rotate_around_solution" - IN = "in" - - -@dataclass -class EnvGraphNodeSpec: - """Node in an environment graph. - - Could be an object, an object reference, an embodiment, a background, etc. - """ - - id: str - name: str - type: EnvGraphNodeType - parent: str | None = None # Optional, only need for object references - prim_path: str | None = None # Optional, only need for object references - object_type: ObjectType | None = None # Optional, only need for type=object - params: dict[str, Any] = field(default_factory=dict) - - -@dataclass -class EnvGraphSpatialConstraintSpec: - """Spatial constraint edge in an environment graph state spec. - - It defines a relation between two nodes. - """ - - id: str - type: EnvGraphSpatialConstraintType - parent: str - child: str | None = None # Optional, e.g. is_anchor constraint does not have a child - params: dict[str, Any] = field(default_factory=dict) - - -@dataclass -class EnvGraphTaskConstraintSpec: - """Task-dependent constraint edge in an environment graph state spec.""" - - id: str - type: str - parent: str | None = None - child: str | None = None - params: dict[str, Any] = field(default_factory=dict) - - -@dataclass -class EnvGraphEdgesSpec: - """Grouped spatial and task constraints.""" - - spatial_constraints: list[EnvGraphSpatialConstraintSpec] = field(default_factory=list) - task_constraints: list[EnvGraphTaskConstraintSpec] = field(default_factory=list) - - -@dataclass -class EnvGraphStateSpec: - """Snapshot of the environment state in the graph. - - Could be an initial, intermediate, or final state. - """ - - id: str - name: str - edges: EnvGraphEdgesSpec = field(default_factory=EnvGraphEdgesSpec) - - -@dataclass -class EnvGraphTaskSpec: - """Task entry in an environment graph.""" - - id: str - name: str - type: str - state_specs: dict[str, str] = field(default_factory=dict) - task_args: dict[str, Any] = field(default_factory=dict) - - -@dataclass -class EnvGraphSpec: - """Typed representation of an environment graph YAML file.""" - - name: str - nodes: list[EnvGraphNodeSpec] = field(default_factory=list) - tasks: list[EnvGraphTaskSpec] = field(default_factory=list) - state_specs: list[EnvGraphStateSpec] = field(default_factory=list) - - @classmethod - def from_yaml(cls, path: str | Path) -> "EnvGraphSpec": - with Path(path).open("r", encoding="utf-8") as f: - return cls.from_dict(yaml.safe_load(f)) - - @classmethod - def from_dict(cls, data: dict[str, Any]) -> "EnvGraphSpec": - data = _as_dict(data, "Env graph spec") - nodes = _parse_list(data, "nodes", _parse_node) - tasks = _parse_list(data, "tasks", _parse_task) - state_specs = _parse_list(data, "state_specs", _parse_state_spec) - - _assert_unique_ids(nodes, "node") - _assert_unique_ids(tasks, "task") - _assert_unique_ids(state_specs, "state spec") - _assert_references_exist(nodes, tasks, state_specs) - - return cls( - name=_required_str(data, "name"), - nodes=nodes, - tasks=tasks, - state_specs=state_specs, - ) - - @property - def nodes_by_id(self) -> dict[str, EnvGraphNodeSpec]: - return {node.id: node for node in self.nodes} - - @property - def tasks_by_id(self) -> dict[str, EnvGraphTaskSpec]: - return {task.id: task for task in self.tasks} - - @property - def state_specs_by_id(self) -> dict[str, EnvGraphStateSpec]: - return {state_spec.id: state_spec for state_spec in self.state_specs} - - -def _parse_node(data: Any) -> EnvGraphNodeSpec: - data = _as_dict(data, "Node spec") - return EnvGraphNodeSpec( - id=_required_str(data, "id"), - name=_required_str(data, "name"), - type=_required_enum(data, "type", EnvGraphNodeType), - parent=_optional_str(data, "parent"), - prim_path=_optional_str(data, "prim_path"), - object_type=_optional_enum(data, "object_type", ObjectType), - params=_optional_dict(data, "params"), - ) - - -def _parse_spatial_constraint(data: Any) -> EnvGraphSpatialConstraintSpec: - data = _as_dict(data, "Spatial constraint spec") - return EnvGraphSpatialConstraintSpec( - id=_required_str(data, "id"), - type=_required_enum(data, "type", EnvGraphSpatialConstraintType), - parent=_required_str(data, "parent"), - child=_optional_str(data, "child"), - params=_optional_dict(data, "params"), - ) - - -def _parse_task_constraint(data: Any) -> EnvGraphTaskConstraintSpec: - data = _as_dict(data, "Task constraint spec") - return EnvGraphTaskConstraintSpec( - id=_required_str(data, "id"), - type=_required_str(data, "type"), - parent=_optional_str(data, "parent"), - child=_optional_str(data, "child"), - params=_optional_dict(data, "params"), - ) - - -def _parse_edges(data: dict[str, Any] | None) -> EnvGraphEdgesSpec: - if data is None: - data = {} - data = _as_dict(data, "Edges spec") - return EnvGraphEdgesSpec( - spatial_constraints=_parse_list(data, "spatial_constraints", _parse_spatial_constraint), - task_constraints=_parse_list(data, "task_constraints", _parse_task_constraint), - ) - - -def _parse_state_spec(data: Any) -> EnvGraphStateSpec: - data = _as_dict(data, "State spec") - return EnvGraphStateSpec( - id=_required_str(data, "id"), - name=_required_str(data, "name"), - edges=_parse_edges(data.get("edges")), - ) - - -def _parse_task(data: Any) -> EnvGraphTaskSpec: - data = _as_dict(data, "Task spec") - return EnvGraphTaskSpec( - id=_required_str(data, "id"), - name=_required_str(data, "name"), - type=_required_str(data, "type"), - state_specs=_optional_str_map(data, "state_specs"), - task_args=_optional_dict(data, "task_args"), - ) - - -def _as_dict(data: Any, spec_name: str) -> dict[str, Any]: - assert isinstance(data, dict), f"{spec_name} must be a dict, got {type(data).__name__}" - return data - - -def _parse_list(data: dict[str, Any], key: str, parser: Callable[[Any], Any]) -> list[Any]: - values = data.get(key, []) - assert isinstance(values, list), f"Field '{key}' must be a list" - return [parser(value) for value in values] - - -def _required_str(data: dict[str, Any], key: str) -> str: - value = data.get(key) - assert isinstance(value, str) and value, f"Missing required string field '{key}'" - return value - - -def _optional_str(data: dict[str, Any], key: str) -> str | None: - value = data.get(key) - assert value is None or isinstance(value, str), f"Optional field '{key}' must be a string when set" - return value - - -def _optional_dict(data: dict[str, Any], key: str) -> dict[str, Any]: - value = data.get(key, {}) - assert value is None or isinstance(value, dict), f"Optional field '{key}' must be a dict when set" - return dict(value or {}) - - -def _optional_str_map(data: dict[str, Any], key: str) -> dict[str, str]: - return {str(k): str(v) for k, v in _optional_dict(data, key).items()} - - -def _required_enum(data: dict[str, Any], key: str, enum_type: type[Enum]) -> Enum: - value = data.get(key) - assert value is not None, f"Missing required field '{key}'" - parsed = _parse_enum(value, key, enum_type) - assert parsed is not None - return parsed - - -def _optional_enum(data: dict[str, Any], key: str, enum_type: type[Enum]) -> Enum | None: - return _parse_enum(data.get(key), key, enum_type) - - -def _parse_enum(value: Any, key: str, enum_type: type[Enum]) -> Enum | None: - if value is None or isinstance(value, enum_type): - return value - assert isinstance(value, str), f"Field '{key}' must be a string when set" - try: - return enum_type(value) - except ValueError: - valid_values = [enum_value.value for enum_value in enum_type] - raise AssertionError(f"Unknown {key} '{value}'. Expected one of {valid_values}") from None - - -def _assert_unique_ids(specs: list[Any], spec_name: str) -> None: - seen: set[str] = set() - duplicates: set[str] = set() - for spec in specs: - if spec.id in seen: - duplicates.add(spec.id) - seen.add(spec.id) - assert not duplicates, f"Duplicate {spec_name} ids found: {sorted(duplicates)}" - - -def _assert_references_exist( - nodes: list[EnvGraphNodeSpec], - tasks: list[EnvGraphTaskSpec], - state_specs: list[EnvGraphStateSpec], -) -> None: - node_ids = {node.id for node in nodes} - state_spec_ids = {state_spec.id for state_spec in state_specs} - - for node in nodes: - if node.parent is not None: - assert node.parent in node_ids, f"Node '{node.id}' references unknown parent '{node.parent}'" - - for task in tasks: - for label, state_spec_id in task.state_specs.items(): - assert ( - state_spec_id in state_spec_ids - ), f"Task '{task.id}' references unknown state spec '{state_spec_id}' for '{label}'" - - for state_spec in state_specs: - for constraint in state_spec.edges.spatial_constraints: - assert ( - constraint.parent in node_ids - ), f"Constraint '{constraint.id}' references unknown parent node '{constraint.parent}'" - if constraint.child is not None: - assert ( - constraint.child in node_ids - ), f"Constraint '{constraint.id}' references unknown child node '{constraint.child}'" - - for constraint in state_spec.edges.task_constraints: - if constraint.parent is not None: - assert ( - constraint.parent in node_ids - ), f"Constraint '{constraint.id}' references unknown parent node '{constraint.parent}'" - if constraint.child is not None: - assert ( - constraint.child in node_ids - ), f"Constraint '{constraint.id}' references unknown child node '{constraint.child}'" diff --git a/isaaclab_arena/environments/utils.py b/isaaclab_arena/environments/utils.py new file mode 100644 index 000000000..fcc9cc1d0 --- /dev/null +++ b/isaaclab_arena/environments/utils.py @@ -0,0 +1,130 @@ +# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from collections.abc import Callable +from enum import Enum +from numbers import Real +from typing import Any + + +def as_dict(data: Any, spec_name: str) -> dict[str, Any]: + assert isinstance(data, dict), f"{spec_name} must be a dict, got {type(data).__name__}" + return data + + +def parse_list(data: dict[str, Any], key: str, parser: Callable[[Any], Any]) -> list[Any]: + values = data.get(key, []) + assert isinstance(values, list), f"Field '{key}' must be a list" + return [parser(value) for value in values] + + +def required_str(data: dict[str, Any], key: str) -> str: + value = data.get(key) + assert isinstance(value, str) and value, f"Missing required string field '{key}'" + return value + + +def optional_str(data: dict[str, Any], key: str) -> str | None: + value = data.get(key) + assert value is None or isinstance(value, str), f"Optional field '{key}' must be a string when set" + return value + + +def optional_dict(data: dict[str, Any], key: str) -> dict[str, Any]: + value = data.get(key, {}) + assert value is None or isinstance(value, dict), f"Optional field '{key}' must be a dict when set" + return dict(value or {}) + + +def required_number_sequence(data: dict[str, Any], key: str, length: int) -> tuple[float, ...]: + value = data.get(key) + assert isinstance(value, (list, tuple)), f"Missing required numeric sequence field '{key}'" + assert len(value) == length, f"Field '{key}' must contain {length} numbers" + assert all(isinstance(item, Real) and not isinstance(item, bool) for item in value), ( + f"Field '{key}' must contain only numbers" + ) + return tuple(float(item) for item in value) + + +def required_enum(data: dict[str, Any], key: str, enum_type: type[Enum]) -> Enum: + value = data.get(key) + assert value is not None, f"Missing required field '{key}'" + parsed = parse_enum(value, key, enum_type) + assert parsed is not None + return parsed + + +def optional_enum(data: dict[str, Any], key: str, enum_type: type[Enum]) -> Enum | None: + return parse_enum(data.get(key), key, enum_type) + + +def parse_enum(value: Any, key: str, enum_type: type[Enum]) -> Enum | None: + if value is None or isinstance(value, enum_type): + return value + assert isinstance(value, str), f"Field '{key}' must be a string when set" + try: + return enum_type(value) + except ValueError: + valid_values = [enum_value.value for enum_value in enum_type] + raise AssertionError(f"Unknown {key} '{value}'. Expected one of {valid_values}") from None + + +def assert_env_graph_universal_ids(nodes: list[Any], tasks: list[Any], state_specs: list[Any]) -> None: + id_locations: dict[str, list[str]] = {} + for node in nodes: + _add_id_location(id_locations, node.id, f"node '{node.id}'") + for task in tasks: + _add_id_location(id_locations, task.id, f"task '{task.id}'") + for state_spec in state_specs: + _add_id_location(id_locations, state_spec.id, f"state spec '{state_spec.id}'") + for constraint in state_spec.spatial_constraints: + _add_id_location(id_locations, constraint.id, f"spatial constraint '{constraint.id}'") + for constraint in state_spec.task_constraints: + _add_id_location(id_locations, constraint.id, f"task constraint '{constraint.id}'") + + duplicates = {spec_id: locations for spec_id, locations in id_locations.items() if len(locations) > 1} + assert not duplicates, f"Duplicate env graph ids found: {duplicates}" + + +def assert_env_graph_references_exist(nodes: list[Any], tasks: list[Any], state_specs: list[Any]) -> None: + node_ids = {node.id for node in nodes} + state_spec_ids = {state_spec.id for state_spec in state_specs} + + for node in nodes: + if node.parent is not None: + assert node.parent in node_ids, f"Node '{node.id}' references unknown parent '{node.parent}'" + + for task in tasks: + for label, state_spec_id in ( + ("initial_state_spec_id", task.initial_state_spec_id), + ("success_state_spec_id", task.success_state_spec_id), + ): + assert ( + state_spec_id in state_spec_ids + ), f"Task '{task.id}' references unknown state spec '{state_spec_id}' for '{label}'" + + for state_spec in state_specs: + for constraint in state_spec.spatial_constraints: + assert ( + constraint.parent in node_ids + ), f"Constraint '{constraint.id}' references unknown parent node '{constraint.parent}'" + if constraint.child is not None: + assert ( + constraint.child in node_ids + ), f"Constraint '{constraint.id}' references unknown child node '{constraint.child}'" + + for constraint in state_spec.task_constraints: + if constraint.parent is not None: + assert ( + constraint.parent in node_ids + ), f"Constraint '{constraint.id}' references unknown parent node '{constraint.parent}'" + if constraint.child is not None: + assert ( + constraint.child in node_ids + ), f"Constraint '{constraint.id}' references unknown child node '{constraint.child}'" + + +def _add_id_location(id_locations: dict[str, list[str]], spec_id: str, location: str) -> None: + id_locations.setdefault(spec_id, []).append(location) diff --git a/isaaclab_arena/tests/test_arena_env_graph_spec.py b/isaaclab_arena/tests/test_arena_env_graph_spec.py new file mode 100644 index 000000000..76f0570f0 --- /dev/null +++ b/isaaclab_arena/tests/test_arena_env_graph_spec.py @@ -0,0 +1,216 @@ +# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from pathlib import Path + +from isaaclab_arena.assets.object_base import ObjectType +from isaaclab_arena.environments.arena_env_graph_spec import ( + ArenaEnvGraphNodeType, + ArenaEnvGraphSpatialConstraintType, + ArenaEnvGraphSpec, + ArenaEnvGraphStateSpec, +) + +TEST_DATA_DIR = Path(__file__).parent / "test_data" + + +def test_arena_env_graph_spec_loads_pick_and_place_yaml(): + spec = ArenaEnvGraphSpec.from_yaml(TEST_DATA_DIR / "pick_and_place_maple_table_env_graph.yaml") + + assert spec.env_name == "pick_and_place_maple_table_default" + assert len(spec.nodes) == 5 + assert len(spec.tasks) == 1 + assert len(spec.state_specs) == 2 + + table = spec.nodes_by_id["maple_table_robolab_table"] + assert table.type == ArenaEnvGraphNodeType.OBJECT_REFERENCE + assert table.parent == "maple_table_robolab" + assert table.prim_path == "{ENV_REGEX_NS}/maple_table_robolab/table" + assert table.object_type == ObjectType.RIGID + + task = spec.tasks_by_id["pick_and_place_0"] + assert task.initial_state_spec_id == "state_spec_0" + assert task.success_state_spec_id == "state_spec_1" + assert task.task_args["object"] == "rubiks_cube_hot3d_robolab" + assert task.task_args["destination"] == "bowl_ycb_robolab" + + initial_state = spec.state_specs_by_id["state_spec_0"] + assert isinstance(initial_state, ArenaEnvGraphStateSpec) + assert len(initial_state.spatial_constraints) == 5 + assert len(initial_state.task_constraints) == 1 + + cube_limits = initial_state.spatial_constraints[2] + assert cube_limits.type == ArenaEnvGraphSpatialConstraintType.POSITION_LIMITS + assert cube_limits.parent == "rubiks_cube_hot3d_robolab" + assert cube_limits.params == {"x_min": 0.55, "x_max": 0.70, "y_min": -0.40, "y_max": -0.10} + + final_state = spec.state_specs_by_id["state_spec_1"] + in_constraint = final_state.spatial_constraints[3] + assert in_constraint.type == ArenaEnvGraphSpatialConstraintType.IN + assert in_constraint.parent == "bowl_ycb_robolab" + assert in_constraint.child == "rubiks_cube_hot3d_robolab" + + +def test_arena_env_graph_spec_parses_optional_task_constraints_and_at_pose(): + data = _minimal_env_graph_data() + data["state_specs"][0]["spatial_constraints"] = [_at_pose_constraint()] + del data["state_specs"][0]["task_constraints"] + + spec = ArenaEnvGraphSpec.from_dict(data) + state_spec = spec.state_specs_by_id["state_0"] + fixed_pose = state_spec.spatial_constraints[0] + + assert state_spec.task_constraints == [] + assert fixed_pose.type == ArenaEnvGraphSpatialConstraintType.AT_POSE + assert fixed_pose.parent == "cube" + assert fixed_pose.params["position_xyz"] == (0.1, 0.2, 0.3) + assert fixed_pose.params["rotation_xyzw"] == (0.0, 0.0, 0.0, 1.0) + + +def test_arena_env_graph_spec_rejects_invalid_data(): + cases = [ + ( + "duplicate node id", + lambda data: data["nodes"].append({"id": "table", "name": "duplicate_table", "type": "objectReference"}), + "Duplicate env graph ids", + ), + ( + "duplicate id across spec types", + lambda data: data["tasks"][0].__setitem__("id", "table"), + "Duplicate env graph ids", + ), + ( + "duplicate constraint id", + lambda data: data["state_specs"][0]["task_constraints"][0].__setitem__("id", "table_is_anchor"), + "Duplicate env graph ids", + ), + ( + "missing task state reference", + lambda data: data["tasks"][0].__setitem__("initial_state_spec_id", "missing_state"), + "unknown state spec 'missing_state'", + ), + ( + "missing required task state spec id", + lambda data: data["tasks"][0].pop("success_state_spec_id"), + "Missing required string field 'success_state_spec_id'", + ), + ( + "old task state map", + lambda data: data["tasks"][0].__setitem__("state_specs", {"initial": "state_0", "final": "state_0"}), + "must use initial_state_spec_id and success_state_spec_id", + ), + ( + "old task state keys", + _add_old_task_state_keys, + "must use initial_state_spec_id and success_state_spec_id", + ), + ( + "missing constraint node reference", + lambda data: data["state_specs"][0]["task_constraints"][0].__setitem__("child", "missing_cube"), + "unknown child node 'missing_cube'", + ), + ( + "missing spatial parent", + lambda data: data["state_specs"][0]["spatial_constraints"][0].pop("parent"), + "Missing required string field 'parent'", + ), + ( + "old state edges wrapper", + _move_state_constraints_under_edges, + "must define spatial_constraints and task_constraints directly", + ), + ( + "missing node parent reference", + lambda data: data["nodes"][1].__setitem__("parent", "missing_background"), + "unknown parent 'missing_background'", + ), + ( + "unknown object type", + lambda data: data["nodes"][1].__setitem__("object_type", "unknown"), + "Unknown object_type 'unknown'", + ), + ( + "unknown node type", + lambda data: data["nodes"][0].__setitem__("type", "unknown"), + "Unknown type 'unknown'", + ), + ( + "unknown spatial constraint type", + lambda data: data["state_specs"][0]["spatial_constraints"][0].__setitem__("type", "unknown"), + "Unknown type 'unknown'", + ), + ( + "invalid at_pose position", + lambda data: data["state_specs"][0]["spatial_constraints"].append( + _at_pose_constraint(position_xyz=[0.1, 0.2]) + ), + "Field 'position_xyz' must contain 3 numbers", + ), + ] + + for label, mutate, error_match in cases: + data = _minimal_env_graph_data() + mutate(data) + + try: + ArenaEnvGraphSpec.from_dict(data) + except AssertionError as exc: + assert error_match in str(exc), label + else: + raise AssertionError(f"{label}: expected AssertionError") + + +def _minimal_env_graph_data(): + return { + "env_name": "minimal_env_graph", + "nodes": [ + {"id": "robot", "name": "robot", "type": "embodiment"}, + {"id": "table", "name": "table", "type": "objectReference"}, + {"id": "cube", "name": "cube", "type": "object"}, + ], + "tasks": [{ + "id": "task_0", + "name": "task_0", + "type": "pick_and_place", + "initial_state_spec_id": "state_0", + "success_state_spec_id": "state_0", + }], + "state_specs": [{ + "id": "state_0", + "name": "state_0", + "spatial_constraints": [{"id": "table_is_anchor", "type": "is_anchor", "parent": "table"}], + "task_constraints": [{ + "id": "robot_reach_cube", + "type": "reach", + "parent": "robot", + "child": "cube", + }], + }], + } + + +def _at_pose_constraint(position_xyz=None, rotation_xyzw=None): + return { + "id": "cube_fixed_pose", + "type": "at_pose", + "parent": "cube", + "params": { + "position_xyz": [0.1, 0.2, 0.3] if position_xyz is None else position_xyz, + "rotation_xyzw": [0.0, 0.0, 0.0, 1.0] if rotation_xyzw is None else rotation_xyzw, + }, + } + + +def _add_old_task_state_keys(data): + data["tasks"][0]["initial_state_spec"] = "state_0" + data["tasks"][0]["success_state_spec"] = "state_0" + + +def _move_state_constraints_under_edges(data): + state_spec = data["state_specs"][0] + state_spec["edges"] = { + "spatial_constraints": state_spec.pop("spatial_constraints"), + "task_constraints": state_spec.pop("task_constraints"), + } diff --git a/isaaclab_arena/tests/test_data/pick_and_place_maple_table_env_graph.yaml b/isaaclab_arena/tests/test_data/pick_and_place_maple_table_env_graph.yaml index 84a932768..537b42e1c 100644 --- a/isaaclab_arena/tests/test_data/pick_and_place_maple_table_env_graph.yaml +++ b/isaaclab_arena/tests/test_data/pick_and_place_maple_table_env_graph.yaml @@ -3,7 +3,7 @@ # # SPDX-License-Identifier: Apache-2.0 -name: pick_and_place_maple_table_default +env_name: pick_and_place_maple_table_default nodes: - id: droid_abs_joint_pos @@ -33,9 +33,8 @@ tasks: - id: pick_and_place_0 name: pick_and_place_0 type: pick_and_place - state_specs: - initial: state_spec_0 - final: state_spec_1 + initial_state_spec_id: state_spec_0 + success_state_spec_id: state_spec_1 task_args: object: rubiks_cube_hot3d_robolab destination: bowl_ycb_robolab @@ -45,75 +44,73 @@ tasks: state_specs: - id: state_spec_0 name: state_spec_0 - edges: - spatial_constraints: - - id: maple_table_robolab_table_is_anchor - type: is_anchor - parent: maple_table_robolab_table - - - id: rubiks_cube_hot3d_robolab_on_maple_table_robolab_table - type: "on" - parent: maple_table_robolab_table - child: rubiks_cube_hot3d_robolab - - - id: rubiks_cube_hot3d_robolab_position_limits - type: position_limits - parent: rubiks_cube_hot3d_robolab - params: - x_min: 0.55 - x_max: 0.70 - y_min: -0.40 - y_max: -0.10 - - - id: bowl_ycb_robolab_on_maple_table_robolab_table - type: "on" - parent: maple_table_robolab_table - child: bowl_ycb_robolab - - - id: bowl_ycb_robolab_position_limits - type: position_limits - parent: bowl_ycb_robolab - params: - x_min: 0.55 - x_max: 0.70 - y_min: -0.40 - y_max: -0.10 - - task_constraints: - - id: droid_reach_rubiks_cube_hot3d_robolab - type: "reach" - parent: droid_abs_joint_pos - child: rubiks_cube_hot3d_robolab + spatial_constraints: + - id: state_spec_0_maple_table_robolab_table_is_anchor + type: is_anchor + parent: maple_table_robolab_table + + - id: state_spec_0_rubiks_cube_hot3d_robolab_on_maple_table_robolab_table + type: "on" + parent: maple_table_robolab_table + child: rubiks_cube_hot3d_robolab + + - id: state_spec_0_rubiks_cube_hot3d_robolab_position_limits + type: position_limits + parent: rubiks_cube_hot3d_robolab + params: + x_min: 0.55 + x_max: 0.70 + y_min: -0.40 + y_max: -0.10 + + - id: state_spec_0_bowl_ycb_robolab_on_maple_table_robolab_table + type: "on" + parent: maple_table_robolab_table + child: bowl_ycb_robolab + + - id: state_spec_0_bowl_ycb_robolab_position_limits + type: position_limits + parent: bowl_ycb_robolab + params: + x_min: 0.55 + x_max: 0.70 + y_min: -0.40 + y_max: -0.10 + + task_constraints: + - id: state_spec_0_droid_reach_rubiks_cube_hot3d_robolab + type: "reach" + parent: droid_abs_joint_pos + child: rubiks_cube_hot3d_robolab - id: state_spec_1 name: state_spec_1 - edges: - spatial_constraints: - - id: maple_table_robolab_table_is_anchor - type: is_anchor - parent: maple_table_robolab_table - - - id: bowl_ycb_robolab_on_maple_table_robolab_table - type: "on" - parent: maple_table_robolab_table - child: bowl_ycb_robolab - - - id: bowl_ycb_robolab_position_limits - type: position_limits - parent: bowl_ycb_robolab - params: - x_min: 0.55 - x_max: 0.70 - y_min: -0.40 - y_max: -0.10 - - - id: rubiks_cube_hot3d_robolab_in_bowl_ycb_robolab - type: "in" - parent: bowl_ycb_robolab - child: rubiks_cube_hot3d_robolab - - task_constraints: - - id: droid_reach_bowl_ycb_robolab - type: "reach" - parent: droid_abs_joint_pos - child: bowl_ycb_robolab + spatial_constraints: + - id: state_spec_1_maple_table_robolab_table_is_anchor + type: is_anchor + parent: maple_table_robolab_table + + - id: state_spec_1_bowl_ycb_robolab_on_maple_table_robolab_table + type: "on" + parent: maple_table_robolab_table + child: bowl_ycb_robolab + + - id: state_spec_1_bowl_ycb_robolab_position_limits + type: position_limits + parent: bowl_ycb_robolab + params: + x_min: 0.55 + x_max: 0.70 + y_min: -0.40 + y_max: -0.10 + + - id: state_spec_1_rubiks_cube_hot3d_robolab_in_bowl_ycb_robolab + type: "in" + parent: bowl_ycb_robolab + child: rubiks_cube_hot3d_robolab + + task_constraints: + - id: state_spec_1_droid_reach_bowl_ycb_robolab + type: "reach" + parent: droid_abs_joint_pos + child: bowl_ycb_robolab diff --git a/isaaclab_arena/tests/test_env_graph_spec.py b/isaaclab_arena/tests/test_env_graph_spec.py deleted file mode 100644 index 861d6ec15..000000000 --- a/isaaclab_arena/tests/test_env_graph_spec.py +++ /dev/null @@ -1,165 +0,0 @@ -# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). -# All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -from copy import deepcopy -from pathlib import Path - -import pytest - -from isaaclab_arena.assets.object_base import ObjectType -from isaaclab_arena.environments.env_graph_spec import ( - EnvGraphNodeType, - EnvGraphSpatialConstraintType, - EnvGraphSpec, - EnvGraphStateSpec, -) - -TEST_DATA_DIR = Path(__file__).parent / "test_data" - - -def test_env_graph_spec_loads_pick_and_place_yaml(): - spec = EnvGraphSpec.from_yaml(TEST_DATA_DIR / "pick_and_place_maple_table_env_graph.yaml") - - assert spec.name == "pick_and_place_maple_table_default" - assert len(spec.nodes) == 5 - assert len(spec.tasks) == 1 - assert len(spec.state_specs) == 2 - - table = spec.nodes_by_id["maple_table_robolab_table"] - assert table.type == EnvGraphNodeType.OBJECT_REFERENCE - assert table.parent == "maple_table_robolab" - assert table.prim_path == "{ENV_REGEX_NS}/maple_table_robolab/table" - assert table.object_type == ObjectType.RIGID - - assert spec.nodes_by_id["rubiks_cube_hot3d_robolab"].type == EnvGraphNodeType.OBJECT - assert spec.nodes_by_id["bowl_ycb_robolab"].type == EnvGraphNodeType.OBJECT - - task = spec.tasks_by_id["pick_and_place_0"] - assert task.state_specs == {"initial": "state_spec_0", "final": "state_spec_1"} - assert task.task_args["object"] == "rubiks_cube_hot3d_robolab" - assert task.task_args["destination"] == "bowl_ycb_robolab" - assert task.task_args["episode_length_s"] == 20.0 - - initial_state = spec.state_specs_by_id["state_spec_0"] - assert isinstance(initial_state, EnvGraphStateSpec) - assert len(initial_state.edges.spatial_constraints) == 5 - assert len(initial_state.edges.task_constraints) == 1 - - cube_limits = initial_state.edges.spatial_constraints[2] - assert cube_limits.type == EnvGraphSpatialConstraintType.POSITION_LIMITS - assert cube_limits.parent == "rubiks_cube_hot3d_robolab" - assert cube_limits.child is None - assert cube_limits.params == { - "x_min": 0.55, - "x_max": 0.70, - "y_min": -0.40, - "y_max": -0.10, - } - - final_state = spec.state_specs_by_id["state_spec_1"] - assert isinstance(final_state, EnvGraphStateSpec) - in_constraint = final_state.edges.spatial_constraints[3] - assert in_constraint.type == EnvGraphSpatialConstraintType.IN - assert in_constraint.parent == "bowl_ycb_robolab" - assert in_constraint.child == "rubiks_cube_hot3d_robolab" - - reach_constraint = final_state.edges.task_constraints[0] - assert reach_constraint.type == "reach" - assert reach_constraint.parent == "droid_abs_joint_pos" - assert reach_constraint.child == "bowl_ycb_robolab" - - -def test_env_graph_spec_rejects_duplicate_ids(): - data = _minimal_env_graph_data() - data["nodes"].append({"id": "table", "name": "duplicate_table", "type": "objectReference"}) - - with pytest.raises(AssertionError, match="Duplicate node ids"): - EnvGraphSpec.from_dict(data) - - -def test_env_graph_spec_rejects_missing_task_state_reference(): - data = _minimal_env_graph_data() - data["tasks"][0]["state_specs"]["initial"] = "missing_state" - - with pytest.raises(AssertionError, match="unknown state spec 'missing_state'"): - EnvGraphSpec.from_dict(data) - - -def test_env_graph_spec_rejects_missing_constraint_node_reference(): - data = _minimal_env_graph_data() - data["state_specs"][0]["edges"]["task_constraints"][0]["child"] = "missing_cube" - - with pytest.raises(AssertionError, match="unknown child node 'missing_cube'"): - EnvGraphSpec.from_dict(data) - - -def test_env_graph_spec_rejects_missing_spatial_constraint_parent(): - data = _minimal_env_graph_data() - del data["state_specs"][0]["edges"]["spatial_constraints"][0]["parent"] - - with pytest.raises(AssertionError, match="Missing required string field 'parent'"): - EnvGraphSpec.from_dict(data) - - -def test_env_graph_spec_rejects_missing_node_parent_reference(): - data = _minimal_env_graph_data() - data["nodes"][1]["parent"] = "missing_background" - - with pytest.raises(AssertionError, match="unknown parent 'missing_background'"): - EnvGraphSpec.from_dict(data) - - -def test_env_graph_spec_rejects_unknown_object_type(): - data = _minimal_env_graph_data() - data["nodes"][1]["object_type"] = "unknown" - - with pytest.raises(AssertionError, match="Unknown object_type 'unknown'"): - EnvGraphSpec.from_dict(data) - - -def test_env_graph_spec_rejects_unknown_node_type(): - data = _minimal_env_graph_data() - data["nodes"][0]["type"] = "unknown" - - with pytest.raises(AssertionError, match="Unknown type 'unknown'"): - EnvGraphSpec.from_dict(data) - - -def test_env_graph_spec_rejects_unknown_spatial_constraint_type(): - data = _minimal_env_graph_data() - data["state_specs"][0]["edges"]["spatial_constraints"][0]["type"] = "unknown" - - with pytest.raises(AssertionError, match="Unknown type 'unknown'"): - EnvGraphSpec.from_dict(data) - - -def _minimal_env_graph_data(): - return deepcopy({ - "name": "minimal_env_graph", - "nodes": [ - {"id": "robot", "name": "robot", "type": "embodiment"}, - {"id": "table", "name": "table", "type": "objectReference"}, - {"id": "cube", "name": "cube", "type": "object"}, - ], - "tasks": [{ - "id": "task_0", - "name": "task_0", - "type": "pick_and_place", - "state_specs": {"initial": "state_0"}, - }], - "state_specs": [{ - "id": "state_0", - "name": "state_0", - "edges": { - "spatial_constraints": [{"id": "table_is_anchor", "type": "is_anchor", "parent": "table"}], - "task_constraints": [{ - "id": "robot_reach_cube", - "type": "reach", - "parent": "robot", - "child": "cube", - }], - }, - }], - }) From 5ad0851279da2021e8a1c3d68f59e48f614f2b14 Mon Sep 17 00:00:00 2001 From: Xinjie Yao Date: Thu, 21 May 2026 16:19:08 -0700 Subject: [PATCH 7/8] comment --- isaaclab_arena/environments/arena_env_graph_spec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/isaaclab_arena/environments/arena_env_graph_spec.py b/isaaclab_arena/environments/arena_env_graph_spec.py index 792ec6df3..4a5717927 100644 --- a/isaaclab_arena/environments/arena_env_graph_spec.py +++ b/isaaclab_arena/environments/arena_env_graph_spec.py @@ -54,7 +54,7 @@ class ArenaEnvGraphNodeSpec: """ id: str - name: str + name: str # Name registered in the asset registry type: ArenaEnvGraphNodeType parent: str | None = None # Optional, only need for object references prim_path: str | None = None # Optional, only need for object references From da8e825953a04d0ef27a0fc209ed498773a4c6f6 Mon Sep 17 00:00:00 2001 From: Qian Lin Date: Fri, 22 May 2026 10:08:17 +0800 Subject: [PATCH 8/8] lint --- isaaclab_arena/environments/arena_env_graph_spec.py | 9 ++++----- isaaclab_arena/environments/utils.py | 6 +++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/isaaclab_arena/environments/arena_env_graph_spec.py b/isaaclab_arena/environments/arena_env_graph_spec.py index 4a5717927..1dbf5f592 100644 --- a/isaaclab_arena/environments/arena_env_graph_spec.py +++ b/isaaclab_arena/environments/arena_env_graph_spec.py @@ -3,13 +3,12 @@ # # SPDX-License-Identifier: Apache-2.0 +import yaml from dataclasses import dataclass, field from enum import Enum from pathlib import Path from typing import Any -import yaml - from isaaclab_arena.assets.object_base import ObjectType from isaaclab_arena.environments.utils import ( as_dict, @@ -37,8 +36,8 @@ class ArenaEnvGraphSpatialConstraintType(Enum): IS_ANCHOR = "is_anchor" NEXT_TO = "next_to" ON = "on" - AT_POSE = "at_pose" # through set_initial_pose() - AT_POSITION = "at_position" # through object relation solver: AtPosition + AT_POSE = "at_pose" # through set_initial_pose() + AT_POSITION = "at_position" # through object relation solver: AtPosition POSITION_LIMITS = "position_limits" RANDOM_AROUND_SOLUTION = "random_around_solution" ROTATE_AROUND_SOLUTION = "rotate_around_solution" @@ -54,7 +53,7 @@ class ArenaEnvGraphNodeSpec: """ id: str - name: str # Name registered in the asset registry + name: str # Name registered in the asset registry type: ArenaEnvGraphNodeType parent: str | None = None # Optional, only need for object references prim_path: str | None = None # Optional, only need for object references diff --git a/isaaclab_arena/environments/utils.py b/isaaclab_arena/environments/utils.py index fcc9cc1d0..6942b345b 100644 --- a/isaaclab_arena/environments/utils.py +++ b/isaaclab_arena/environments/utils.py @@ -42,9 +42,9 @@ def required_number_sequence(data: dict[str, Any], key: str, length: int) -> tup value = data.get(key) assert isinstance(value, (list, tuple)), f"Missing required numeric sequence field '{key}'" assert len(value) == length, f"Field '{key}' must contain {length} numbers" - assert all(isinstance(item, Real) and not isinstance(item, bool) for item in value), ( - f"Field '{key}' must contain only numbers" - ) + assert all( + isinstance(item, Real) and not isinstance(item, bool) for item in value + ), f"Field '{key}' must contain only numbers" return tuple(float(item) for item in value)