feat(infra): Support for proxy server through RayScheduler #1161
hlyli wants to merge 6 commits into inclusionAI:main
Conversation
Code Review
This pull request introduces a base RayServer class and a new RayHTTPLauncher to support launching proxy and HTTP servers within Ray actors, enabling proxy worker support in the RayScheduler. Key changes include refactoring RayRPCServer, updating the scheduler to handle forked workers with custom commands, and optimizing resource allocations for internal Ray tasks. Review feedback highlights several areas for improvement: the HTTP retry logic in RayHTTPLauncher lacks proper exception handling and robust status code checks, the rpc_meta parameter is currently ignored in remote calls, and the heuristic for selecting the actor class is considered fragile. Additionally, there is dead code in the launcher's cleanup logic and opportunities to modernize Ray API usage.
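As a sketch of the retry hardening the review asks for, the helper below separates connection errors and retryable status codes (5xx, 408, 429) from client errors that should fail fast. The names (`request_with_retries`, `send`) are illustrative, not from the PR:

```python
import time

# Status codes worth retrying; 4xx client errors (other than 408/429) are not.
RETRYABLE_STATUS = {408, 429, 500, 502, 503, 504}

def request_with_retries(send, max_retries=5, base_delay=0.5):
    """Retry `send()` on connection errors and retryable HTTP status codes.

    `send` is any callable returning (status_code, body). Non-retryable
    client errors are surfaced immediately instead of being retried.
    """
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            status, body = send()
        except OSError as exc:  # connection refused, reset, timeout, ...
            last_error = exc
        else:
            if status < 400:
                return status, body
            if status not in RETRYABLE_STATUS:
                raise RuntimeError(f"non-retryable HTTP {status}")
            last_error = RuntimeError(f"HTTP {status}")
        if attempt < max_retries:
            time.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff
    raise RuntimeError(f"gave up after {max_retries} attempts: {last_error}")
```

The key point is that a 404 or similar fails immediately, while transient 5xx responses and socket errors get the backoff loop.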
```python
if command and "rpc.rpc_server" not in command:
    actor_cls = RayHTTPLauncher
else:
    actor_cls = RayRPCServer
```

/gemini review
Code Review
This pull request introduces a base RayServer class and a new RayHTTPLauncher to support launching HTTP-based proxy servers within Ray actors. It also updates the RayScheduler to handle these forked HTTP workers and removes previous restrictions on proxy workers in Ray environments. Feedback includes concerns regarding subprocess output handling in Ray, incorrect retry logic for client-side HTTP errors, and potential blocking of the actor thread during retries. Additionally, there are suggestions to improve the robustness of actor class selection and to revert a change in default GPU allocation for forked workers to avoid regressions.
```python
self.worker_process = subprocess.Popen(
    cmd, env=_env, stdout=sys.stdout, stderr=subprocess.STDOUT
)
```
Using sys.stdout as the stdout argument for subprocess.Popen within a Ray actor can be problematic. In many Ray environments, sys.stdout is redirected to a custom stream object that does not have a valid file descriptor (fileno()), which will cause Popen to raise an UnsupportedOperation or ValueError. It is safer to use None to inherit the standard output or redirect to a specific log file.
```diff
-self.worker_process = subprocess.Popen(
-    cmd, env=_env, stdout=sys.stdout, stderr=subprocess.STDOUT
-)
+self.worker_process = subprocess.Popen(
+    cmd, env=_env, stdout=None, stderr=subprocess.STDOUT
+)
```
I didn't face such an issue when running with Ray. I can change to None if needed.
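For reference, a minimal sketch of the reviewer's suggestion: redirect the worker's output to a log file, or pass `stdout=None` to inherit the parent's real file descriptor, so `Popen` never needs `fileno()` on Ray's redirected `sys.stdout`. The `launch_worker` helper and its signature are hypothetical, not the PR's actual code:

```python
import subprocess

def launch_worker(cmd, env=None, log_path=None):
    """Launch a worker subprocess, avoiding sys.stdout inside Ray actors.

    Ray may replace sys.stdout with an object lacking fileno(), which
    subprocess.Popen requires; redirect to a log file instead, or use
    stdout=None to inherit the parent's real stdout file descriptor.
    Returns (process, log_file_or_None).
    """
    if log_path is not None:
        log_file = open(log_path, "ab")
        proc = subprocess.Popen(
            cmd, env=env, stdout=log_file, stderr=subprocess.STDOUT
        )
        return proc, log_file
    # stdout=None inherits the parent's standard output
    proc = subprocess.Popen(cmd, env=env, stdout=None, stderr=subprocess.STDOUT)
    return proc, None
```

Redirecting to a file also keeps the worker's output from interleaving with the actor's own logs.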
```python
                f"(attempt {attempt}/{max_retries}): {last_error}. "
                f"Retrying in {delay:.1f}s..."
            )
            time.sleep(delay)
```
Using time.sleep inside a Ray actor method blocks the actor's execution thread. This prevents the actor from responding to other incoming messages, such as ping() or destroy(), which could lead to unnecessary timeouts in the scheduler. If the actor is intended to be responsive during retries, consider using asyncio.sleep (if the actor is async) or reducing the retry delay.
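A sketch of the asyncio-based alternative: if the launcher were an async actor, awaiting `asyncio.sleep` between retries would yield to the event loop so methods like `ping()` keep answering. In Ray, defining the actor's methods as `async def` runs them on the actor's event loop with the same behavior. Class and method names below are illustrative, not the PR's actual code:

```python
import asyncio

class HTTPLauncherSketch:
    """Async-actor-style sketch (in Ray this would be an @ray.remote
    class with async methods). Awaiting between retries keeps the
    actor responsive to other calls such as ping()."""

    async def wait_ready(self, probe, max_retries=3, delay=0.01):
        """Poll `probe()` until it reports ready or retries run out."""
        for attempt in range(1, max_retries + 1):
            if await probe():
                return True
            # asyncio.sleep yields to the event loop; time.sleep would
            # block every other method on this actor until it returns
            await asyncio.sleep(delay)
        return False

    async def ping(self):
        return "pong"
```

With `time.sleep`, a long retry loop would stall `ping()` and `destroy()` until it finished; with `await asyncio.sleep`, they interleave.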
```python
if command and "rpc.rpc_server" not in command:
    actor_cls = RayHTTPLauncher
else:
    actor_cls = RayRPCServer
```
The logic to select the actor class based on the presence of "rpc.rpc_server" in the command string is brittle. This assumes that any forked worker not using the default RPC server module must be an HTTP server. It would be more robust to explicitly pass the desired launcher type or use a more reliable detection mechanism.
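One way to make the selection explicit, as the review suggests, is to pass a launcher kind instead of sniffing the command string. `LauncherKind`, `select_actor_cls`, and the registry are hypothetical names, not the PR's API:

```python
from enum import Enum

class LauncherKind(Enum):
    """Explicit launcher type, passed by the caller instead of being
    inferred from substrings of the command line."""
    RPC = "rpc"
    HTTP = "http"

def select_actor_cls(kind, registry):
    """Look up the actor class for an explicit LauncherKind.

    `registry` maps kinds to actor classes (e.g. RayRPCServer,
    RayHTTPLauncher); an unregistered kind fails loudly rather than
    silently falling through to a default.
    """
    try:
        return registry[kind]
    except KeyError:
        raise ValueError(f"no launcher registered for {kind!r}")
```

The registry pattern also makes it easy to add further launcher types later without touching the selection logic.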
```diff
-schedulings.append(SchedulingSpec(cpu=0, mem=0, gpu=1, port_count=1))
+# use 0 gpu to prevent any scheduling issues since forks so far only use cpu
+# future forks that require gpu should change fork implementation to accept a scheduling spec
+schedulings.append(SchedulingSpec(cpu=0, mem=0, gpu=0, port_count=1))
```
Changing the default GPU allocation for forked workers from 1 to 0 is a regression for any forked workers that require GPU access (e.g., a reference model forked from an actor to share GPU memory). If gpu=0 is specified, Ray will not set the CUDA_VISIBLE_DEVICES environment variable for the forked actor, which may cause it to fail or incorrectly access all GPUs on the node. While this works for the current proxy server use case, it breaks the fork_workers API for other GPU-dependent tasks.
```diff
-schedulings.append(SchedulingSpec(cpu=0, mem=0, gpu=0, port_count=1))
+schedulings.append(SchedulingSpec(cpu=0, mem=0, gpu=1, port_count=1))
```
I'll leave GPU colocation for forked workers to a future implementation. The current proxy is colocated with rollout, which often uses gpu > 1, so GPU colocation is not supported through Ray anyway.
Different idea - I will always use 0 gpus when forking and simply copy the device env var from the parent to the forked worker. This will prevent any scheduling issues especially for multi-gpu workers. I can address this in another PR.
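The idea above, scheduling forks with `gpu=0` but explicitly copying device visibility from the parent, could look like the sketch below. The function name and env handling are illustrative:

```python
def forked_worker_env(parent_env, extra=None):
    """Build the environment for a fork scheduled with gpu=0.

    Ray leaves CUDA_VISIBLE_DEVICES unset when gpu=0, so the parent's
    value is copied explicitly; the child then sees the same devices
    as its parent instead of all GPUs on the node.
    """
    env = dict(extra or {})
    devices = parent_env.get("CUDA_VISIBLE_DEVICES")
    if devices is not None:
        env["CUDA_VISIBLE_DEVICES"] = devices
    return env
```

This sidesteps Ray's scheduler entirely for the fork's GPU placement, which is what avoids the multi-GPU colocation issue described above.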
garrett4wade left a comment
I think we should only use Ray to schedule and create workers. The RayRPCServer should be a guard or daemon process in which we create the initial HTTP RPC server. Ray should not be involved in any further forking, engine creation, or calling.
The create_workers method of RayScheduler should schedule and create a RayRPCServer, as in the current code. Then, upon initialization, RayRPCServer should launch a subprocess that runs @areal/infra/rpc/rpc_server.py. Every call to this worker will then be redirected to the HTTP subprocess, which shares the same logic as the current local and slurm schedulers.
Should we have a discussion before proceeding?
I think I agree with that. We also had some internal discussion within our group about possibly retiring the current RayRPCServer and moving to a full HTTP design, so as to make Ray easier to maintain as new features are added to the RPCServer.
We can close the PR for now. We agree with the idea and can discuss it in this week's meeting.
Description
This commit enables proxy servers by introducing a RayHTTPLauncher actor for forked HTTP workers. The RayHTTPLauncher reads the `command` parameter that was previously unused in the RayRPCServer and launches the command through Popen, much like how a LocalScheduler launches RPC servers. Communication between the RayHTTPLauncher and the proxy goes over HTTP so as not to touch the preexisting proxy server code.

An additional small tweak: I added a `__repr__` to the Ray actors that also prints their actor names, making it easier to distinguish between rollout instances.

Coauthored with @ActuallyEdward
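The `__repr__` tweak mentioned in the description could look roughly like this; the mixin and attribute names are illustrative, not the PR's actual fields:

```python
class NamedActorReprMixin:
    """Give Ray actor classes a __repr__ that includes the actor name,
    so log lines distinguish between otherwise-identical instances
    (e.g. multiple rollout workers)."""

    def __repr__(self):
        # `actor_name` is a hypothetical attribute set at construction
        name = getattr(self, "actor_name", "unnamed")
        return f"<{type(self).__name__} name={name}>"

class RolloutWorker(NamedActorReprMixin):
    def __init__(self, actor_name):
        self.actor_name = actor_name
```

With this, log messages that interpolate the actor object show which instance emitted them.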
Related Issue
#963
Additional Context
/review-pr was done with codex
RayHTTPLauncher has only been tested with the proxy server. Other HTTP-style servers may be compatible but are untested. However, I hope this change makes it easier to enable other types of servers.