Support router as replica with pipelines#3721
Support router as replica with pipelines#3721Bihan wants to merge 13 commits intodstackai:masterfrom
Conversation
2fe5e14 to
bafd2d9
Compare
|
|
||
|
|
||
| class ServiceRouterWorkerSyncFetcher(Fetcher[ServiceRouterWorkerSyncPipelineItem]): | ||
| @sentry_utils.instrument_named_task("pipeline_tasks.ServiceRouterWorkerSyncFetcher.fetch") |
There was a problem hiding this comment.
I recently added @sentry_utils.instrument_pipeline_task – use it to avoid hardcoding pipeline_tasks prefix.
| run_model = sync_row.run | ||
| if run_model is None: | ||
| await session.delete(sync_row) | ||
| await session.commit() | ||
| return |
There was a problem hiding this comment.
How can run_model be None here?
There was a problem hiding this comment.
I thought what if the run row can be hard-deleted, so sync_row.run becomes None. If this is not possible we can delete this block.
There was a problem hiding this comment.
But you defined run_id as non-optional with ondelete="CASCADE" - how can it be possible?
There was a problem hiding this comment.
You are right. Maybe I delete this block.
| .options( | ||
| selectinload(RunModel.project), | ||
| selectinload(RunModel.jobs).selectinload(JobModel.project), | ||
| selectinload(RunModel.jobs) | ||
| .selectinload(JobModel.instance) | ||
| .selectinload(InstanceModel.project), | ||
| ) | ||
| ) |
There was a problem hiding this comment.
This is potentially a very inefficient select – a run can have thousands of job submissions. Select only the jobs that the processing needs, i.e. only the router replica job. Also every selectinload will be a separate query here – not sure if it's justified. joinedload may be a better suited for a one-to-one rel. Also, try to avoid loading all models's columns and use load_only to select only the necessary.
There was a problem hiding this comment.
Please check if below proposed query addresses the concerns
-
Avoid loading thousands of job submissions: no longer load RunModel.jobs unconditionally. The selectinload(RunModel.jobs.and_(...)) restricts the loaded jobs to only RUNNING + registered replicas, which are the only ones sync_router_workers_for_run_model() can use (router job selection and worker list building both ignore non‑running / unregistered jobs).
-
selectinload is intentional: RunModel.jobs is a one‑to‑many collection; using joinedload would duplicate the RunModel row per job.
-
joinedload for one‑to‑one/many‑to‑one: RunModel.project, JobModel.project, JobModel.instance, InstanceModel.project are loaded with joinedload because these are scalar relationships from from run,job and instance.
-
Use load_only: This limits columns required by
sync_router_workers_for_run_model(run_for_sync)and_get_service_replica_client(job_model)
res = await session.execute(
select(RunModel)
.where(RunModel.id == item.run_id)
.options(
load_only(RunModel.id, RunModel.run_spec),
selectinload(
RunModel.jobs.and_(
JobModel.status == JobStatus.RUNNING,
JobModel.registered == true(),
)
)
.load_only(
JobModel.id,
JobModel.status,
JobModel.registered,
JobModel.job_spec_data,
JobModel.job_provisioning_data,
JobModel.job_runtime_data,
)
.options(
joinedload(JobModel.project).load_only(ProjectModel.id, ProjectModel.ssh_private_key),
joinedload(JobModel.instance)
.load_only(InstanceModel.id, InstanceModel.remote_connection_info)
.joinedload(InstanceModel.project)
.load_only(ProjectModel.id, ProjectModel.ssh_private_key),
),
)
)
There was a problem hiding this comment.
looks good, at least at a glance
| router_jobs = [ | ||
| j | ||
| for j in run_model.jobs | ||
| if job_belongs_to_group(j, group_name) and j.status == JobStatus.RUNNING | ||
| ] | ||
| if not router_jobs or not is_replica_registered(router_jobs): | ||
| return None | ||
| return router_jobs[0] |
There was a problem hiding this comment.
Can there be multiple router jobs? If so, how does that work?
There was a problem hiding this comment.
For the first iteration, I suggest restricting the router replica group to count: 1 via configuration validation. The current sync logic effectively assumes a single active router job. We can extend this later to support multiple router replicas for HA.
| def run_spec_has_router_replica_group(run_spec: RunSpec) -> bool: | ||
| if run_spec.configuration.type != "service": | ||
| return False | ||
| cfg = run_spec.configuration | ||
| if not isinstance(cfg, ServiceConfiguration): | ||
| return False | ||
| return any(g.router is not None for g in cfg.replica_groups) | ||
|
|
||
|
|
||
| async def ensure_service_router_worker_sync_row( |
There was a problem hiding this comment.
Why put these router-speicfic functions in top of runs services.
There was a problem hiding this comment.
I kept it there because they are used by run lifecycle. Should I shift them to src/dstack/_internal/server/services/router_worker_sync.py?
There was a problem hiding this comment.
I mean at least they should not be at the top of the file.
| ], | ||
| ) | ||
| global_replica_num += 1 | ||
| await ensure_service_router_worker_sync_row(session, run_model, run_spec) |
There was a problem hiding this comment.
I think in-place update supports replicas. What happens if a user adds a router replica in in-place update if ensure_service_router_worker_sync_row() gets called only on submit_run()?
There was a problem hiding this comment.
Thanks for pointing out. I need to call ensure_service_router_worker_sync_row after this
There was a problem hiding this comment.
What happens if a user adds a router replica in in-place update
@Bihan, is this use case expected to work at all? I think it won't work with the current implementation, because adding a router replica means that only this replica should receive requests, which means that other existing replicas should be unregistered from the gateway, which doesn't seem to be implemented.
Similarly, due to the need to register or unregister existing replicas, I assume that the following use cases won't work as expected:
- Removing a router replica group.
- Adding the
routerproperty to an existing replica group. - Removing the
routerproperty from an existing replica group.
If supporting these use cases requires additional effort, I can suggest to forbid them for now (see _check_can_update_configuration). And, in that case, only call ensure_service_router_worker_sync_row here and simplify its implementation
| if not run_spec_has_router_replica_group(run_spec): | ||
| return | ||
| res = await session.execute( | ||
| select(ServiceRouterWorkerSyncModel.id).where( | ||
| ServiceRouterWorkerSyncModel.run_id == run_model.id | ||
| ) | ||
| ) | ||
| if res.scalar_one_or_none() is not None: | ||
| return |
There was a problem hiding this comment.
How can it be that ServiceRouterWorkerSyncModel already exists for a run if ensure_service_router_worker_sync_row is called only on run submit?
| return | ||
| run_model = sync_row.run | ||
| if run_model is None: | ||
| await session.delete(sync_row) |
There was a problem hiding this comment.
We generally use soft deletes in dstack server easier debugging and historical data. Assuming there will be very few ServiceRouterWorkerSyncModel rows (one per service replica router), I'd also soft-delete it for consistency.
| ) | ||
|
|
||
|
|
||
| class ServiceRouterWorkerSyncModel(PipelineModelMixin, BaseModel): |
There was a problem hiding this comment.
Let's put it somewhere in the end of the file so that "core" models come first.
| @@ -0,0 +1,49 @@ | |||
| """SSH-tunneled async HTTP client to a job's service port (same path as probes).""" | |||
There was a problem hiding this comment.
put this file in jobs services?
| @@ -0,0 +1,345 @@ | |||
| """Reconcile SGLang router /workers with dstack's registered worker replicas (async, SSH-tunneled).""" | |||
There was a problem hiding this comment.
put this file in runs services
r4victor
left a comment
There was a problem hiding this comment.
Did a quick review of the pipeline code. Haven't looked into the worker sync logic.
e155d17 to
7b268cb
Compare
src/dstack/_internal/server/services/job_replica_http_client.py
Outdated
Show resolved
Hide resolved
| async def _stream_response_body_bytes(resp: Response, max_bytes: int) -> bytes: | ||
| buf = bytearray() | ||
| async for chunk in resp.aiter_bytes(): | ||
| buf.extend(chunk) | ||
| if len(buf) > max_bytes: | ||
| raise _ResponseTooLargeError() | ||
| return bytes(buf) |
There was a problem hiding this comment.
(nit) We have the join_byte_stream_checked function that appears to do the same thing
src/dstack/_internal/server/background/pipeline_tasks/service_router_worker_sync.py
Show resolved
Hide resolved
src/dstack/_internal/server/background/pipeline_tasks/service_router_worker_sync.py
Outdated
Show resolved
Hide resolved
src/dstack/_internal/server/background/pipeline_tasks/service_router_worker_sync.py
Outdated
Show resolved
Hide resolved
src/dstack/_internal/server/background/pipeline_tasks/service_router_worker_sync.py
Outdated
Show resolved
Hide resolved
src/dstack/_internal/server/services/runs/router_worker_sync.py
Outdated
Show resolved
Hide resolved
src/dstack/_internal/server/services/runs/router_worker_sync.py
Outdated
Show resolved
Hide resolved
src/dstack/_internal/server/services/runs/router_worker_sync.py
Outdated
Show resolved
Hide resolved
src/dstack/_internal/server/services/runs/router_worker_sync.py
Outdated
Show resolved
Hide resolved
src/dstack/_internal/server/background/pipeline_tasks/service_router_worker_sync.py
Outdated
Show resolved
Hide resolved
3bc04df to
8fe01e5
Compare
There was a problem hiding this comment.
(nit) In my view, blog posts should generally remain unchanged, as they are timestamped and serve a historical purpose. As a reader, I wouldn't expect their content to change significantly over time.
I would keep the blog post as is, but add a note at the top indicating that gateway routers are deprecated, along with a reference to the relevant replica-group routers docs or examples.
| !!! note "Deprecation" | ||
| Configuring the SGLang router in a gateway will be deprecated in a future release. |
There was a problem hiding this comment.
(nit) I'd put this at the top of the Router section, so that users don't have to read all of it before they find out that it's irrelevant.
Or even remove the section.
Also:
will be deprecated in a future release
More like is deprecated and will be disallowed in a future release?
|
|
||
| <!-- TODO: Gateway creation using fleets is coming to simplify this. --> | ||
| !!! note "Gateway-based routing (deprecated)" | ||
| If you create a gateway with the [`sglang` router](https://dstack.ai/docs/concepts/gateways/#sglang), you can also run SGLang with PD disaggregation. This method will be deprecated in the future in favor of running the router as a replica. |
There was a problem hiding this comment.
(nit)
will be deprecated in the future
More like is deprecated and will be disallowed in the future?
| #### SSH fleet | ||
|
|
||
| For example, if you run services on the `kubernetes` backend, make sure to also create the gateway in the same backend: | ||
| Create an [SSH fleet](https://dstack.ai/docs/concepts/fleets/#apply-a-configuration) that includes one CPU host for the router and one or more GPU hosts for the workers. Make sure the CPU and GPU hosts are in the same network. |
There was a problem hiding this comment.
(nit) Does it have to be an SSH fleet specifically? I thought elastic (nodes: 0..) cloud and kubernetes fleets could work too — just don't specify any resource constraints in the fleet, and dstack will automatically provision the correct instances (both CPU and GPU, in the same fleet) based on the resources specified in replicas in the run configuration.
Only some backends won't work — like Nebius, which requires all instances in the cluster to be homogeneous.
The are more references to SSH fleets in the docs updated in this PR
| fleets: [pd-disagg] | ||
|
|
||
| # Custom probe is required for PD disaggregation | ||
| # Custom probe is required for PD disaggregation. |
There was a problem hiding this comment.
(nit) By the way, is it still required? I thought sync_router_workers_for_run_model can gracefully handle the router or workers not being ready, and perform the registration eventually, once they become ready
| def validate_replica_group_router_mutex(cls, values): | ||
| """ | ||
| When a replica group sets `router:`, service-level `router` must be omitted. |
| op.create_index( | ||
| op.f("ix_service_router_worker_sync_pipeline_fetch_q"), | ||
| "service_router_worker_sync", | ||
| [sa.literal_column("last_processed_at ASC")], | ||
| unique=False, | ||
| ) |
| if service.router is not None and service.router.type == RouterType.SGLANG: | ||
| path_for_match = path if path.startswith("/") else f"/{path}" | ||
| if not _is_whitelisted_path(path_for_match, _SGLANG_WHITELISTED_PATHS): | ||
| raise ProxyError("Path is not allowed for this service", status.HTTP_404_NOT_FOUND) |
There was a problem hiding this comment.
(nit) 403 Forbidden for consistency with the gateway and better semantics?
| _SGLANG_WHITELISTED_PATHS = ( | ||
| "/generate", | ||
| "/v1/", | ||
| "/chat/completions", | ||
| ) |
There was a problem hiding this comment.
(nit) Duplicates the list from the gateway. Consider importing the constant from some common place, like proxy/lib/const.py
| await session.execute( | ||
| update(ServiceRouterWorkerSyncModel) | ||
| .where( | ||
| ServiceRouterWorkerSyncModel.id == item.id, | ||
| ServiceRouterWorkerSyncModel.lock_token == item.lock_token, | ||
| ) | ||
| .values(**early_cleanup_update_map) | ||
| ) | ||
| await session.commit() | ||
| return |
There was a problem hiding this comment.
(nit) Missing the log_lock_token_changed_after_processing call in case the update fails.
To avoid such discrepancies, consider refactoring, so that there is only one place that performs the update (currently there are three in the same method)
Refer design document for this PR is here.