Skip to content

Commit cd74051

Browse files
committed
feat: implement locked temp file write version
1 parent 4b0489c commit cd74051

4 files changed

Lines changed: 92 additions & 19 deletions

File tree

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ email_address = "0.2.9"
8787
fake = "4"
8888
fantoccini = "0.22"
8989
form_urlencoded = "1"
90+
fs4 = { version = "0.13.1", features = ["tokio", "sync"] }
9091
futures = { version = "0.3", default-features = false }
9192
futures-core = { version = "0.3", default-features = false }
9293
futures-util = { version = "0.3", default-features = false }

cot/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ digest.workspace = true
3434
email_address.workspace = true
3535
fake = { workspace = true, optional = true, features = ["derive", "chrono"] }
3636
form_urlencoded.workspace = true
37+
fs4.workspace = true
3738
futures-core.workspace = true
3839
futures-util.workspace = true
3940
hex.workspace = true

cot/src/cache/store/file.rs

Lines changed: 78 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ use std::path::Path;
4747

4848
use blake3::hash;
4949
use chrono::{DateTime, Utc};
50+
use fs4::fs_std::FileExt;
51+
use fs4::tokio::AsyncFileExt;
5052
use serde_json::Value;
5153
use thiserror::Error;
5254
use tokio::fs::OpenOptions;
@@ -158,11 +160,14 @@ impl FileStore {
158160
std::fs::create_dir_all(&self.dir_path)
159161
.map_err(|e| FileCacheStoreError::DirCreation(Box::new(e)))?;
160162

161-
// try to cleanup the leftovers .tmp files due to crash
162163
if let Ok(entries) = std::fs::read_dir(&self.dir_path) {
163164
for entry in entries.flatten() {
164165
let path = entry.path();
165-
if path.extension().is_some_and(|ext| ext == TEMPFILE_SUFFIX) {
166+
167+
if path.extension().is_some_and(|ext| ext == TEMPFILE_SUFFIX)
168+
&& let Ok(file) = std::fs::File::open(&path)
169+
&& file.try_lock_exclusive().is_ok()
170+
{
166171
let _ = std::fs::remove_file(path);
167172
}
168173
}
@@ -190,11 +195,22 @@ impl FileStore {
190195
file.sync_data()
191196
.await
192197
.map_err(|e| FileCacheStoreError::Io(Box::new(e)))?;
193-
tokio::fs::rename(file_path, self.dir_path.join(&key_hash))
198+
199+
file.unlock_async()
194200
.await
195201
.map_err(|e| FileCacheStoreError::Io(Box::new(e)))?;
196202

197-
Ok(())
203+
match tokio::fs::rename(&file_path, self.dir_path.join(&key_hash)).await {
204+
Ok(()) => Ok(()),
205+
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
206+
// if the source temp file is gone, it means another thread
207+
// likely renamed it already or the directory was cleared.
208+
// In a thundering herd, this is actually a "success" state
209+
// because the data is already there.
210+
Ok(())
211+
}
212+
Err(e) => Err(FileCacheStoreError::Io(Box::new(e)))?,
213+
}
198214
}
199215

200216
async fn read(&self, key: &str) -> CacheStoreResult<Option<Value>> {
@@ -324,36 +340,33 @@ impl FileStore {
324340
&self,
325341
key_hash: &str,
326342
) -> CacheStoreResult<(tokio::fs::File, std::path::PathBuf)> {
327-
let mut temp_path = self.dir_path.join(format!("{key_hash}.{TEMPFILE_SUFFIX}"));
343+
let temp_path = self.dir_path.join(format!("{key_hash}.{TEMPFILE_SUFFIX}"));
328344

329345
let temp_file = loop {
330346
match OpenOptions::new()
331347
.write(true)
332-
.create_new(true)
333-
.truncate(true)
348+
.read(true)
349+
.create(true)
350+
.truncate(false)
334351
.open(&temp_path)
335352
.await
336353
{
337354
Ok(handle) => {
338-
break handle;
355+
if handle.try_lock_exclusive().is_ok() {
356+
handle
357+
.set_len(0)
358+
.await
359+
.map_err(|e| FileCacheStoreError::TempFileCreation(Box::new(e)))?;
360+
361+
break handle;
362+
}
339363
}
340364
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
341365
// if this fails then we should bail regardless
342366
self.create_dir_root()
343367
.await
344368
.map_err(|e| FileCacheStoreError::DirCreation(Box::new(e)))?;
345369
}
346-
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
347-
// we continuously append a .tmp to a the filename
348-
// until we find an empty spot
349-
let file_name = temp_path
350-
.file_name()
351-
.ok_or_else(|| FileCacheStoreError::TempFileCreation(Box::new(e)))?;
352-
let mut os_name = file_name.to_os_string();
353-
os_name.push(format!(".{TEMPFILE_SUFFIX}"));
354-
355-
temp_path.set_file_name(os_name);
356-
}
357370
Err(e) => {
358371
return Err(FileCacheStoreError::TempFileCreation(Box::new(e)))?;
359372
}
@@ -660,6 +673,52 @@ mod tests {
660673
let _ = tokio::fs::remove_dir_all(&path).await;
661674
}
662675

676+
#[cot::test]
677+
async fn test_interference_during_write() {
678+
let path = make_store_path();
679+
let store = FileStore::new(path.clone()).expect("failed to init store");
680+
681+
let key = "test_key".to_string();
682+
let value = serde_json::json!({ "id": 1, "message": "hello world" });
683+
684+
let num_task = 10;
685+
let barrier = Arc::new(Barrier::new(num_task + 1));
686+
let mut handles = Vec::with_capacity(num_task);
687+
688+
for _ in 0..num_task {
689+
let b = barrier.clone();
690+
let k = key.clone();
691+
let s = store.clone();
692+
let v = value.clone();
693+
694+
handles.push(tokio::spawn(async move {
695+
b.wait().await;
696+
s.insert(k, v, Timeout::Never)
697+
.await
698+
.expect("failed to insert data to store");
699+
700+
sleep(Duration::from_millis(10)).await;
701+
}));
702+
}
703+
704+
barrier.wait().await;
705+
706+
tokio::task::yield_now().await;
707+
708+
let _store_2 = FileStore::new(path.clone()).expect("failed to init store");
709+
710+
for handle in handles {
711+
handle.await.expect("task panicked");
712+
}
713+
714+
let retrieved = store.read(&key).await.expect("failed to read from store");
715+
if let Some(found) = retrieved {
716+
assert_eq!(found, value);
717+
}
718+
719+
let _ = tokio::fs::remove_dir_all(&path).await;
720+
}
721+
663722
#[cot::test]
664723
async fn test_thundering_write() {
665724
let path = make_store_path();

0 commit comments

Comments
 (0)