@@ -58,11 +58,10 @@ use crate::alerts::alert_types::ThresholdAlert;
5858use crate :: alerts:: target:: { NotificationConfig , TARGETS } ;
5959use crate :: handlers:: http:: fetch_schema;
6060use crate :: metastore:: MetastoreError ;
61- // use crate::handlers::http::query::create_streams_for_distributed;
62- // use crate::option::Mode;
6361use crate :: parseable:: { PARSEABLE , StreamNotFound } ;
6462use crate :: query:: { QUERY_SESSION , resolve_stream_names} ;
65- use crate :: rbac:: map:: SessionKey ;
63+ use crate :: rbac:: map:: { SessionKey , sessions} ;
64+ use crate :: sse:: { SSE_HANDLER , SSEAlertInfo , SSEEvent } ;
6665use crate :: storage;
6766use crate :: storage:: ObjectStorageError ;
6867use crate :: sync:: alert_runtime;
@@ -606,12 +605,39 @@ impl AlertConfig {
606605
607606 pub async fn trigger_notifications ( & self , message : String ) -> Result < ( ) , AlertError > {
608607 let mut context = self . get_context ( ) ;
609- context. message = message;
608+ context. message . clone_from ( & message) ;
609+
610610 for target_id in & self . targets {
611611 let target = TARGETS . get_target_by_id ( target_id) . await ?;
612612 trace ! ( "Target (trigger_notifications)-\n {target:?}" ) ;
613613 target. call ( context. clone ( ) ) ;
614614 }
615+
616+ // get active sessions
617+ let active_sessions = sessions ( ) . get_active_sessions ( ) ;
618+ let mut broadcast_to = vec ! [ ] ;
619+ for session in active_sessions {
620+ if user_auth_for_query ( & session, & self . query ) . await . is_ok ( )
621+ && let SessionKey :: SessionId ( id) = & session
622+ {
623+ broadcast_to. push ( * id) ;
624+ }
625+ }
626+
627+ if self . state . eq ( & AlertState :: Triggered )
628+ && let Ok ( msg) = & serde_json:: to_string ( & SSEEvent {
629+ criticality : crate :: sse:: Criticality :: Error ,
630+ message : crate :: sse:: Message :: AlertEvent ( SSEAlertInfo {
631+ id : self . id ,
632+ state : self . state ,
633+ name : self . title . clone ( ) ,
634+ } ) ,
635+ } )
636+ && !broadcast_to. is_empty ( )
637+ {
638+ SSE_HANDLER . broadcast ( msg, Some ( & broadcast_to) ) . await ;
639+ }
640+
615641 Ok ( ( ) )
616642 }
617643
0 commit comments