1616 *
1717 */
1818
19- use std:: collections:: HashSet ;
2019use actix_web:: http:: header:: HeaderMap ;
2120use bytes:: Bytes ;
2221use ulid:: Ulid ;
2322
2423use crate :: {
25- alerts:: AlertConfig , event:: format:: LogSource , handlers:: {
24+ alerts:: AlertConfig ,
25+ event:: format:: LogSource ,
26+ handlers:: {
2627 CUSTOM_PARTITION_KEY , LOG_SOURCE_KEY , STATIC_SCHEMA_FLAG , STREAM_TYPE_KEY ,
2728 TELEMETRY_TYPE_KEY , TIME_PARTITION_KEY , TIME_PARTITION_LIMIT_KEY , TelemetryType ,
28- UPDATE_STREAM_KEY ,
29+ UPDATE_STREAM_KEY , http :: logstream :: error :: StreamError ,
2930 } ,
3031 metastore:: MetastoreError ,
31- parseable:: {
32- PARSEABLE , StreamNotFound
32+ parseable:: { PARSEABLE , StreamNotFound } ,
33+ rbac:: role:: {
34+ ParseableResourceType ,
35+ model:: DefaultPrivilege
3336 } ,
34- storage:: StreamType ,
37+ storage:: { ObjectStorageError , StorageMetadata , StreamType } ,
3538 users:: dashboards:: Dashboard
3639} ;
3740
@@ -89,7 +92,9 @@ impl From<&HeaderMap> for PutStreamHeaders {
8992#[ derive( Debug , Default , serde:: Serialize ) ]
9093pub struct LogstreamAffectedResources {
9194 pub filters : Vec < String > ,
92- pub dashboards : Vec < LogstreamAffectedDashboard >
95+ pub dashboards : Vec < LogstreamAffectedDashboard > ,
96+ pub alerts : Vec < Ulid > ,
97+ pub roles : Vec < String > ,
9398}
9499
95100#[ derive( Debug , Default , serde:: Serialize ) ]
@@ -100,39 +105,44 @@ pub struct LogstreamAffectedDashboard {
100105
101106#[ derive( thiserror:: Error , Debug ) ]
102107pub enum LogstreamAffectedResourcesError {
103- #[ error( "Stream not found: {0}" ) ]
104- NoSuchStream ( #[ from] StreamNotFound ) ,
108+ #[ error( "(to fetch affected resources) logstream not found: {0}" ) ]
109+ StreamNotFound ( #[ from] StreamNotFound ) ,
105110
106- #[ error( "Metastore error: {0}" ) ]
107- FromMetastoreError ( #[ from] MetastoreError ) ,
111+ #[ error( "(get affected resources) metastore error: {0}" ) ]
112+ MetastoreError ( #[ from] MetastoreError ) ,
113+
114+ #[ error( "(get affected resources) objectstore error: {0}" ) ]
115+ ObjectStorageError ( #[ from] ObjectStorageError ) ,
116+
117+ #[ error( "(get affected resources) could not parse JSON: {0}" ) ]
118+ Bytes2JSONError ( #[ from] Bytes2JSONError )
108119}
109120
110121impl LogstreamAffectedResources {
111- pub async fn load ( stream_name : & str ) -> Self {
112- Self {
113- filters : LogstreamAffectedResources :: fetch_affected_filters ( stream_name)
114- . await
115- . unwrap_or_else ( |e| {
116- tracing:: warn!( "failed to fetch filters: {}" , e) ;
117- Vec :: new ( )
118- } ) ,
119-
120- dashboards : LogstreamAffectedResources :: fetch_affected_dashboards ( stream_name)
121- . await
122- . unwrap_or_else ( |e| {
123- tracing:: warn!( "failed to fetch dashboards: {}" , e) ;
124- Vec :: new ( )
125- } ) ,
126- }
122+ /// Load all resources that will be affected if the given logstream is deleted.
123+ ///
124+ /// ### Arguments
125+ /// - `stream_name` - The name of the logstream to check for dependencies
126+ ///
127+ /// ### Returns
128+ /// A tuple where:
129+ /// - First element: `true` if no resources are affected (empty loaded struct), `false` otherwise
130+ /// - Second element: The populated `LogstreamAffectedResources` struct
131+
132+ pub async fn load ( stream_name : & str ) -> Result < Self , LogstreamAffectedResourcesError > {
133+ Ok ( Self {
134+ filters : Self :: fetch_affected_filters ( stream_name) . await ?,
135+ dashboards : Self :: fetch_affected_dashboards ( stream_name) . await ?,
136+ alerts : Self :: fetch_affected_alerts ( stream_name) . await ?,
137+ roles : Self :: fetch_affected_roles ( stream_name) . await ?,
138+ } )
127139 }
128140
129141 pub async fn fetch_affected_filters (
130142 stream_name : & str
131143 ) -> Result < Vec < String > , LogstreamAffectedResourcesError > {
132144 if !PARSEABLE . streams . contains ( stream_name) {
133- return Err ( LogstreamAffectedResourcesError :: NoSuchStream (
134- StreamNotFound ( stream_name. to_string ( ) )
135- ) ) ;
145+ return Err ( StreamNotFound ( stream_name. to_string ( ) ) . into ( ) ) ;
136146 }
137147
138148 Ok ( PARSEABLE . metastore . get_filters ( ) . await ?
@@ -149,9 +159,7 @@ impl LogstreamAffectedResources {
149159 stream_name : & str
150160 ) -> Result < Vec < LogstreamAffectedDashboard > , LogstreamAffectedResourcesError > {
151161 if !PARSEABLE . streams . contains ( stream_name) {
152- return Err ( LogstreamAffectedResourcesError :: NoSuchStream (
153- StreamNotFound ( stream_name. to_string ( ) )
154- ) ) ;
162+ return Err ( StreamNotFound ( stream_name. to_string ( ) ) . into ( ) ) ;
155163 }
156164
157165 let all_dashboards = PARSEABLE . metastore . get_dashboards ( ) . await ?;
@@ -178,8 +186,7 @@ impl LogstreamAffectedResources {
178186 continue ;
179187 } ;
180188
181- let mut affected_tile_ids = HashSet :: < Ulid > :: new ( ) ;
182-
189+ let mut affected_tile_ids = Vec :: < Ulid > :: new ( ) ;
183190 for tile in tiles {
184191 let Some ( tile_fields) = tile. other_fields . as_ref ( ) else {
185192 continue ;
@@ -191,7 +198,7 @@ impl LogstreamAffectedResources {
191198
192199 if let Some ( chart_query) = tile_value. as_str ( ) {
193200 if chart_query. contains ( stream_name) && !affected_tile_ids. contains ( & tile. tile_id ) {
194- affected_tile_ids. insert ( tile. tile_id ) ;
201+ affected_tile_ids. push ( tile. tile_id ) ;
195202 }
196203 }
197204 }
@@ -202,7 +209,7 @@ impl LogstreamAffectedResources {
202209 tracing:: warn!( "dashboard {}: [id] is missing -- for logstream {}" , dash_i, stream_name) ;
203210 Ulid :: new ( ) // default to a new ULID if missing -- what else?
204211 } ) ,
205- affected_tile_ids : affected_tile_ids . into_iter ( ) . collect ( )
212+ affected_tile_ids
206213 } ) ;
207214 }
208215 }
@@ -214,14 +221,12 @@ impl LogstreamAffectedResources {
214221 stream_name : & str
215222 ) -> Result < Vec < Ulid > , LogstreamAffectedResourcesError > {
216223 if !PARSEABLE . streams . contains ( stream_name) {
217- return Err ( LogstreamAffectedResourcesError :: NoSuchStream (
218- StreamNotFound ( stream_name. to_string ( ) )
219- ) ) ;
224+ return Err ( StreamNotFound ( stream_name. to_string ( ) ) . into ( ) ) ;
220225 }
221226
222227 let all_alerts = PARSEABLE . metastore . get_alerts ( ) . await ?;
223228
224- let mut stream_alerts = HashSet :: < Ulid > :: new ( ) ;
229+ let mut stream_alerts = Vec :: < Ulid > :: new ( ) ;
225230 for alert_bytes in all_alerts {
226231 let alert = match self :: bytes_to_json :: < AlertConfig > ( alert_bytes) {
227232 Ok ( alert_val) => alert_val,
@@ -231,16 +236,98 @@ impl LogstreamAffectedResources {
231236 }
232237 } ;
233238
234- if !alert. datasets . contains ( & stream_name. to_string ( ) ) { continue } ;
235- stream_alerts. insert ( alert. id ) ;
239+ if !alert. datasets . iter ( ) . any ( |s| s == stream_name) {
240+ continue
241+ } ;
242+
243+ if !stream_alerts. contains ( & alert. id ) {
244+ stream_alerts. push ( alert. id ) ;
245+ }
236246 }
237247
238- Ok ( stream_alerts. into_iter ( ) . collect ( ) )
248+ Ok ( stream_alerts)
249+ }
250+
251+
252+ pub async fn fetch_affected_roles (
253+ stream_name : & str
254+ ) -> Result < Vec < String > , LogstreamAffectedResourcesError > {
255+ if !PARSEABLE . streams . contains ( stream_name) {
256+ return Err ( StreamNotFound ( stream_name. to_string ( ) ) . into ( ) ) ;
257+ }
258+
259+ let metadata_bytes = PARSEABLE
260+ . metastore
261+ . get_parseable_metadata ( )
262+ . await
263+ . map_err ( |e| ObjectStorageError :: MetastoreError ( Box :: new ( e. to_detail ( ) ) ) ) ?
264+ . ok_or_else ( || ObjectStorageError :: Custom ( "parseable metadata not initialized" . into ( ) ) ) ?;
265+
266+ let metadata = self :: bytes_to_json :: < StorageMetadata > ( metadata_bytes) ?;
267+
268+ let mut stream_associated_roles = Vec :: < String > :: new ( ) ;
269+ for ( role_name, privileges) in & metadata. roles {
270+ for privilege in privileges {
271+
272+ let associated_stream = match privilege {
273+ DefaultPrivilege :: Ingestor { resource } => {
274+ match resource {
275+ ParseableResourceType :: Stream ( stream) => stream,
276+ _ => continue
277+ }
278+ } ,
279+
280+ DefaultPrivilege :: Reader { resource } => {
281+ match resource {
282+ ParseableResourceType :: Stream ( stream) => stream,
283+ _ => continue
284+ }
285+ } ,
286+
287+ DefaultPrivilege :: Writer { resource } => {
288+ match resource {
289+ ParseableResourceType :: Stream ( stream) => stream,
290+ _ => continue
291+ }
292+ } ,
293+
294+ _ => continue
295+ } ;
296+
297+ if associated_stream == stream_name && !stream_associated_roles. contains ( role_name) {
298+ stream_associated_roles. push ( role_name. to_string ( ) ) ;
299+
300+ // if any role privilege matches the input stream,
301+ // add the role to the set and break
302+ break ;
303+ }
304+
305+ }
306+ }
307+
308+ Ok ( stream_associated_roles)
309+ }
310+ }
311+
312+
313+ impl From < LogstreamAffectedResourcesError > for StreamError {
314+ fn from ( err : LogstreamAffectedResourcesError ) -> Self {
315+ match err {
316+ LogstreamAffectedResourcesError :: StreamNotFound ( e) => {
317+ StreamError :: StreamNotFound ( e)
318+ }
319+ LogstreamAffectedResourcesError :: MetastoreError ( e) => {
320+ StreamError :: MetastoreError ( e)
321+ }
322+ other => {
323+ StreamError :: Anyhow ( anyhow:: anyhow!( other. to_string( ) ) )
324+ }
325+ }
239326 }
240327}
241328
242329
243- // utility funcs :
330+ // utility:
244331
245332#[ derive( Debug , thiserror:: Error ) ]
246333pub enum Bytes2JSONError {
0 commit comments