Skip to content

Commit f4765cf

Browse files
authored
fix: Fixes various file/lock delete failures on windows to allow us to unpin lockfile (#792)
1 parent 6332939 commit f4765cf

4 files changed

Lines changed: 96 additions & 42 deletions

File tree

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
torch
22
torchvision
33
lightning-utilities
4-
filelock <3.24 # v3.24.0 removed lock file auto-delete on Windows, breaking our cleanup logic
4+
filelock
55
numpy
66
boto3
77
requests

src/litdata/streaming/downloader.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -539,18 +539,25 @@ def download_file(self, remote_filepath: str, local_filepath: str) -> None:
539539
if not os.path.exists(remote_filepath):
540540
raise FileNotFoundError(f"The provided remote_path doesn't exist: {remote_filepath}")
541541

542+
lock_path = local_filepath + ".lock"
543+
lock_acquired = False
542544
with (
543545
suppress(Timeout, FileNotFoundError),
544-
FileLock(local_filepath + ".lock", timeout=1 if remote_filepath.endswith(_INDEX_FILENAME) else 0),
546+
FileLock(lock_path, timeout=1 if remote_filepath.endswith(_INDEX_FILENAME) else 0),
545547
):
546-
if remote_filepath == local_filepath or os.path.exists(local_filepath):
547-
return
548-
# make an atomic operation to be safe
549-
temp_file_path = local_filepath + ".tmp"
550-
shutil.copy(remote_filepath, temp_file_path)
551-
os.rename(temp_file_path, local_filepath)
548+
lock_acquired = True
549+
if not (remote_filepath == local_filepath or os.path.exists(local_filepath)):
550+
# make an atomic operation to be safe
551+
temp_file_path = local_filepath + ".tmp"
552+
shutil.copy(remote_filepath, temp_file_path)
553+
os.rename(temp_file_path, local_filepath)
554+
# FileLock doesn't delete its lock file on release — we clean it up manually.
555+
# This must happen after release (Windows can't delete open files) and after the
556+
# work is done (on Linux, deleting an in-use lock file lets other processes lock
557+
# on a new inode, bypassing mutual exclusion).
558+
if lock_acquired:
552559
with contextlib.suppress(Exception):
553-
os.remove(local_filepath + ".lock")
560+
os.remove(lock_path)
554561

555562

556563
class HFDownloader(Downloader):

src/litdata/streaming/reader.py

Lines changed: 66 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -113,55 +113,88 @@ def _decrement_local_lock(self, chunk_index: int) -> int:
113113
chunk_filepath, _, _ = self._config[ChunkedIndex(index=-1, chunk_index=chunk_index)]
114114

115115
countpath = chunk_filepath + ".cnt"
116-
with suppress(Timeout, FileNotFoundError), FileLock(countpath + ".lock", timeout=3):
117-
if not os.path.exists(countpath):
118-
return 0
119-
with open(countpath) as count_f:
120-
try:
121-
curr_count = int(count_f.read().strip())
122-
except Exception:
123-
curr_count = 1
124-
curr_count -= 1
125-
if curr_count <= 0:
126-
with suppress(FileNotFoundError, PermissionError):
127-
os.remove(countpath)
128-
129-
with suppress(FileNotFoundError, PermissionError):
130-
os.remove(countpath + ".lock")
116+
lock_path = countpath + ".lock"
117+
curr_count = 0
118+
remove_lock = False
119+
with suppress(Timeout, FileNotFoundError), FileLock(lock_path, timeout=3):
120+
if os.path.exists(countpath):
121+
with open(countpath) as count_f:
122+
try:
123+
curr_count = int(count_f.read().strip())
124+
except Exception:
125+
curr_count = 1
126+
curr_count -= 1
127+
if curr_count <= 0:
128+
with suppress(FileNotFoundError, PermissionError):
129+
os.remove(countpath)
130+
remove_lock = True
131+
else:
132+
with open(countpath, "w+") as count_f:
133+
logger.debug(_get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "B"}))
134+
count_f.write(str(curr_count))
135+
logger.debug(_get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "E"}))
131136
else:
132-
with open(countpath, "w+") as count_f:
133-
logger.debug(_get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "B"}))
134-
count_f.write(str(curr_count))
135-
logger.debug(_get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "E"}))
136-
return curr_count
137-
return 0
137+
remove_lock = True
138+
# FileLock doesn't delete its lock file on release — we clean it up manually.
139+
# This must happen after release (Windows can't delete open files) and after the
140+
# work is done (on Linux, deleting an in-use lock file lets other processes lock
141+
# on a new inode, bypassing mutual exclusion).
142+
if remove_lock:
143+
with suppress(FileNotFoundError, PermissionError):
144+
os.remove(lock_path)
145+
return curr_count
146+
147+
def _cleanup_download_locks(self, chunk_filepath: str, chunk_index: int) -> None:
148+
"""Remove stale download lock files for a chunk.
149+
150+
Download lock files (e.g. ``chunk-0-3.zstd.bin.lock``) are FileLock artifacts created
151+
during download. They are safe to remove once the chunk exists locally, regardless of
152+
the refcount held in ``.cnt`` files. Reference-count lock files (``.cnt.lock``) are
153+
excluded because they may still be needed by concurrent refcount operations.
154+
155+
"""
156+
base_name = os.path.basename(chunk_filepath)
157+
base_prefix = os.path.splitext(base_name)[0]
158+
cache_dir = os.path.dirname(chunk_filepath)
159+
pattern = os.path.join(cache_dir, f"{base_prefix}*.lock")
160+
matched_locks = [p for p in glob.glob(pattern) if not p.endswith(".cnt.lock")]
161+
if matched_locks:
162+
logger.debug(f"_apply_delete({chunk_index}): glob matched {matched_locks}")
163+
for lock_path in matched_locks:
164+
try:
165+
os.remove(lock_path)
166+
logger.debug(f"_apply_delete({chunk_index}): removed {lock_path}")
167+
except (FileNotFoundError, PermissionError) as e:
168+
logger.warning(f"_apply_delete({chunk_index}): failed to remove {lock_path}: {e}")
169+
except Exception as e:
170+
logger.warning(f"_apply_delete({chunk_index}): unexpected error removing {lock_path}: {e}")
138171

139172
def _apply_delete(self, chunk_index: int, skip_lock: bool = False) -> None:
140173
"""Inform the item loader of the chunk to delete."""
174+
logger.debug(f"_apply_delete({chunk_index}, skip_lock={skip_lock}) called")
141175
# TODO: Fix the can_delete method
142176
can_delete_chunk = self._config.can_delete(chunk_index)
143177
chunk_filepath, _, _ = self._config[ChunkedIndex(index=-1, chunk_index=chunk_index)]
144178

145179
if not skip_lock:
146180
remaining_locks = self._remaining_locks(chunk_filepath)
147181
if remaining_locks > 0: # Can't delete this, something has it
182+
logger.debug(f"_apply_delete({chunk_index}): skipping data deletion, remaining_locks={remaining_locks}")
148183
if _DEBUG:
149184
print(f"Skip delete {chunk_filepath} by {self._rank or 0}, current lock count: {remaining_locks}")
185+
self._cleanup_download_locks(chunk_filepath, chunk_index)
150186
return
151187

152188
if _DEBUG:
153189
with open(chunk_filepath + ".tmb", "w+") as tombstone_file:
154190
tombstone_file.write(f"Deleted {chunk_filepath} by {self._rank or 0}. Debug: {can_delete_chunk}")
155191

156-
self._item_loader.delete(chunk_index, chunk_filepath)
192+
try:
193+
self._item_loader.delete(chunk_index, chunk_filepath)
194+
except (FileNotFoundError, PermissionError) as e:
195+
logger.debug(f"_apply_delete({chunk_index}): could not remove data file: {e}")
157196

158-
base_name = os.path.basename(chunk_filepath)
159-
base_prefix = os.path.splitext(base_name)[0]
160-
cache_dir = os.path.dirname(chunk_filepath)
161-
pattern = os.path.join(cache_dir, f"{base_prefix}*.lock")
162-
for lock_path in glob.glob(pattern):
163-
with suppress(FileNotFoundError, PermissionError):
164-
os.remove(lock_path)
197+
self._cleanup_download_locks(chunk_filepath, chunk_index)
165198

166199
def stop(self) -> None:
167200
"""Receive the list of the chunk indices to download for the current epoch."""
@@ -462,6 +495,10 @@ def read(self, index: ChunkedIndex) -> Any:
462495
self._last_chunk_size = index.chunk_size
463496

464497
if index.is_last_index and self._prepare_thread:
498+
# Close the item loader's handle on the last chunk before requesting
499+
# deletion. On Windows, os.remove fails if the file is still open.
500+
self._item_loader.close(self._last_chunk_index)
501+
465502
# inform the thread it is time to stop
466503
self._prepare_thread._decrement_local_lock(index.chunk_index)
467504
self._prepare_thread.delete([index.chunk_index])
@@ -475,7 +512,6 @@ def read(self, index: ChunkedIndex) -> Any:
475512
"This can happen if the chunk files are too large."
476513
)
477514
self._prepare_thread = None
478-
self._item_loader.close(self._last_chunk_index)
479515
self._last_chunk_index = None
480516
self._last_chunk_size = None
481517
self._chunks_queued_for_download = False

tests/streaming/test_lock_cleanup.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import logging
12
import os
23
import shutil
34
from contextlib import suppress
@@ -40,7 +41,10 @@ def download_file(self, remote_filepath: str, local_filepath: str) -> None: # t
4041

4142

4243
@pytest.mark.skipif(not _ZSTD_AVAILABLE, reason="Requires: ['zstd']")
43-
def test_reader_lock_cleanup_with_nonlocal_like_downloader(tmpdir):
44+
def test_reader_lock_cleanup_with_nonlocal_like_downloader(tmpdir, caplog):
45+
# Enable debug logging so _apply_delete diagnostics appear in CI output
46+
caplog.set_level(logging.DEBUG, logger="litdata.streaming.reader")
47+
4448
cache_dir = os.path.join(tmpdir, "cache_dir")
4549
remote_dir = os.path.join(tmpdir, "remote_dir")
4650
os.makedirs(cache_dir, exist_ok=True)
@@ -74,8 +78,15 @@ def test_reader_lock_cleanup_with_nonlocal_like_downloader(tmpdir):
7478
chunk_idx = ChunkedIndex(index=idx[0], chunk_index=idx[1], is_last_index=(i == 9))
7579
reader.read(chunk_idx)
7680

81+
# Diagnostic: dump all files and captured logs before asserting
82+
all_files = sorted(os.listdir(cache_dir))
83+
print(f"\n[DIAG] All files in cache_dir: {all_files}")
84+
print("[DIAG] Captured log messages:")
85+
for record in caplog.records:
86+
print(f" [{record.levelname}] {record.message}")
87+
7788
# At the end, no chunk-related lock files should remain
78-
leftover_locks = [f for f in os.listdir(cache_dir) if f.endswith(".lock") and f.startswith("chunk-")]
79-
assert leftover_locks == []
89+
leftover_locks = [f for f in all_files if f.endswith(".lock") and f.startswith("chunk-")]
90+
assert leftover_locks == [], f"Leftover locks: {leftover_locks}, all files: {all_files}"
8091
finally:
8192
unregister_downloader(prefix)

0 commit comments

Comments
 (0)