Skip to content

Commit bce4ad4

Browse files
authored
feat: add helper function to delete old sequences in MongoDB store (#286)
* Add helper function to clean up sequences older than a given age in MongoDB store * Add test cases for MongoDB store cleanups * Fix timestamp comparisons by moving the BsonDateTime for timestamps * Switch from experimental ubuntu-slim to ubuntu-latest
1 parent f6015bb commit bce4ad4

5 files changed

Lines changed: 138 additions & 8 deletions

File tree

.github/workflows/lint-pr-title.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ on:
1010
jobs:
1111
main:
1212
name: Validate PR title
13-
runs-on: ubuntu-slim
13+
runs-on: ubuntu-latest
1414
permissions:
1515
pull-requests: read
1616
steps:

crates/hotfix/src/store/error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ pub enum StoreError {
3232
/// Failed to reset the store.
3333
#[error("failed to reset store")]
3434
Reset(#[source] BoxError),
35+
36+
/// Failed to cleanup old sequences.
37+
#[error("failed to cleanup old sequences")]
38+
Cleanup(#[source] BoxError),
3539
}
3640

3741
/// A specialized Result type for store operations.

crates/hotfix/src/store/mongodb.rs

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use async_trait::async_trait;
2-
use chrono::{DateTime, Utc};
2+
use chrono::{DateTime, Duration, TimeZone, Utc};
33
use futures::TryStreamExt;
4-
use mongodb::bson::Binary;
54
use mongodb::bson::doc;
65
use mongodb::bson::oid::ObjectId;
76
use mongodb::bson::spec::BinarySubtype;
7+
use mongodb::bson::{Binary, DateTime as BsonDateTime};
88
use mongodb::options::{FindOneOptions, IndexOptions, ReplaceOptions};
99
use mongodb::{Collection, Database, IndexModel};
1010
use serde::{Deserialize, Serialize};
@@ -18,7 +18,7 @@ struct SequenceMeta {
1818
#[serde(rename = "_id")]
1919
object_id: ObjectId,
2020
meta: bool,
21-
creation_time: DateTime<Utc>,
21+
creation_time: BsonDateTime,
2222
sender_seq_number: u64,
2323
target_seq_number: u64,
2424
}
@@ -92,14 +92,61 @@ impl MongoDbMessageStore {
9292
let initial_meta = SequenceMeta {
9393
object_id: sequence_id,
9494
meta: true,
95-
creation_time: Utc::now(),
95+
creation_time: BsonDateTime::now(),
9696
sender_seq_number: 0,
9797
target_seq_number: 0,
9898
};
9999
meta_collection.insert_one(&initial_meta).await?;
100100

101101
Ok(initial_meta)
102102
}
103+
104+
/// Deletes sequences older than the specified age, along with their associated messages.
105+
///
106+
/// Returns the number of deleted sequences.
107+
pub async fn cleanup_older_than(&self, age: Duration) -> Result<u64> {
108+
let cutoff = BsonDateTime::from_millis((Utc::now() - age).timestamp_millis());
109+
110+
// Find old sequence IDs (excluding current sequence)
111+
let filter = doc! {
112+
"meta": true,
113+
"creation_time": { "$lt": cutoff },
114+
"_id": { "$ne": self.current_sequence.object_id }
115+
};
116+
let mut cursor = self
117+
.meta_collection
118+
.find(filter)
119+
.await
120+
.map_err(|e| StoreError::Cleanup(e.into()))?;
121+
122+
let mut old_sequence_ids = Vec::new();
123+
while let Some(meta) = cursor
124+
.try_next()
125+
.await
126+
.map_err(|e| StoreError::Cleanup(e.into()))?
127+
{
128+
old_sequence_ids.push(meta.object_id);
129+
}
130+
131+
if old_sequence_ids.is_empty() {
132+
return Ok(0);
133+
}
134+
135+
// Delete messages first to avoid orphaned meta documents
136+
self.message_collection
137+
.delete_many(doc! { "sequence_id": { "$in": &old_sequence_ids } })
138+
.await
139+
.map_err(|e| StoreError::Cleanup(e.into()))?;
140+
141+
// Delete sequence metas
142+
let result = self
143+
.meta_collection
144+
.delete_many(doc! { "_id": { "$in": &old_sequence_ids } })
145+
.await
146+
.map_err(|e| StoreError::Cleanup(e.into()))?;
147+
148+
Ok(result.deleted_count)
149+
}
103150
}
104151

105152
#[async_trait]
@@ -215,6 +262,9 @@ impl MessageStore for MongoDbMessageStore {
215262
}
216263

217264
fn creation_time(&self) -> DateTime<Utc> {
218-
self.current_sequence.creation_time
265+
#[allow(clippy::expect_used)]
266+
Utc.timestamp_millis_opt(self.current_sequence.creation_time.timestamp_millis())
267+
.single()
268+
.expect("BsonDateTime is guaranteed to store valid timestamp")
219269
}
220270
}

crates/hotfix/tests/common_store_tests.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,12 +312,13 @@ async fn test_creation_time_gets_reset_correctly() {
312312
for factory in create_test_store_factories().await {
313313
let mut store = factory.create_store().await;
314314

315-
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
315+
tokio::time::sleep(std::time::Duration::from_millis(2)).await;
316316
let after_sleep = Utc::now();
317+
tokio::time::sleep(std::time::Duration::from_millis(2)).await;
317318

318319
store.reset().await.expect("failed to reset store");
319320
let reset_creation_time = store.creation_time();
320-
assert!(reset_creation_time >= after_sleep);
321+
assert!(reset_creation_time > after_sleep);
321322

322323
if !factory.is_persistent() {
323324
continue;

crates/hotfix/tests/mongodb_store_tests.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#![cfg(feature = "mongodb")]
22

3+
use chrono::Duration;
34
use hotfix::store::mongodb::{Client, MongoDbMessageStore};
45
use hotfix::store::{MessageStore, StoreError};
56
use testcontainers::runners::AsyncRunner;
@@ -114,3 +115,77 @@ async fn test_state_preserved_after_failed_set_target() {
114115
// State should be unchanged
115116
assert_eq!(store.next_target_seq_number(), initial_target_seq);
116117
}
118+
119+
#[tokio::test]
120+
async fn test_cleanup_removes_old_sequences() {
121+
let (container, mut store) = create_dedicated_container_and_store().await;
122+
123+
// Add a message to the initial sequence
124+
store.add(1, b"message in sequence 1").await.unwrap();
125+
126+
// Reset creates a new sequence, making the first one "old"
127+
store.reset().await.unwrap();
128+
store.add(1, b"message in sequence 2").await.unwrap();
129+
130+
// Reset again to have two old sequences
131+
store.reset().await.unwrap();
132+
store.add(1, b"message in sequence 3").await.unwrap();
133+
134+
// Small delay to ensure old sequences have earlier timestamps than the cutoff
135+
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
136+
137+
// Cleanup with zero duration should delete all old sequences
138+
let deleted = store.cleanup_older_than(Duration::zero()).await.unwrap();
139+
140+
assert_eq!(deleted, 2);
141+
142+
drop(container);
143+
}
144+
145+
#[tokio::test]
146+
async fn test_cleanup_preserves_current_sequence() {
147+
let (container, mut store) = create_dedicated_container_and_store().await;
148+
149+
// Add messages to current sequence
150+
store.add(1, b"message 1").await.unwrap();
151+
store.add(2, b"message 2").await.unwrap();
152+
153+
// Cleanup with zero duration - current sequence should be preserved
154+
let deleted = store.cleanup_older_than(Duration::zero()).await.unwrap();
155+
156+
assert_eq!(deleted, 0);
157+
158+
// Verify messages are still accessible
159+
let messages = store.get_slice(1, 2).await.unwrap();
160+
assert_eq!(messages.len(), 2);
161+
162+
drop(container);
163+
}
164+
165+
#[tokio::test]
166+
async fn test_cleanup_respects_age_threshold() {
167+
let (container, mut store) = create_dedicated_container_and_store().await;
168+
169+
// Create an old sequence
170+
store.reset().await.unwrap();
171+
172+
// Cleanup with a large duration should not delete anything
173+
let deleted = store.cleanup_older_than(Duration::days(365)).await.unwrap();
174+
175+
assert_eq!(deleted, 0);
176+
177+
drop(container);
178+
}
179+
180+
#[tokio::test]
181+
async fn test_cleanup_after_connection_drop() {
182+
let (container, store) = create_dedicated_container_and_store().await;
183+
184+
// Stop the container
185+
container.stop().await.unwrap();
186+
187+
// Attempt cleanup - should fail
188+
let result = store.cleanup_older_than(Duration::zero()).await;
189+
190+
assert!(matches!(result, Err(StoreError::Cleanup(_))));
191+
}

0 commit comments

Comments
 (0)