Skip to content

Commit eeb085f

Browse files
committed
feat: fetch affected alerts in LogstreamAffectedResources
1 parent 99bc6cd commit eeb085f

1 file changed

Lines changed: 81 additions & 34 deletions

File tree

src/handlers/http/modal/utils/logstream_utils.rs

Lines changed: 81 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818

1919
use std::collections::HashSet;
2020
use actix_web::http::header::HeaderMap;
21+
use bytes::Bytes;
2122
use ulid::Ulid;
2223

2324
use crate::{
24-
event::format::LogSource, handlers::{
25+
alerts::AlertConfig, event::format::LogSource, handlers::{
2526
CUSTOM_PARTITION_KEY, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY,
2627
TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType,
2728
UPDATE_STREAM_KEY,
@@ -31,10 +32,7 @@ use crate::{
3132
PARSEABLE, StreamNotFound
3233
},
3334
storage::StreamType,
34-
users::{
35-
dashboards::{Dashboard},
36-
filters::Filter
37-
}
35+
users::dashboards::Dashboard
3836
};
3937

4038
/// Field in a dashboard's tile that should contain the logstream name
@@ -90,13 +88,13 @@ impl From<&HeaderMap> for PutStreamHeaders {
9088
/// Resources that rely on a specific logstream and will be affected if it gets deleted
9189
#[derive(Debug, Default, serde::Serialize)]
9290
pub struct LogstreamAffectedResources {
93-
pub filters: Vec<Filter>,
91+
pub filters: Vec<String>,
9492
pub dashboards: Vec<LogstreamAffectedDashboard>
9593
}
9694

9795
#[derive(Debug, Default, serde::Serialize)]
9896
pub struct LogstreamAffectedDashboard {
99-
pub dashboard: Dashboard,
97+
pub dashboard_id: Ulid,
10098
pub affected_tile_ids: Vec<Ulid>
10199
}
102100

@@ -130,7 +128,7 @@ impl LogstreamAffectedResources {
130128

131129
pub async fn fetch_affected_filters(
132130
stream_name: &str
133-
) -> Result<Vec<Filter>, LogstreamAffectedResourcesError> {
131+
) -> Result<Vec<String>, LogstreamAffectedResourcesError> {
134132
if !PARSEABLE.streams.contains(stream_name) {
135133
return Err(LogstreamAffectedResourcesError::NoSuchStream(
136134
StreamNotFound(stream_name.to_string())
@@ -139,8 +137,12 @@ impl LogstreamAffectedResources {
139137

140138
Ok(PARSEABLE.metastore.get_filters().await?
141139
.into_iter()
142-
.filter(|filter| filter.stream_name == stream_name)
143-
.collect())
140+
.filter_map(|filter| {
141+
if filter.stream_name == stream_name &&
142+
let Some(f_id) = filter.filter_id {
143+
Some(f_id)
144+
} else { None }
145+
}).collect())
144146
}
145147

146148
pub async fn fetch_affected_dashboards(
@@ -155,39 +157,27 @@ impl LogstreamAffectedResources {
155157
let all_dashboards = PARSEABLE.metastore.get_dashboards().await?;
156158
let mut parsed_dashboards = Vec::<Dashboard>::new();
157159

158-
for dashboard in all_dashboards {
159-
if dashboard.is_empty() {
160-
continue;
161-
}
162-
163-
let dashboard_value = match serde_json::from_slice::<serde_json::Value>(&dashboard) {
164-
Ok(value) => value,
165-
Err(err) => {
166-
tracing::warn!("Failed to parse dashboard JSON: {}", err);
160+
for dashboard_bytes in all_dashboards {
161+
let dashboard = match self::bytes_to_json::<Dashboard>(dashboard_bytes) {
162+
Ok(d) => d,
163+
Err(e) => {
164+
tracing::warn!("{}", e.to_string());
167165
continue;
168166
}
169167
};
170168

171-
if let Ok(dashboard) = serde_json::from_value::<Dashboard>(dashboard_value.clone()) {
172-
parsed_dashboards.retain(|d: &Dashboard| {
173-
d.dashboard_id != dashboard.dashboard_id
174-
});
175-
169+
if !parsed_dashboards.iter().any(|d| d.dashboard_id == dashboard.dashboard_id) {
176170
parsed_dashboards.push(dashboard);
177-
} else {
178-
tracing::warn!("Failed to deserialize dashboard: {:?}", dashboard_value);
179171
}
180172
}
181173

182174
let mut affected_dashboards: Vec<LogstreamAffectedDashboard> = vec![];
183175

184-
for dashboard in parsed_dashboards {
176+
for (dash_i, dashboard) in parsed_dashboards.iter().enumerate() {
185177
let Some(tiles) = dashboard.tiles.as_ref() else {
186178
continue;
187179
};
188180

189-
println!("here");
190-
191181
let mut affected_tile_ids = HashSet::<Ulid>::new();
192182

193183
for tile in tiles {
@@ -199,10 +189,7 @@ impl LogstreamAffectedResources {
199189
continue;
200190
};
201191

202-
203-
204192
if let Some(chart_query) = tile_value.as_str() {
205-
println!("{}", chart_query);
206193
if chart_query.contains(stream_name) && !affected_tile_ids.contains(&tile.tile_id) {
207194
affected_tile_ids.insert(tile.tile_id);
208195
}
@@ -211,13 +198,73 @@ impl LogstreamAffectedResources {
211198

212199
if !affected_tile_ids.is_empty() {
213200
affected_dashboards.push(LogstreamAffectedDashboard {
214-
dashboard,
201+
dashboard_id: dashboard.dashboard_id.unwrap_or_else(|| {
202+
tracing::warn!("dashboard {}: [id] is missing -- for logstream {}", dash_i, stream_name);
203+
Ulid::new() // default to a new ULID if missing -- what else?
204+
}),
215205
affected_tile_ids: affected_tile_ids.into_iter().collect()
216206
});
217207
}
218208
}
219209

220-
println!("h2: {}", affected_dashboards.len());
221210
Ok(affected_dashboards)
222211
}
212+
213+
pub async fn fetch_affected_alerts(
214+
stream_name: &str
215+
) -> Result<Vec<Ulid>, LogstreamAffectedResourcesError> {
216+
if !PARSEABLE.streams.contains(stream_name) {
217+
return Err(LogstreamAffectedResourcesError::NoSuchStream(
218+
StreamNotFound(stream_name.to_string())
219+
));
220+
}
221+
222+
let all_alerts = PARSEABLE.metastore.get_alerts().await?;
223+
224+
let mut stream_alerts = HashSet::<Ulid>::new();
225+
for alert_bytes in all_alerts {
226+
let alert = match self::bytes_to_json::<AlertConfig>(alert_bytes) {
227+
Ok(alert_val) => alert_val,
228+
Err(e) => {
229+
tracing::warn!("{}", e.to_string());
230+
continue;
231+
}
232+
};
233+
234+
if !alert.datasets.contains(&stream_name.to_string()) { continue };
235+
stream_alerts.insert(alert.id);
236+
}
237+
238+
Ok(stream_alerts.into_iter().collect())
239+
}
240+
}
241+
242+
243+
// utility funcs:
244+
245+
#[derive(Debug, thiserror::Error)]
246+
pub enum Bytes2JSONError {
247+
#[error("zero sized Bytes")]
248+
ZeroSizedBytes,
249+
250+
#[error("failed to parse bytes to JSON: {0}")]
251+
FailedToParse(String)
252+
}
253+
254+
fn bytes_to_json<T: serde::de::DeserializeOwned>(json_bytes: Bytes) -> Result<T, Bytes2JSONError> {
255+
if json_bytes.is_empty() {
256+
return Err(Bytes2JSONError::ZeroSizedBytes);
257+
}
258+
259+
let json_bytes_value = match serde_json::from_slice::<serde_json::Value>(&json_bytes) {
260+
Ok(value) => value,
261+
Err(err) => {
262+
return Err(Bytes2JSONError::FailedToParse(format!("{:#?}", err)))
263+
}
264+
};
265+
266+
return match serde_json::from_value::<T>(json_bytes_value.clone()) {
267+
Ok(parsed_object) => Ok(parsed_object),
268+
Err(e) => Err(Bytes2JSONError::FailedToParse(format!("deserialization failed: {:#?}", e)))
269+
};
223270
}

0 commit comments

Comments
 (0)