Skip to content

Commit c83d2ed

Browse files
Optimize idle volume cleanup implementation
1 parent: ef79345 · commit: c83d2ed

2 files changed

Lines changed: 33 additions & 75 deletions

File tree

src/dstack/_internal/server/background/tasks/process_idle_volumes.py

Lines changed: 21 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -16,10 +16,7 @@
1616
logger = get_logger(__name__)
1717

1818

19-
async def process_idle_volumes(batch_size: int = 10):
20-
"""
21-
Process volumes to check if they have exceeded their idle_duration and delete them.
22-
"""
19+
async def process_idle_volumes():
2320
lock, lockset = get_locker().get_lockset(VolumeModel.__tablename__)
2421
async with get_session_ctx() as session:
2522
async with lock:
@@ -31,36 +28,31 @@ async def process_idle_volumes(batch_size: int = 10):
3128
VolumeModel.id.not_in(lockset),
3229
)
3330
.order_by(VolumeModel.last_processed_at.asc())
34-
.limit(batch_size)
31+
.limit(10)
3532
.with_for_update(skip_locked=True)
3633
)
3734
volume_models = list(res.unique().scalars().all())
38-
for volume_model in volume_models:
39-
await session.refresh(volume_model, ["project", "attachments"])
4035
if not volume_models:
4136
return
42-
4337
for volume_model in volume_models:
38+
await session.refresh(volume_model, ["project", "attachments"])
4439
lockset.add(volume_model.id)
4540

4641
try:
4742
volumes_to_delete = []
4843
for volume_model in volume_models:
49-
if await _should_delete_idle_volume(volume_model):
44+
if _should_delete_idle_volume(volume_model):
5045
volumes_to_delete.append(volume_model)
5146

5247
if volumes_to_delete:
5348
await _delete_idle_volumes(session, volumes_to_delete)
5449

5550
finally:
5651
for volume_model in volume_models:
57-
lockset.difference_update([volume_model.id])
52+
lockset.discard(volume_model.id)
5853

5954

60-
async def _should_delete_idle_volume(volume_model: VolumeModel) -> bool:
61-
"""
62-
Check if a volume should be deleted based on its idle duration.
63-
"""
55+
def _should_delete_idle_volume(volume_model: VolumeModel) -> bool:
6456
configuration = get_volume_configuration(volume_model)
6557

6658
if configuration.idle_duration is None:
@@ -82,32 +74,33 @@ async def _should_delete_idle_volume(volume_model: VolumeModel) -> bool:
8274

8375
if idle_duration > idle_threshold:
8476
logger.info(
85-
"Volume %s idle duration expired: idle time %s seconds, threshold %s seconds. Marking for deletion",
77+
"Volume %s idle duration expired: idle time %.1f hours, threshold %.1f hours. Marking for deletion",
8678
volume_model.name,
87-
idle_duration.total_seconds(),
88-
idle_threshold.total_seconds(),
79+
idle_duration.total_seconds() / 3600,
80+
idle_threshold.total_seconds() / 3600,
8981
)
9082
return True
9183

9284
return False
9385

9486

9587
def _get_volume_idle_duration(volume_model: VolumeModel) -> datetime.timedelta:
96-
"""
97-
Calculate how long a volume has been idle.
98-
A volume is considered idle from the time it was last processed by a job.
99-
If it was never used by a job, use the created_at time.
100-
"""
101-
last_time = volume_model.created_at.replace(tzinfo=datetime.timezone.utc)
88+
reference_time = volume_model.created_at
10289
if volume_model.last_job_processed_at is not None:
103-
last_time = volume_model.last_job_processed_at.replace(tzinfo=datetime.timezone.utc)
104-
return get_current_datetime() - last_time
90+
reference_time = volume_model.last_job_processed_at
91+
92+
reference_time_utc = reference_time.replace(tzinfo=datetime.timezone.utc)
93+
current_time = get_current_datetime()
94+
95+
idle_duration = current_time - reference_time_utc
96+
97+
if idle_duration.total_seconds() < 0:
98+
return datetime.timedelta(0)
99+
100+
return idle_duration
105101

106102

107103
async def _delete_idle_volumes(session: AsyncSession, volume_models: List[VolumeModel]):
108-
"""
109-
Delete volumes that have exceeded their idle duration.
110-
"""
111104
volumes_by_project = {}
112105
for volume_model in volume_models:
113106
project = volume_model.project

src/tests/_internal/server/background/tasks/test_process_idle_volumes.py

Lines changed: 12 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -22,15 +22,13 @@
2222
)
2323

2424

25+
@pytest.mark.asyncio
26+
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
2527
class TestProcessIdleVolumes:
26-
@pytest.mark.asyncio
27-
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
2828
async def test_no_idle_duration_configured(self, test_db, session: AsyncSession):
29-
"""Test that volumes without idle_duration configured are not deleted."""
3029
project = await create_project(session=session)
3130
user = await create_user(session=session)
3231

33-
# Create volume without idle_duration
3432
volume = await create_volume(
3533
session=session,
3634
project=project,
@@ -41,17 +39,13 @@ async def test_no_idle_duration_configured(self, test_db, session: AsyncSession)
4139
volume_provisioning_data=get_volume_provisioning_data(),
4240
)
4341

44-
should_delete = await _should_delete_idle_volume(volume)
42+
should_delete = _should_delete_idle_volume(volume)
4543
assert not should_delete
4644

47-
@pytest.mark.asyncio
48-
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
4945
async def test_idle_duration_disabled(self, test_db, session: AsyncSession):
50-
"""Test that volumes with idle_duration set to -1 (disabled) are not deleted."""
5146
project = await create_project(session=session)
5247
user = await create_user(session=session)
5348

54-
# Create volume with idle_duration disabled
5549
volume_config = get_volume_configuration(name="test-volume")
5650
volume_config.idle_duration = -1
5751

@@ -65,19 +59,15 @@ async def test_idle_duration_disabled(self, test_db, session: AsyncSession):
6559
volume_provisioning_data=get_volume_provisioning_data(),
6660
)
6761

68-
should_delete = await _should_delete_idle_volume(volume)
62+
should_delete = _should_delete_idle_volume(volume)
6963
assert not should_delete
7064

71-
@pytest.mark.asyncio
72-
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
7365
async def test_volume_still_attached(self, test_db, session: AsyncSession):
74-
"""Test that volumes still attached to instances are not deleted."""
7566
project = await create_project(session=session)
7667
user = await create_user(session=session)
7768

78-
# Create volume with idle_duration
7969
volume_config = get_volume_configuration(name="test-volume")
80-
volume_config.idle_duration = "1h" # 1 hour
70+
volume_config.idle_duration = "1h"
8171

8272
volume = await create_volume(
8373
session=session,
@@ -89,7 +79,6 @@ async def test_volume_still_attached(self, test_db, session: AsyncSession):
8979
volume_provisioning_data=get_volume_provisioning_data(),
9080
)
9181

92-
# Create an instance and attach the volume to it
9382
instance = await create_instance(session=session, project=project)
9483
attachment = VolumeAttachmentModel(
9584
volume_id=volume.id,
@@ -98,19 +87,15 @@ async def test_volume_still_attached(self, test_db, session: AsyncSession):
9887
volume.attachments.append(attachment)
9988
await session.commit()
10089

101-
should_delete = await _should_delete_idle_volume(volume)
90+
should_delete = _should_delete_idle_volume(volume)
10291
assert not should_delete
10392

104-
@pytest.mark.asyncio
105-
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
10693
async def test_idle_duration_not_exceeded(self, test_db, session: AsyncSession):
107-
"""Test that volumes within idle duration threshold are not deleted."""
10894
project = await create_project(session=session)
10995
user = await create_user(session=session)
11096

111-
# Create volume with idle_duration
11297
volume_config = get_volume_configuration(name="test-volume")
113-
volume_config.idle_duration = "1h" # 1 hour
98+
volume_config.idle_duration = "1h"
11499

115100
volume = await create_volume(
116101
session=session,
@@ -122,24 +107,19 @@ async def test_idle_duration_not_exceeded(self, test_db, session: AsyncSession):
122107
volume_provisioning_data=get_volume_provisioning_data(),
123108
)
124109

125-
# Set last_job_processed_at to 30 minutes ago (less than 1 hour)
126110
volume.last_job_processed_at = (
127111
datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(minutes=30)
128112
).replace(tzinfo=None)
129113

130-
should_delete = await _should_delete_idle_volume(volume)
114+
should_delete = _should_delete_idle_volume(volume)
131115
assert not should_delete
132116

133-
@pytest.mark.asyncio
134-
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
135117
async def test_idle_duration_exceeded(self, test_db, session: AsyncSession):
136-
"""Test that volumes exceeding idle duration threshold are marked for deletion."""
137118
project = await create_project(session=session)
138119
user = await create_user(session=session)
139120

140-
# Create volume with idle_duration
141121
volume_config = get_volume_configuration(name="test-volume")
142-
volume_config.idle_duration = "1h" # 1 hour
122+
volume_config.idle_duration = "1h"
143123

144124
volume = await create_volume(
145125
session=session,
@@ -151,22 +131,17 @@ async def test_idle_duration_exceeded(self, test_db, session: AsyncSession):
151131
volume_provisioning_data=get_volume_provisioning_data(),
152132
)
153133

154-
# Set last_job_processed_at to 2 hours ago (more than 1 hour)
155134
volume.last_job_processed_at = (
156135
datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2)
157136
).replace(tzinfo=None)
158137

159-
should_delete = await _should_delete_idle_volume(volume)
138+
should_delete = _should_delete_idle_volume(volume)
160139
assert should_delete
161140

162-
@pytest.mark.asyncio
163-
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
164141
async def test_volume_never_used_by_job(self, test_db, session: AsyncSession):
165-
"""Test idle duration calculation for volumes never used by jobs."""
166142
project = await create_project(session=session)
167143
user = await create_user(session=session)
168144

169-
# Create volume with old created_at time
170145
volume = await create_volume(
171146
session=session,
172147
project=project,
@@ -178,23 +153,17 @@ async def test_volume_never_used_by_job(self, test_db, session: AsyncSession):
178153
created_at=datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2),
179154
)
180155

181-
# last_job_processed_at is None, so it should use created_at
182156
volume.last_job_processed_at = None
183157

184158
idle_duration = _get_volume_idle_duration(volume)
185-
# Should be approximately 2 hours
186-
assert idle_duration.total_seconds() >= 7000 # ~2 hours in seconds
159+
assert idle_duration.total_seconds() >= 7000
187160

188-
@pytest.mark.asyncio
189-
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
190161
async def test_process_idle_volumes_integration(self, test_db, session: AsyncSession):
191-
"""Integration test for the full process_idle_volumes function."""
192162
project = await create_project(session=session)
193163
user = await create_user(session=session)
194164

195-
# Create volume that should be deleted (exceeded idle duration)
196165
volume_config = get_volume_configuration(name="test-volume")
197-
volume_config.idle_duration = "1h" # 1 hour
166+
volume_config.idle_duration = "1h"
198167

199168
volume = await create_volume(
200169
session=session,
@@ -206,21 +175,17 @@ async def test_process_idle_volumes_integration(self, test_db, session: AsyncSes
206175
volume_provisioning_data=get_volume_provisioning_data(),
207176
)
208177

209-
# Set as idle for more than threshold
210178
volume.last_job_processed_at = (
211179
datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2)
212180
).replace(tzinfo=None)
213181

214182
await session.commit()
215183

216-
# Mock the delete_volumes function to avoid actual deletion
217184
with patch(
218185
"dstack._internal.server.background.tasks.process_idle_volumes.delete_volumes"
219186
) as mock_delete:
220187
await process_idle_volumes()
221188

222-
# Should have called delete_volumes with the volume
223189
mock_delete.assert_called_once()
224190
call_args = mock_delete.call_args
225-
# The function is called with (session, project, volume_names_list)
226191
assert call_args[0][2] == ["test-volume"]

0 commit comments

Comments
 (0)