Skip to content

Commit aca2671

Browse files
committed
fix: remove associated filters during stream deletion
1 parent 15cde20 commit aca2671

1 file changed

Lines changed: 16 additions & 0 deletions

File tree

src/handlers/http/logstream.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::rbac::role::Action;
2929
use crate::stats::{Stats, event_labels_date, storage_size_labels_date};
3030
use crate::storage::retention::Retention;
3131
use crate::storage::{ObjectStoreFormat, StreamInfo, StreamType};
32+
use crate::users::filters::{FILTERS, Filter};
3233
use crate::utils::actix::extract_session_key_from_req;
3334
use crate::utils::json::flatten::{
3435
self, convert_to_array, generic_flattening, has_more_than_max_allowed_levels,
@@ -56,6 +57,21 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
5657

5758
let objectstore = PARSEABLE.storage.get_object_store();
5859

60+
let all_filters = PARSEABLE.metastore.get_filters().await.unwrap_or_default();
61+
// collect filters associated with the logstream being deleted
62+
let filters_for_stream: Vec<Filter> = all_filters
63+
.into_iter()
64+
.filter(|filter| filter.stream_name == stream_name)
65+
.collect();
66+
67+
for filter in filters_for_stream.iter() {
68+
PARSEABLE.metastore.delete_filter(filter).await?;
69+
70+
if let Some(filter_id) = filter.filter_id.as_ref() {
71+
FILTERS.delete_filter(filter_id).await;
72+
}
73+
}
74+
5975
// Delete from storage
6076
objectstore.delete_stream(&stream_name).await?;
6177
// Delete from staging

0 commit comments

Comments
 (0)