Skip to content

Support parallel envs in Pi0RemotePolicy#688

Merged
cvolkcvolk merged 2 commits into
mainfrom
cvolk/feature/openpi-parallel-envs
May 20, 2026
Merged

Support parallel envs in Pi0RemotePolicy#688
cvolkcvolk merged 2 commits into
mainfrom
cvolk/feature/openpi-parallel-envs

Conversation

@cvolkcvolk
Copy link
Copy Markdown
Collaborator

@cvolkcvolk cvolkcvolk commented May 20, 2026

Summary

Arena rollouts can now request num_envs > 1 instead of being restricted to single-env.

  • Loop over envs on the Arena client side, with a per-env action-chunk cache and step counter (openpi's format is still one obs per request).
  • reset(env_ids) clears only the listed envs' caches, so per-env termination leaves untouched envs replaying uninterrupted.

Replace the num_envs==1 assert with a per-env action cache and a loop
over envs in get_action. openpi's wire format is one obs per request,
so this still issues one server call per env per chunk refill; the
correctness payoff is that any rollout (policy_runner or eval_runner)
can now request num_envs>1 instead of being silently single-env.

Concretely:
- Pi0EmbodimentAdapter.extract(observation) gains an env_id parameter
  so adapters slice one env at a time. Pi0DroidAdapter swaps its
  hardcoded [0] indexing for [env_id]. This is a breaking change for
  any external Pi0EmbodimentAdapter subclass.
- Pi0RemotePolicy keeps per-env _cached_action_chunks and
  _next_chunk_steps lists, lazy-allocated on the first get_action when
  num_envs is observable. Mid-rollout num_envs changes assert.
- reset(env_ids) clears only the listed envs' caches, so per-env
  termination (the common case in parallel rollouts) leaves untouched
  envs replaying their cached chunks uninterrupted.
- Connection-drop reconnect still flushes all per-env caches to
  preserve the prior "stale server state" defensive behaviour.

Tests cover the num_envs>1 loop, partial reset semantics, and the
reconnect-flush behaviour; the single-env tests are unchanged.

Signed-off-by: Clemens Volk <cvolk@nvidia.com>
Copy link
Copy Markdown
Contributor

@isaaclab-review-bot isaaclab-review-bot Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review for PR #688: Support parallel envs in Pi0RemotePolicy

Thank you for this well-structured PR! The implementation cleanly extends Pi0RemotePolicy to support parallel environments while maintaining backward compatibility for the single-env case. Here are my findings:

✅ Strengths

  1. Well-documented breaking change: The Pi0EmbodimentAdapter.extract() interface change is clearly noted in the PR description and commit message.
  2. Efficient lazy allocation: Per-env caches are only allocated on first get_action() call when num_envs is observable.
  3. Clean partial reset semantics: reset(env_ids) correctly preserves caches for untouched envs, which is important for per-env termination in parallel rollouts.
  4. Comprehensive test coverage: New tests for parallel envs, partial reset, and existing reconnect behavior.

🔄 Update (2026-05-20)

Reviewed new commit ae326000. Changes since previous review:

Fixed: 0-dim env_ids tensor handling in reset() - now uses .reshape(-1).tolist() as suggested.

Remaining: The P1 concern about step counters being mutated before exception rollback (line 175) was not addressed in this commit. This is a minor edge case for non-websocket exceptions.

Overall, looking good! 🎉

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 20, 2026

Greptile Summary

This PR extends Pi0RemotePolicy to support num_envs > 1 by replacing the single-env action-chunk cache with a per-env list of caches and step counters, lazily initialised on the first get_action call. The Pi0EmbodimentAdapter.extract() interface gains an env_id parameter so each env's observation slice can be extracted independently, since the openpi server still accepts one observation per request.

  • get_action loops over all envs, fetches a fresh chunk per-env on cache miss, and stacks the selected rows into an (num_envs, action_dim) tensor.
  • reset(env_ids) now clears only the listed envs' caches so partial-episode resets leave unaffected envs replaying their cached chunks uninterrupted.

Confidence Score: 3/5

Safe to merge for single-env workloads; multi-env rollouts that hit a websocket reconnect mid-loop will receive an action batch that mixes pre-reconnect and post-reconnect model outputs for different envs in the same step.

The reconnect flush in _call_server_with_retry wipes all per-env caches and step counters while get_action is still mid-loop. Envs processed before the reconnect have their actions already staged in a local list, but their caches and counters are reset to 0, while the env that triggered the reconnect gets a fresh chunk. The resulting action tensor has incoherent provenance across envs, and step counters diverge from what was actually returned.

isaaclab_arena_openpi/policy/pi0_remote_policy.py — specifically the interaction between the per-env loop in get_action and the all-env flush in _call_server_with_retry.

Important Files Changed

Filename Overview
isaaclab_arena_openpi/policy/pi0_remote_policy.py Core policy rewritten to support parallel envs with per-env chunk caches; reconnect flush during the get_action loop can produce a mixed stale/fresh action batch across envs.
isaaclab_arena_openpi/policy/droid_adapter.py Adapter's extract() updated to accept env_id and index into the per-env tensor dimension; straightforward change with no issues found.
isaaclab_arena_openpi/tests/test_pi0_remote_policy.py Two new tests cover multi-env looping and selective reset; existing tests updated to match new API. Reconnect-during-loop scenario is not covered, which is consistent with the live defect above.

Sequence Diagram

sequenceDiagram
    participant Caller
    participant Pi0RemotePolicy
    participant Pi0EmbodimentAdapter
    participant OpenPiServer

    Caller->>Pi0RemotePolicy: get_action(env, obs)
    Pi0RemotePolicy->>Pi0RemotePolicy: _maybe_init_per_env_state(N)
    loop for env_id in 0..N-1
        alt cache miss
            Pi0RemotePolicy->>Pi0EmbodimentAdapter: extract(obs, env_id)
            Pi0EmbodimentAdapter-->>Pi0RemotePolicy: extracted obs
            Pi0RemotePolicy->>Pi0EmbodimentAdapter: pack_request(extracted, task)
            Pi0EmbodimentAdapter-->>Pi0RemotePolicy: wire-format request
            Pi0RemotePolicy->>OpenPiServer: infer(request)
            OpenPiServer-->>Pi0RemotePolicy: actions (H, action_dim)
            Pi0RemotePolicy->>Pi0RemotePolicy: store chunk, reset step to 0
        end
        Pi0RemotePolicy->>Pi0RemotePolicy: append chunk row, increment step
    end
    Pi0RemotePolicy->>Pi0RemotePolicy: np.stack into (N, action_dim)
    Pi0RemotePolicy-->>Caller: torch.Tensor shape (N, action_dim)

    Caller->>Pi0RemotePolicy: reset(env_ids)
    Pi0RemotePolicy->>Pi0RemotePolicy: clear selected env caches and counters
Loading

Comments Outside Diff (1)

  1. isaaclab_arena_openpi/policy/pi0_remote_policy.py, line 227-252 (link)

    P1 Reconnect mid-loop flushes already-processed envs, producing a mixed stale/fresh action batch

    _call_server_with_retry is called once per env inside the get_action loop. When a reconnect fires while processing env k, the flush at lines 250–252 sets _cached_action_chunks[i] = None and _next_chunk_steps[i] = 0 for all envs — including envs 0..k-1 whose actions are already sitting in the local actions list. The method then continues: _fetch_action_chunk returns the fresh post-reconnect chunk for env k, which is correctly stored and appended. But np.stack(actions) assembles a batch that mixes pre-reconnect rows for envs 0..k-1 with a post-reconnect row for env k.

    The cache inconsistency also diverges from what was returned: _next_chunk_steps[0..k-1] are now 0 (reset by flush) while those envs already "consumed" row 0 into the returned action batch, so the next get_action call will replay row 0 again for them rather than advancing to row 1.

    A straightforward guard is to detect whether a reconnect occurred during the loop (e.g., a boolean flag set in the flush block) and, if so, abort the current get_action early — returning a zeroed-out batch or re-invoking a fresh get_action — so callers never receive a mixed-state batch.

Reviews (2): Last reviewed commit: "Update isaaclab_arena_openpi/policy/pi0_..." | Re-trigger Greptile

Comment thread isaaclab_arena_openpi/policy/pi0_remote_policy.py
Comment thread isaaclab_arena_openpi/policy/pi0_remote_policy.py Outdated
Copy link
Copy Markdown
Collaborator

@alexmillane alexmillane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clean! Thanks!

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
@cvolkcvolk cvolkcvolk enabled auto-merge (squash) May 20, 2026 15:41
@cvolkcvolk cvolkcvolk merged commit 55c501c into main May 20, 2026
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants