fix(shuffle): make shuffle service actors idempotent and enable fault tolerance#472

Open
my-vegetable-has-exploded wants to merge 3 commits into ray-project:master from my-vegetable-has-exploded:idempotent-ess-fork

@my-vegetable-has-exploded

Contributor

Motivation

Prevent duplicate shuffle service actors on the same node by assigning unique names and reusing existing actors. Also enable fault tolerance with max restarts and task retries, and remove the now-unnecessary explicit start call.

Approach

  • Introduce a named actor pattern in ExternalShuffleServiceUtils.createShuffleService(): generate a unique name raydp-shuffle-service-<ip> based on the node IP
  • Call Ray.getActor(actorName) first — if an actor already exists, return the existing handle instead of creating a new one
  • Set setMaxRestarts(-1) / setMaxTaskRetries(-1) to enable automatic fault recovery at the Ray level
  • Remove the separate startShuffleService() method; instead, call start() inside the RayExternalShuffleService constructor so the service auto-starts on creation
  • Simplify the shuffle service creation logic in RayAppMaster when an executor registers, removing the redundant explicit start call
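The get-or-create flow in the list above can be sketched without a Ray cluster. This is only an illustration under stated assumptions: a local `ConcurrentHashMap` stands in for Ray's named-actor registry, and `NamedShuffleServiceSketch`, `ServiceHandle`, `lookup`, and `getOrCreate` are hypothetical names, not RayDP or Ray classes. Only the name format `raydp-shuffle-service-<ip>` comes from the description.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in sketch: a local map plays the role of the named-actor registry.
// None of these types are Ray or RayDP classes.
class NamedShuffleServiceSketch {
  /** Hypothetical stand-in for an actor handle. */
  static final class ServiceHandle {
    final String name;

    ServiceHandle(String name) {
      this.name = name;
    }
  }

  private static final ConcurrentMap<String, ServiceHandle> REGISTRY =
      new ConcurrentHashMap<>();

  /** Counts how many services were actually created (not reused). */
  static final AtomicInteger CREATED = new AtomicInteger();

  /** Name format taken from the PR description. */
  static String actorName(String nodeIp) {
    return "raydp-shuffle-service-" + nodeIp;
  }

  /** Look up an existing service by name without creating one. */
  static Optional<ServiceHandle> lookup(String name) {
    return Optional.ofNullable(REGISTRY.get(name));
  }

  /** Return the existing service for this node, or create it exactly once. */
  static ServiceHandle getOrCreate(String nodeIp) {
    String name = actorName(nodeIp);
    return lookup(name).orElseGet(() ->
        // computeIfAbsent keeps creation atomic if two callers miss the lookup.
        REGISTRY.computeIfAbsent(name, n -> {
          CREATED.incrementAndGet();
          return new ServiceHandle(n);
        }));
  }

  public static void main(String[] args) {
    ServiceHandle a = getOrCreate("10.0.0.1");
    ServiceHandle b = getOrCreate("10.0.0.1");
    ServiceHandle c = getOrCreate("10.0.0.2");
    System.out.println(a == b);        // true: same node reuses the handle
    System.out.println(a == c);        // false: different node, different service
    System.out.println(CREATED.get()); // 2
  }
}
```

In the sketch the lookup-then-create step is made atomic by `computeIfAbsent`. Whether the real lookup followed by actor creation needs similar protection against two concurrent callers is not stated in the description, so treat that as an open question rather than a property of the PR.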

Signed-off-by: epsilonwang <epsilonwang@didiglobal.com>

Copilot AI left a comment

Pull request overview

This PR makes the RayDP external shuffle service actor named and reusable per node to avoid duplicate shuffle service actors, and enables Ray-level fault recovery by configuring unlimited restarts and task retries.

Changes:

  • Create/reuse a named shuffle service actor via Ray.getActor(name) before creating a new actor.
  • Auto-start the shuffle service inside RayExternalShuffleService construction and remove the explicit start call path from RayAppMaster.
  • Configure the shuffle service actor with setMaxRestarts(-1) and setMaxTaskRetries(-1) for fault tolerance.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayExternalShuffleService.scala | Auto-start shuffle service upon actor construction. |
| core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala | Simplifies shuffle-service startup logic during executor registration (no explicit start task). |
| core/raydp-main/src/main/java/org/apache/spark/deploy/raydp/ExternalShuffleServiceUtils.java | Introduces named-actor reuse and configures actor restart/task retry behavior. |

Comment thread core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala Outdated
- Mark RayExternalShuffleService.start() as final to prevent overridable
  method call from constructor pitfall
- Change log message from 'Starting shuffle service' to 'Ensuring shuffle
  service' to accurately reflect that the actor may already exist
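The constructor pitfall mentioned in the commit message above can be reproduced in a few lines. This is a generic Java illustration, not the RayDP code: `OverridableService`, `ConfiguredService`, and `FinalStartService` are hypothetical classes. When a constructor calls an overridable method, the subclass override runs before the subclass's own fields are initialized.

```java
import java.util.ArrayList;
import java.util.List;

// Generic illustration of calling an overridable method from a constructor.
// All class names here are hypothetical.
class ConstructorStartPitfallSketch {
  /** Base class whose constructor calls an overridable start(). */
  static class OverridableService {
    final List<String> events = new ArrayList<>();

    OverridableService() {
      start(); // dispatches to the subclass override, if any
    }

    void start() {
      events.add("base start");
    }
  }

  /** The override reads a field that is not yet initialized during super(). */
  static class ConfiguredService extends OverridableService {
    private final Integer port = Integer.valueOf(7337);

    @Override
    void start() {
      events.add("port=" + port); // port is still null at this point
    }
  }

  /** Same shape, but start() is final, so no subclass can intercept it. */
  static class FinalStartService {
    final List<String> events = new ArrayList<>();

    FinalStartService() {
      start();
    }

    final void start() {
      events.add("base start");
    }
  }

  public static void main(String[] args) {
    System.out.println(new ConfiguredService().events); // [port=null]
    System.out.println(new FinalStartService().events); // [base start]
  }
}
```

Marking `start()` as final removes the dispatch to a partially constructed subclass, which is the reason the commit gives for the change.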
@my-vegetable-has-exploded

Contributor Author

Need to retrigger CI.

@my-vegetable-has-exploded changed the title from "fix(shuffle): make shuffle service actors named and idempotent" to "fix(shuffle): make shuffle service actors idempotent and enable fault tolerance" on May 19, 2026

@pang-wu pang-wu left a comment

Collaborator

@my-vegetable-has-exploded Thanks for the proposal. Can you elaborate more on what problem we are trying to solve? We do want to support two Spark cluster instances running on the same Ray cluster; in that case we do want two actors.

@my-vegetable-has-exploded

Contributor Author

Thanks for the review, @pang-wu.

> Can you elaborate more on what problem we are trying to solve?

The main problem is that the ESS (external shuffle service) can't restart automatically when its pod restarts, so we set max restarts and max task retries on the ESS actor.

> We do want to support two Spark cluster instances running on the same Ray cluster; in that case we do want two actors.

I think it would be great to support two or more Spark cluster instances running on the same Ray cluster. In my opinion, it still makes sense for one pod to have only one ESS. The ESS uses appId and execId to distinguish blocks, so executors from different applications can still fetch their corresponding blocks using the unique appId and execId.
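To illustrate the point about appId and execId, here is a minimal sketch of one shared index serving two applications. It is an assumption-laden illustration rather than the ESS implementation: `SharedBlockIndexSketch`, `ExecutorKey`, `registerExecutor`, and `resolve` are hypothetical names, and the directories are made-up values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch: one per-node index shared by several applications.
class SharedBlockIndexSketch {
  /** Composite key: the same execId under different appIds must not collide. */
  static final class ExecutorKey {
    final String appId;
    final String execId;

    ExecutorKey(String appId, String execId) {
      this.appId = appId;
      this.execId = execId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof ExecutorKey)) {
        return false;
      }
      ExecutorKey other = (ExecutorKey) o;
      return appId.equals(other.appId) && execId.equals(other.execId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(appId, execId);
    }
  }

  private final Map<ExecutorKey, String> localDirs = new HashMap<>();

  /** Record where an executor of a given application wrote its shuffle files. */
  void registerExecutor(String appId, String execId, String localDir) {
    localDirs.put(new ExecutorKey(appId, execId), localDir);
  }

  /** Resolve the directory for one (appId, execId) pair, if registered. */
  Optional<String> resolve(String appId, String execId) {
    return Optional.ofNullable(localDirs.get(new ExecutorKey(appId, execId)));
  }

  public static void main(String[] args) {
    SharedBlockIndexSketch index = new SharedBlockIndexSketch();
    // Two applications both have an executor with id "1" on this node.
    index.registerExecutor("app-A", "1", "/tmp/shuffle/app-A/1");
    index.registerExecutor("app-B", "1", "/tmp/shuffle/app-B/1");
    System.out.println(index.resolve("app-A", "1").get()); // /tmp/shuffle/app-A/1
    System.out.println(index.resolve("app-B", "1").get()); // /tmp/shuffle/app-B/1
  }
}
```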

To support two or more Spark clusters, I think we need to separate shuffle block cleanup (once an application finishes) from ESS shutdown (when the Ray cluster goes down). By the way, I am interested in the proposal to support multiple Spark cluster instances running on the same Ray cluster; it would be great if I could participate in the related discussion.
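The separation proposed above might look roughly like this. It is a sketch of the idea only, with hypothetical names (`ShuffleLifecycleSketch`, `cleanupApplication`, `shutdown`); it does not describe how RayDP or the ESS currently handles cleanup.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: per-application cleanup is decoupled from service shutdown.
class ShuffleLifecycleSketch {
  // appId -> (execId -> local shuffle directory)
  private final Map<String, Map<String, String>> blocksByApp = new HashMap<>();
  private boolean running = true;

  void register(String appId, String execId, String localDir) {
    if (!running) {
      throw new IllegalStateException("service is shut down");
    }
    blocksByApp.computeIfAbsent(appId, k -> new HashMap<>()).put(execId, localDir);
  }

  /** Called when one application finishes: drop only its entries. */
  int cleanupApplication(String appId) {
    Map<String, String> removed = blocksByApp.remove(appId);
    return removed == null ? 0 : removed.size();
  }

  /** Called when the whole cluster goes down: drop everything and stop. */
  void shutdown() {
    blocksByApp.clear();
    running = false;
  }

  boolean isRunning() {
    return running;
  }

  int registeredApps() {
    return blocksByApp.size();
  }

  public static void main(String[] args) {
    ShuffleLifecycleSketch ess = new ShuffleLifecycleSketch();
    ess.register("app-A", "1", "/tmp/shuffle/app-A/1");
    ess.register("app-B", "1", "/tmp/shuffle/app-B/1");

    ess.cleanupApplication("app-A");          // app-A finished
    System.out.println(ess.isRunning());      // true: service keeps serving app-B
    System.out.println(ess.registeredApps()); // 1

    ess.shutdown();                           // cluster teardown
    System.out.println(ess.isRunning());      // false
  }
}
```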
