Skip to content

Commit d061685

Browse files
authored
Evict jobs if instance is no longer imported (#3772)
Terminate jobs running on instances that are no longer imported, e.g., if the fleet was removed from an export, or the importer was removed, or the export was deleted. Use the new `INSTANCE_ACCESS_REVOKED` job termination reason.
1 parent 1713ef1 commit d061685

3 files changed

Lines changed: 135 additions & 1 deletion

File tree

src/dstack/_internal/core/models/runs.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ class JobTerminationReason(str, Enum):
134134
FAILED_TO_START_DUE_TO_NO_CAPACITY = "failed_to_start_due_to_no_capacity"
135135
INTERRUPTED_BY_NO_CAPACITY = "interrupted_by_no_capacity"
136136
INSTANCE_UNREACHABLE = "instance_unreachable"
137+
INSTANCE_ACCESS_REVOKED = "instance_access_revoked"
137138
WAITING_INSTANCE_LIMIT_EXCEEDED = "waiting_instance_limit_exceeded"
138139
WAITING_RUNNER_LIMIT_EXCEEDED = "waiting_runner_limit_exceeded"
139140
TERMINATED_BY_USER = "terminated_by_user"
@@ -158,6 +159,7 @@ def to_status(self) -> JobStatus:
158159
self.FAILED_TO_START_DUE_TO_NO_CAPACITY: JobStatus.FAILED,
159160
self.INTERRUPTED_BY_NO_CAPACITY: JobStatus.FAILED,
160161
self.INSTANCE_UNREACHABLE: JobStatus.FAILED,
162+
self.INSTANCE_ACCESS_REVOKED: JobStatus.FAILED,
161163
self.WAITING_INSTANCE_LIMIT_EXCEEDED: JobStatus.FAILED,
162164
self.WAITING_RUNNER_LIMIT_EXCEEDED: JobStatus.FAILED,
163165
self.TERMINATED_BY_USER: JobStatus.TERMINATED,
@@ -196,6 +198,7 @@ def to_error(self) -> Optional[str]:
196198
# handled and shown in status_message.
197199
error_mapping = {
198200
JobTerminationReason.INSTANCE_UNREACHABLE: "instance unreachable",
201+
JobTerminationReason.INSTANCE_ACCESS_REVOKED: "instance access revoked",
199202
JobTerminationReason.WAITING_INSTANCE_LIMIT_EXCEEDED: "waiting instance limit exceeded",
200203
JobTerminationReason.WAITING_RUNNER_LIMIT_EXCEEDED: "waiting runner limit exceeded",
201204
JobTerminationReason.VOLUME_ERROR: "volume error",

src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing import Dict, Iterable, Literal, Optional, Sequence, Union
77

88
import httpx
9-
from sqlalchemy import and_, func, or_, select, update
9+
from sqlalchemy import and_, exists, func, or_, select, update
1010
from sqlalchemy.ext.asyncio import AsyncSession
1111
from sqlalchemy.orm import aliased, contains_eager, joinedload, load_only
1212

@@ -51,7 +51,9 @@
5151
from dstack._internal.server.background.pipeline_tasks.common import get_provisioning_timeout
5252
from dstack._internal.server.db import get_db, get_session_ctx
5353
from dstack._internal.server.models import (
54+
ExportedFleetModel,
5455
FleetModel,
56+
ImportModel,
5557
InstanceModel,
5658
JobModel,
5759
ProbeModel,
@@ -309,6 +311,7 @@ class _ProcessContext:
309311
job: Job
310312
job_submission: JobSubmission
311313
job_provisioning_data: Optional[JobProvisioningData]
314+
instance_access_revoked: bool
312315
server_ssh_private_keys: Optional[tuple[str, Optional[str]]] = None
313316

314317
@property
@@ -374,6 +377,7 @@ async def _load_process_context(item: JobRunningPipelineItem) -> Optional[_Proce
374377
)
375378
run = run_model_to_run(run_model, include_sensitive=True)
376379
job = find_job(run.jobs, job_model.replica_num, job_model.job_num)
380+
instance_access_revoked = await _is_instance_access_revoked(session, job_model)
377381
job_submission = job_model_to_job_submission(job_model)
378382
server_ssh_private_keys = get_instance_ssh_private_keys(get_or_error(job_model.instance))
379383
return _ProcessContext(
@@ -383,12 +387,24 @@ async def _load_process_context(item: JobRunningPipelineItem) -> Optional[_Proce
383387
job=job,
384388
job_submission=job_submission,
385389
job_provisioning_data=job_submission.job_provisioning_data,
390+
instance_access_revoked=instance_access_revoked,
386391
server_ssh_private_keys=server_ssh_private_keys,
387392
)
388393

389394

390395
async def _process_running_job(context: _ProcessContext) -> _ProcessResult:
391396
result = _ProcessResult()
397+
if context.instance_access_revoked:
398+
_terminate_job(
399+
job_model=context.job_model,
400+
job_update_map=result.job_update_map,
401+
termination_reason=JobTerminationReason.INSTANCE_ACCESS_REVOKED,
402+
termination_reason_message=(
403+
"The instance is no longer imported into the job's project"
404+
),
405+
)
406+
return result
407+
392408
if context.job_provisioning_data is None:
393409
logger.error("%s: job_provisioning_data of an active job is None", fmt(context.job_model))
394410
_terminate_job(
@@ -559,6 +575,22 @@ async def _fetch_run_model(
559575
return res.unique().scalar_one()
560576

561577

578+
async def _is_instance_access_revoked(session: AsyncSession, job_model: JobModel) -> bool:
579+
if job_model.instance is None or job_model.instance.project_id == job_model.project_id:
580+
return False
581+
return not (
582+
await session.execute(
583+
select(
584+
exists().where(
585+
ImportModel.project_id == job_model.project_id,
586+
ImportModel.export_id == ExportedFleetModel.export_id,
587+
ExportedFleetModel.fleet_id == job_model.instance.fleet_id,
588+
)
589+
)
590+
)
591+
).scalar()
592+
593+
562594
async def _process_provisioning_status(
563595
context: _ProcessContext,
564596
startup_context: _StartupContext,

src/tests/_internal/server/background/pipeline_tasks/test_running_jobs.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1995,6 +1995,105 @@ async def test_registers_service_replica_in_gateway_when_running_on_imported_ins
19951995
ssh_head_proxy_private_key=None,
19961996
)
19971997

1998+
@pytest.mark.parametrize("job_status", [JobStatus.RUNNING, JobStatus.PULLING])
1999+
async def test_terminates_job_when_instance_access_revoked(
2000+
self,
2001+
test_db,
2002+
session: AsyncSession,
2003+
worker: JobRunningWorker,
2004+
job_status: JobStatus,
2005+
):
2006+
user = await create_user(session=session)
2007+
exporter_project = await create_project(session=session, name="exporter", owner=user)
2008+
importer_project = await create_project(session=session, name="importer", owner=user)
2009+
fleet = await create_fleet(session=session, project=exporter_project)
2010+
instance = await create_instance(
2011+
session=session,
2012+
project=exporter_project,
2013+
status=InstanceStatus.BUSY,
2014+
fleet=fleet,
2015+
)
2016+
repo = await create_repo(session=session, project_id=importer_project.id)
2017+
run = await create_run(
2018+
session=session,
2019+
project=importer_project,
2020+
repo=repo,
2021+
user=user,
2022+
)
2023+
job = await create_job(
2024+
session=session,
2025+
run=run,
2026+
status=job_status,
2027+
job_provisioning_data=get_job_provisioning_data(dockerized=True),
2028+
instance=instance,
2029+
instance_assigned=True,
2030+
)
2031+
# No export created -> the import link no longer exists -> access revoked
2032+
2033+
await _process_job(session, worker, job)
2034+
2035+
await session.refresh(job)
2036+
assert job.status == JobStatus.TERMINATING
2037+
assert job.termination_reason == JobTerminationReason.INSTANCE_ACCESS_REVOKED
2038+
events = await list_events(session)
2039+
assert len(events) == 1
2040+
assert events[0].message == (
2041+
f"Job status changed {job_status.upper()} -> TERMINATING."
2042+
" Termination reason: INSTANCE_ACCESS_REVOKED"
2043+
" (The instance is no longer imported into the job's project)"
2044+
)
2045+
2046+
@pytest.mark.parametrize("job_status", [JobStatus.RUNNING, JobStatus.PULLING])
2047+
async def test_does_not_terminate_job_when_instance_access_is_valid(
2048+
self,
2049+
test_db,
2050+
session: AsyncSession,
2051+
worker: JobRunningWorker,
2052+
ssh_tunnel_mock: Mock,
2053+
runner_client_mock: Mock,
2054+
job_status: JobStatus,
2055+
):
2056+
user = await create_user(session=session)
2057+
exporter_project = await create_project(session=session, name="exporter", owner=user)
2058+
importer_project = await create_project(session=session, name="importer", owner=user)
2059+
fleet = await create_fleet(session=session, project=exporter_project)
2060+
instance = await create_instance(
2061+
session=session,
2062+
project=exporter_project,
2063+
status=InstanceStatus.BUSY,
2064+
fleet=fleet,
2065+
)
2066+
await create_export(
2067+
session=session,
2068+
exporter_project=exporter_project,
2069+
importer_projects=[importer_project],
2070+
exported_fleets=[fleet],
2071+
)
2072+
repo = await create_repo(session=session, project_id=importer_project.id)
2073+
run = await create_run(
2074+
session=session,
2075+
project=importer_project,
2076+
repo=repo,
2077+
user=user,
2078+
)
2079+
job = await create_job(
2080+
session=session,
2081+
run=run,
2082+
status=job_status,
2083+
job_provisioning_data=get_job_provisioning_data(dockerized=False),
2084+
instance=instance,
2085+
instance_assigned=True,
2086+
)
2087+
runner_client_mock.pull.return_value = PullResponse(
2088+
job_states=[], job_logs=[], runner_logs=[], last_updated=0
2089+
)
2090+
2091+
await _process_job(session, worker, job)
2092+
2093+
await session.refresh(job)
2094+
assert job.status == job_status
2095+
assert job.termination_reason is None
2096+
19982097
async def test_apply_skips_probe_insert_when_lock_token_changes_after_processing(
19992098
self,
20002099
test_db,

0 commit comments

Comments
 (0)