Skip to content

Commit ef3b4a6

Browse files
committed
feat: get logstream affected resources {filters, dashboards (+ tile_ids)}
1 parent e260bc4 commit ef3b4a6

6 files changed

Lines changed: 186 additions & 8 deletions

File tree

src/handlers/http/logstream.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use self::error::StreamError;
2020
use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
2121
use super::query::update_schema_when_distributed;
2222
use crate::event::format::override_data_type;
23+
use crate::handlers::http::modal::utils::logstream_utils::LogstreamAffectedResources;
2324
use crate::hottier::{CURRENT_HOT_TIER_VERSION, HotTierManager, StreamHotTier};
2425
use crate::metadata::SchemaVersion;
2526
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
@@ -494,17 +495,21 @@ pub async fn delete_stream_hot_tier(
494495
))
495496
}
496497

497-
pub async fn get_affected(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
498+
pub async fn get_logstream_affected_resources(
499+
stream_name: Path<String>
500+
) -> Result<impl Responder, StreamError> {
498501
let stream_name = stream_name.into_inner();
499502

500503
// For query mode, if the stream not found in memory map,
501-
//check if it exists in the storage
502-
//create stream and schema from storage
504+
// check if it exists in the storage
505+
// create stream and schema from storage
503506
if !PARSEABLE.check_or_load_stream(&stream_name).await {
504507
return Err(StreamNotFound(stream_name.clone()).into());
505508
}
506509

507-
Ok((web::Json({}), StatusCode::OK))
510+
let affected_resources = LogstreamAffectedResources::load(&stream_name).await;
511+
512+
Ok((web::Json(affected_resources), StatusCode::OK))
508513
}
509514

510515
#[allow(unused)]

src/handlers/http/modal/ingest_server.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,14 @@ impl IngestServer {
272272
.authorize_for_resource(Action::GetStats),
273273
),
274274
)
275+
.service(
276+
// GET "/logstream/{logstream}/affected-resources" ==> Get affected resources for given log stream
277+
web::resource("/affected-resources").route(
278+
web::get()
279+
.to(logstream::get_logstream_affected_resources)
280+
.authorize_for_resource(Action::GetLogstreamAffectedResources),
281+
),
282+
)
275283
.service(
276284
web::scope("/retention").service(
277285
web::resource("/cleanup").route(

src/handlers/http/modal/query_server.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,14 @@ impl QueryServer {
313313
.authorize_for_resource(Action::GetStats),
314314
),
315315
)
316+
.service(
317+
// GET "/logstream/{logstream}/affected-resources" ==> Get affected resources for given log stream
318+
web::resource("/affected-resources").route(
319+
web::get()
320+
.to(logstream::get_logstream_affected_resources)
321+
.authorize_for_resource(Action::GetLogstreamAffectedResources),
322+
),
323+
)
316324
.service(
317325
web::resource("/retention")
318326
// PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream

src/handlers/http/modal/server.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,14 @@ impl Server {
488488
.authorize_for_resource(Action::GetStats),
489489
),
490490
)
491+
.service(
492+
// GET "/logstream/{logstream}/affected-resources" ==> Get affected resources for given log stream
493+
web::resource("/affected-resources").route(
494+
web::get()
495+
.to(logstream::get_logstream_affected_resources)
496+
.authorize_for_resource(Action::GetLogstreamAffectedResources),
497+
),
498+
)
491499
.service(
492500
web::resource("/retention")
493501
// PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream

src/handlers/http/modal/utils/logstream_utils.rs

Lines changed: 151 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,29 @@
1717
*/
1818

1919
use actix_web::http::header::HeaderMap;
20+
use datafusion::common::HashSet;
21+
use ulid::Ulid;
2022

2123
use crate::{
22-
event::format::LogSource,
23-
handlers::{
24+
event::format::LogSource, handlers::{
2425
CUSTOM_PARTITION_KEY, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY,
2526
TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType,
2627
UPDATE_STREAM_KEY,
27-
},
28-
storage::StreamType,
28+
},
29+
metastore::MetastoreError,
30+
parseable::{
31+
PARSEABLE, StreamNotFound
32+
},
33+
storage::StreamType,
34+
users::{
35+
dashboards::{Dashboard},
36+
filters::Filter
37+
}
2938
};
3039

40+
/// Field in a dashboard's tile that should contain the logstream name
41+
const TILE_FIELD_REFERRING_TO_STREAM: &str = "chartQuery";
42+
3143
#[derive(Debug, Default)]
3244
pub struct PutStreamHeaders {
3345
pub time_partition: String,
@@ -74,3 +86,138 @@ impl From<&HeaderMap> for PutStreamHeaders {
7486
}
7587
}
7688
}
89+
90+
/// Resources that rely on a specific logstream and will be affected if it gets deleted
91+
#[derive(Debug, Default, serde::Serialize)]
92+
pub struct LogstreamAffectedResources {
93+
pub filters: Vec<Filter>,
94+
pub dashboards: Vec<LogstreamAffectedDashboard>
95+
}
96+
97+
#[derive(Debug, Default, serde::Serialize)]
98+
pub struct LogstreamAffectedDashboard {
99+
pub dashboard: Dashboard,
100+
pub affected_tile_ids: Vec<Ulid>
101+
}
102+
103+
#[derive(thiserror::Error, Debug)]
104+
pub enum LogstreamAffectedResourcesError {
105+
#[error("Stream not found: {0}")]
106+
NoSuchStream(#[from] StreamNotFound),
107+
108+
#[error("Metastore error: {0}")]
109+
FromMetastoreError(#[from] MetastoreError),
110+
}
111+
112+
impl LogstreamAffectedResources {
113+
pub async fn load(stream_name: &str) -> Self {
114+
Self {
115+
filters: LogstreamAffectedResources::fetch_affected_filters(stream_name)
116+
.await
117+
.unwrap_or_else(|e| {
118+
tracing::warn!("failed to fetch filters: {}", e);
119+
Vec::new()
120+
}),
121+
122+
dashboards: LogstreamAffectedResources::fetch_affected_dashboards(stream_name)
123+
.await
124+
.unwrap_or_else(|e| {
125+
tracing::warn!("failed to fetch dashboards: {}", e);
126+
Vec::new()
127+
}),
128+
}
129+
}
130+
131+
pub async fn fetch_affected_filters(
132+
stream_name: &str
133+
) -> Result<Vec<Filter>, LogstreamAffectedResourcesError> {
134+
if !PARSEABLE.streams.contains(stream_name) {
135+
return Err(LogstreamAffectedResourcesError::NoSuchStream(
136+
StreamNotFound(stream_name.to_string())
137+
));
138+
}
139+
140+
Ok(PARSEABLE.metastore.get_filters().await?
141+
.into_iter()
142+
.filter(|filter| filter.stream_name == stream_name)
143+
.collect())
144+
}
145+
146+
pub async fn fetch_affected_dashboards(
147+
stream_name: &str
148+
) -> Result<Vec<LogstreamAffectedDashboard>, LogstreamAffectedResourcesError> {
149+
if !PARSEABLE.streams.contains(stream_name) {
150+
return Err(LogstreamAffectedResourcesError::NoSuchStream(
151+
StreamNotFound(stream_name.to_string())
152+
));
153+
}
154+
155+
let all_dashboards = PARSEABLE.metastore.get_dashboards().await?;
156+
let mut parsed_dashboards = Vec::<Dashboard>::new();
157+
158+
for dashboard in all_dashboards {
159+
if dashboard.is_empty() {
160+
continue;
161+
}
162+
163+
let dashboard_value = match serde_json::from_slice::<serde_json::Value>(&dashboard) {
164+
Ok(value) => value,
165+
Err(err) => {
166+
tracing::warn!("Failed to parse dashboard JSON: {}", err);
167+
continue;
168+
}
169+
};
170+
171+
if let Ok(dashboard) = serde_json::from_value::<Dashboard>(dashboard_value.clone()) {
172+
parsed_dashboards.retain(|d: &Dashboard| {
173+
d.dashboard_id != dashboard.dashboard_id
174+
});
175+
176+
parsed_dashboards.push(dashboard);
177+
} else {
178+
tracing::warn!("Failed to deserialize dashboard: {:?}", dashboard_value);
179+
}
180+
}
181+
182+
let mut affected_dashboards: Vec<LogstreamAffectedDashboard> = vec![];
183+
184+
for dashboard in parsed_dashboards {
185+
let Some(tiles) = dashboard.tiles.as_ref() else {
186+
continue;
187+
};
188+
189+
println!("here");
190+
191+
let mut affected_tile_ids = HashSet::<Ulid>::new();
192+
193+
for tile in tiles {
194+
let Some(tile_fields) = tile.other_fields.as_ref() else {
195+
continue;
196+
};
197+
198+
let Some(tile_value) = tile_fields.get(TILE_FIELD_REFERRING_TO_STREAM) else {
199+
continue;
200+
};
201+
202+
203+
204+
if let Some(chart_query) = tile_value.as_str() {
205+
println!("{}", chart_query);
206+
if chart_query.contains(stream_name) && !affected_tile_ids.contains(&tile.tile_id) {
207+
affected_tile_ids.insert(tile.tile_id);
208+
}
209+
}
210+
}
211+
212+
if !affected_tile_ids.is_empty() {
213+
affected_dashboards.push(LogstreamAffectedDashboard {
214+
dashboard,
215+
affected_tile_ids: affected_tile_ids.into_iter().collect()
216+
});
217+
}
218+
}
219+
220+
println!("h2: {}", affected_dashboards.len());
221+
Ok(affected_dashboards)
222+
}
223+
}

src/rbac/role.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub enum Action {
3232
DetectSchema,
3333
GetSchema,
3434
GetStats,
35+
GetLogstreamAffectedResources,
3536
DeleteStream,
3637
GetRetention,
3738
PutRetention,
@@ -164,6 +165,7 @@ impl RoleBuilder {
164165
| Action::GetSchema
165166
| Action::DetectSchema
166167
| Action::GetStats
168+
| Action::GetLogstreamAffectedResources
167169
| Action::GetRetention
168170
| Action::PutRetention
169171
| Action::All => Permission::Resource(action, self.resource_type.clone().unwrap()),

0 commit comments

Comments
 (0)