Skip to content

Commit 0c60b1e

Browse files
feat: add volume idle duration cleanup feature (#2497) (#2842)
* feat: add volume idle duration cleanup feature (#2497)
* fix: avoid FOR UPDATE with outer join in process_idle_volumes (Postgres compatibility)
* Fix formatting issues
* Optimize idle volume cleanup implementation
* Removed unnecessary changes and refactored implementations
* Fix volume auto-cleanup locking and mocking issues
* feat: add merge migration for volume cleanup and secrets
* fix: remove accidentally committed test files with non-existent imports
* fix: add missing dialect_name parameter to get_locker call
* fix: address code review comments
* fix: actually delete volumes from cloud providers
* Remove redundant code changes
* Remove accidentally committed local config file
* Fix MissingGreenlet error in process_idle_volumes
* Add validation for external volumes with auto_cleanup_duration
* Remove merge migration and rebase on master
* Added merge migration to resolve multiple heads
* Rebase migrations
* Update idle_duration type to match updated parse_idle_duration
* Refactor process_idle_volumes
* Refactor external volumes check
* Fix client backward compatibility

---------

Co-authored-by: Victor Skvortsov <vds003@gmail.com>
1 parent c96dc62 commit 0c60b1e

File tree

14 files changed

+495
-8
lines changed

14 files changed

+495
-8
lines changed

src/dstack/_internal/cli/services/profile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ def apply_profile_args(
159159
if args.idle_duration is not None:
160160
profile_settings.idle_duration = args.idle_duration
161161
elif args.dont_destroy:
162-
profile_settings.idle_duration = False
162+
profile_settings.idle_duration = "off"
163163
if args.creation_policy_reuse:
164164
profile_settings.creation_policy = CreationPolicy.REUSE
165165

src/dstack/_internal/core/compatibility/volumes.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,6 @@ def _get_volume_configuration_excludes(
3030
configuration_excludes: IncludeExcludeDictType = {}
3131
if configuration.tags is None:
3232
configuration_excludes["tags"] = True
33+
if configuration.auto_cleanup_duration is None:
34+
configuration_excludes["auto_cleanup_duration"] = True
3335
return configuration_excludes

src/dstack/_internal/core/models/profiles.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,9 @@ def parse_off_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str
7676
return parse_duration(v)
7777

7878

79-
def parse_idle_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]:
80-
if v is False:
79+
def parse_idle_duration(v: Optional[Union[int, str]]) -> Optional[Union[str, int]]:
80+
if v == "off" or v == -1:
8181
return -1
82-
if v is True:
83-
return None
8482
return parse_duration(v)
8583

8684

@@ -251,7 +249,7 @@ class ProfileParams(CoreModel):
251249
),
252250
] = None
253251
idle_duration: Annotated[
254-
Optional[Union[Literal["off"], str, int, bool]],
252+
Optional[Union[Literal["off"], str, int]],
255253
Field(
256254
description=(
257255
"Time to wait before terminating idle instances."

src/dstack/_internal/core/models/volumes.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from dstack._internal.core.models.backends.base import BackendType
1111
from dstack._internal.core.models.common import CoreModel
12+
from dstack._internal.core.models.profiles import parse_idle_duration
1213
from dstack._internal.core.models.resources import Memory
1314
from dstack._internal.utils.common import get_or_error
1415
from dstack._internal.utils.tags import tags_validator
@@ -44,6 +45,16 @@ class VolumeConfiguration(CoreModel):
4445
Optional[str],
4546
Field(description="The volume ID. Must be specified when registering external volumes"),
4647
] = None
48+
auto_cleanup_duration: Annotated[
49+
Optional[Union[str, int]],
50+
Field(
51+
description=(
52+
"Time to wait after volume is no longer used by any job before deleting it. "
53+
"Defaults to keep the volume indefinitely. "
54+
"Use the value 'off' or -1 to disable auto-cleanup."
55+
)
56+
),
57+
] = None
4758
tags: Annotated[
4859
Optional[Dict[str, str]],
4960
Field(
@@ -56,6 +67,9 @@ class VolumeConfiguration(CoreModel):
5667
] = None
5768

5869
_validate_tags = validator("tags", pre=True, allow_reuse=True)(tags_validator)
70+
_validate_auto_cleanup_duration = validator(
71+
"auto_cleanup_duration", pre=True, allow_reuse=True
72+
)(parse_idle_duration)
5973

6074
@property
6175
def size_gb(self) -> int:

src/dstack/_internal/server/background/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
process_gateways,
88
process_gateways_connections,
99
)
10+
from dstack._internal.server.background.tasks.process_idle_volumes import process_idle_volumes
1011
from dstack._internal.server.background.tasks.process_instances import (
1112
process_instances,
1213
)
@@ -74,6 +75,9 @@ def start_background_tasks() -> AsyncIOScheduler:
7475
_scheduler.add_job(
7576
process_submitted_volumes, IntervalTrigger(seconds=10, jitter=2), max_instances=5
7677
)
78+
_scheduler.add_job(
79+
process_idle_volumes, IntervalTrigger(seconds=60, jitter=10), max_instances=1
80+
)
7781
_scheduler.add_job(process_placement_groups, IntervalTrigger(seconds=30, jitter=5))
7882
for replica in range(settings.SERVER_BACKGROUND_PROCESSING_FACTOR):
7983
# Add multiple copies of tasks if requested.
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
import datetime
2+
from typing import List
3+
4+
from sqlalchemy import select
5+
from sqlalchemy.ext.asyncio import AsyncSession
6+
from sqlalchemy.orm import joinedload
7+
8+
from dstack._internal.core.backends.base.compute import ComputeWithVolumeSupport
9+
from dstack._internal.core.errors import BackendNotAvailable
10+
from dstack._internal.core.models.profiles import parse_duration
11+
from dstack._internal.core.models.volumes import VolumeStatus
12+
from dstack._internal.server.db import get_db, get_session_ctx
13+
from dstack._internal.server.models import ProjectModel, VolumeModel
14+
from dstack._internal.server.services import backends as backends_services
15+
from dstack._internal.server.services.locking import get_locker
16+
from dstack._internal.server.services.volumes import (
17+
get_volume_configuration,
18+
volume_model_to_volume,
19+
)
20+
from dstack._internal.utils import common
21+
from dstack._internal.utils.common import get_current_datetime
22+
from dstack._internal.utils.logging import get_logger
23+
24+
logger = get_logger(__name__)
25+
26+
27+
async def process_idle_volumes():
    """Background task: find active, unattached volumes whose configured
    auto-cleanup duration has elapsed and delete them.

    Runs periodically from the scheduler (see background/__init__.py).
    """
    # In-process lockset prevents concurrent runs of this task from picking
    # the same volumes; the row-level DB lock below guards against other
    # server replicas.
    lock, lockset = get_locker(get_db().dialect_name).get_lockset(VolumeModel.__tablename__)
    async with get_session_ctx() as session:
        async with lock:
            # Select candidate ids only, with no joins: FOR UPDATE combined
            # with an outer join is not supported on Postgres (see commit
            # message), so relationships are loaded in a second query.
            res = await session.execute(
                select(VolumeModel.id)
                .where(
                    VolumeModel.status == VolumeStatus.ACTIVE,
                    VolumeModel.deleted == False,
                    VolumeModel.id.not_in(lockset),
                )
                .order_by(VolumeModel.last_processed_at.asc())  # oldest first
                .limit(10)  # small batch per run
                .with_for_update(skip_locked=True, key_share=True)
            )
            volume_ids = list(res.scalars().all())
            if not volume_ids:
                return
            for volume_id in volume_ids:
                lockset.add(volume_id)

        # Re-fetch the locked rows with relationships eagerly loaded;
        # populate_existing refreshes instances already in the identity map
        # so the joinedload results are attached (avoids MissingGreenlet on
        # lazy loads in async code).
        res = await session.execute(
            select(VolumeModel)
            .where(VolumeModel.id.in_(volume_ids))
            .options(joinedload(VolumeModel.project).joinedload(ProjectModel.backends))
            .options(joinedload(VolumeModel.user))
            .options(joinedload(VolumeModel.attachments))
            .execution_options(populate_existing=True)
        )
        volume_models = list(res.unique().scalars().all())
        try:
            volumes_to_delete = [v for v in volume_models if _should_delete_volume(v)]
            if not volumes_to_delete:
                return
            await _delete_idle_volumes(session, volumes_to_delete)
        finally:
            # Always release the in-process locks, even if deletion failed.
            lockset.difference_update(volume_ids)
64+
65+
66+
def _should_delete_volume(volume: VolumeModel) -> bool:
    """Return True if the volume is unattached and has been idle longer
    than its configured ``auto_cleanup_duration``."""
    # Volumes attached to an instance are never eligible for cleanup.
    if volume.attachments:
        return False

    configuration = get_volume_configuration(volume)
    raw_duration = configuration.auto_cleanup_duration
    if not raw_duration:
        # Auto-cleanup not configured — keep the volume indefinitely.
        return False

    seconds = parse_duration(raw_duration)
    # None, zero, or a negative value (e.g. "off" normalized to -1)
    # disables cleanup.
    if not seconds or seconds <= 0:
        return False

    return _get_idle_time(volume) > datetime.timedelta(seconds=seconds)
81+
82+
83+
def _get_idle_time(volume: VolumeModel) -> datetime.timedelta:
    """Return how long the volume has been idle; never negative.

    Falls back to the creation time when the volume has never been used
    by a job. DB timestamps are stored naive and interpreted as UTC here.
    """
    reference = volume.last_job_processed_at
    if reference is None:
        reference = volume.created_at
    # Attach UTC tzinfo so subtraction against the tz-aware "now" is valid.
    aware_reference = reference.replace(tzinfo=datetime.timezone.utc)
    elapsed = get_current_datetime() - aware_reference
    zero = datetime.timedelta(0)
    return elapsed if elapsed > zero else zero
88+
89+
90+
async def _delete_idle_volumes(session: AsyncSession, volumes: List[VolumeModel]):
    """Delete the given idle volumes from their backends and mark them deleted.

    Note: Multiple volumes are deleted in the same transaction,
    so long deletion of one volume may block processing other volumes.
    """
    for volume_model in volumes:
        logger.info("Deleting idle volume %s", volume_model.name)
        try:
            await _delete_idle_volume(session, volume_model)
        except Exception:
            logger.exception("Error when deleting idle volume %s", volume_model.name)

        # NOTE(review): the volume is marked deleted even when backend
        # deletion failed above — presumably to avoid endless retry loops,
        # but it can leave the cloud resource behind. Confirm this is intended.
        volume_model.deleted = True
        volume_model.deleted_at = get_current_datetime()

        logger.info("Deleted idle volume %s", volume_model.name)

    # Single commit for the whole batch.
    await session.commit()
106+
107+
108+
async def _delete_idle_volume(session: AsyncSession, volume_model: VolumeModel):
    """Delete a single volume from its cloud backend.

    Logs an error and returns (without raising) when deletion is impossible:
    missing provisioning data, missing backend name, or the backend no longer
    being configured for the project. ``session`` is currently unused but kept
    for signature symmetry with the caller.
    """
    volume = volume_model_to_volume(volume_model)

    if volume.provisioning_data is None:
        logger.error(
            f"Failed to delete volume {volume_model.name}. volume.provisioning_data is None."
        )
        return

    if volume.provisioning_data.backend is None:
        logger.error(
            f"Failed to delete volume {volume_model.name}. volume.provisioning_data.backend is None."
        )
        return

    try:
        backend = await backends_services.get_project_backend_by_type_or_error(
            project=volume_model.project,
            backend_type=volume.provisioning_data.backend,
        )
    except BackendNotAvailable:
        # Fix: report the backend actually looked up (provisioning_data.backend);
        # the original logged volume.configuration.backend, which may be None for
        # volumes created without an explicit backend, producing a misleading message.
        logger.error(
            f"Failed to delete volume {volume_model.name}. "
            f"Backend {volume.provisioning_data.backend} not available."
        )
        return

    # Only backends with volume support can have provisioned the volume,
    # so this narrowing is safe.
    compute = backend.compute()
    assert isinstance(compute, ComputeWithVolumeSupport)
    # delete_volume is blocking; run it in a thread to keep the event loop free.
    await common.run_async(
        compute.delete_volume,
        volume=volume,
    )

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,3 +739,5 @@ async def _attach_volume(
739739
attachment_data=attachment_data.json(),
740740
)
741741
instance.volume_attachments.append(volume_attachment_model)
742+
743+
volume_model.last_job_processed_at = common_utils.get_current_datetime()
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
"""Add VolumeModel.last_job_processed_at
2+
3+
Revision ID: d5863798bf41
4+
Revises: 644b8a114187
5+
Create Date: 2025-07-15 14:26:22.981687
6+
7+
"""
8+
9+
import sqlalchemy as sa
10+
from alembic import op
11+
12+
import dstack._internal.server.models
13+
14+
# revision identifiers, used by Alembic.
15+
revision = "d5863798bf41"
16+
down_revision = "644b8a114187"
17+
branch_labels = None
18+
depends_on = None
19+
20+
21+
def upgrade() -> None:
22+
# ### commands auto generated by Alembic - please adjust! ###
23+
with op.batch_alter_table("volumes", schema=None) as batch_op:
24+
batch_op.add_column(
25+
sa.Column(
26+
"last_job_processed_at",
27+
dstack._internal.server.models.NaiveDateTime(),
28+
nullable=True,
29+
)
30+
)
31+
32+
# ### end Alembic commands ###
33+
34+
35+
def downgrade() -> None:
36+
# ### commands auto generated by Alembic - please adjust! ###
37+
with op.batch_alter_table("volumes", schema=None) as batch_op:
38+
batch_op.drop_column("last_job_processed_at")
39+
40+
# ### end Alembic commands ###

src/dstack/_internal/server/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,7 @@ class VolumeModel(BaseModel):
645645
last_processed_at: Mapped[datetime] = mapped_column(
646646
NaiveDateTime, default=get_current_datetime
647647
)
648+
last_job_processed_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime)
648649
deleted: Mapped[bool] = mapped_column(Boolean, default=False)
649650
deleted_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime)
650651

src/dstack/_internal/server/services/jobs/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,19 @@ async def process_terminating_job(
293293
# so that stuck volumes don't prevent the instance from terminating.
294294
job_model.instance_id = None
295295
instance_model.last_job_processed_at = common.get_current_datetime()
296+
297+
volume_names = (
298+
jrd.volume_names
299+
if jrd and jrd.volume_names
300+
else [va.volume.name for va in instance_model.volume_attachments]
301+
)
302+
if volume_names:
303+
volumes = await list_project_volume_models(
304+
session=session, project=instance_model.project, names=volume_names
305+
)
306+
for volume in volumes:
307+
volume.last_job_processed_at = common.get_current_datetime()
308+
296309
logger.info(
297310
"%s: instance '%s' has been released, new status is %s",
298311
fmt(job_model),

0 commit comments

Comments
 (0)