@@ -13,9 +13,10 @@ use std::sync::Arc;
1313use tracing:: { error, info} ;
1414use uuid:: Uuid ;
1515
16- use crate :: { AppError , AppState } ;
17- use crate :: auth:: { AuthUser , jwt:: JwtManager } ;
16+ use crate :: auth :: middleware :: verify_api_key ;
17+ use crate :: auth:: { jwt:: JwtManager , AuthUser } ;
1818use crate :: services:: kubernetes:: KubernetesService ;
19+ use crate :: { AppError , AppState } ;
1920
2021#[ derive( Deserialize ) ]
2122pub struct LogsQuery {
@@ -40,8 +41,6 @@ pub async fn ws_logs_handler(
4041 ws. on_upgrade ( move |socket| handle_socket ( socket, state, deployment_id, query) )
4142}
4243
43-
44-
4544async fn handle_socket (
4645 socket : WebSocket ,
4746 state : Arc < AppState > ,
@@ -50,7 +49,7 @@ async fn handle_socket(
5049) {
5150 // Extract token before using query elsewhere
5251 let token = query. token . clone ( ) ;
53-
52+
5453 // Authenticate user via token first, before splitting the socket
5554 let user_id = match authenticate_websocket_user ( & state, token) . await {
5655 Ok ( user_id) => user_id,
@@ -87,15 +86,19 @@ async fn handle_socket_with_user_internal(
8786 Ok ( false ) => {
8887 error ! ( "User {} does not own deployment {}" , user_id, deployment_id) ;
8988 let _ = sender
90- . send ( Message :: Text ( "Error: Deployment not found or access denied" . to_string ( ) ) )
89+ . send ( Message :: Text (
90+ "Error: Deployment not found or access denied" . to_string ( ) ,
91+ ) )
9192 . await ;
9293 let _ = sender. send ( Message :: Close ( None ) ) . await ;
9394 return ;
9495 }
9596 Err ( e) => {
9697 error ! ( "Failed to verify deployment ownership: {}" , e) ;
9798 let _ = sender
98- . send ( Message :: Text ( "Error: Failed to verify deployment access" . to_string ( ) ) )
99+ . send ( Message :: Text (
100+ "Error: Failed to verify deployment access" . to_string ( ) ,
101+ ) )
99102 . await ;
100103 let _ = sender. send ( Message :: Close ( None ) ) . await ;
101104 return ;
@@ -106,19 +109,31 @@ async fn handle_socket_with_user_internal(
106109 let k8s_service = match KubernetesService :: for_deployment ( & deployment_id, & user_id) . await {
107110 Ok ( service) => service,
108111 Err ( e) => {
109- error ! ( "Failed to create K8s service for deployment {} (user {}): {}" , deployment_id, user_id, e) ;
112+ error ! (
113+ "Failed to create K8s service for deployment {} (user {}): {}" ,
114+ deployment_id, user_id, e
115+ ) ;
110116 let _ = sender
111- . send ( Message :: Text ( format ! ( "Error: Failed to connect to Kubernetes: {}" , e) ) )
117+ . send ( Message :: Text ( format ! (
118+ "Error: Failed to connect to Kubernetes: {}" ,
119+ e
120+ ) ) )
112121 . await ;
113122 let _ = sender. send ( Message :: Close ( None ) ) . await ;
114123 return ;
115124 }
116125 } ;
117126
118127 // Start streaming logs
119- match k8s_service. stream_logs_realtime ( & deployment_id, query. tail ) . await {
128+ match k8s_service
129+ . stream_logs_realtime ( & deployment_id, query. tail )
130+ . await
131+ {
120132 Ok ( mut log_stream) => {
121- info ! ( "Started log stream for deployment: {} (user: {})" , deployment_id, user_id) ;
133+ info ! (
134+ "Started log stream for deployment: {} (user: {})" ,
135+ deployment_id, user_id
136+ ) ;
122137
123138 // Send initial logs from stream
124139 let mut line_count = 0 ;
@@ -142,12 +157,15 @@ async fn handle_socket_with_user_internal(
142157
143158 // If we have logs, keep connection open for potential new logs
144159 if line_count > 0 {
145- let _ = sender. send ( Message :: Text ( "--- End of current logs ---" . to_string ( ) ) ) . await ;
146-
160+ let _ = sender
161+ . send ( Message :: Text ( "--- End of current logs ---" . to_string ( ) ) )
162+ . await ;
163+
147164 // Keep WebSocket connection alive for potential future logs
148165 // This is a simple keepalive mechanism
149166 let mut interval = tokio:: time:: interval ( tokio:: time:: Duration :: from_secs ( 30 ) ) ;
150- for _ in 0 ..10 { // Keep alive for 5 minutes
167+ for _ in 0 ..10 {
168+ // Keep alive for 5 minutes
151169 interval. tick ( ) . await ;
152170 if sender. send ( Message :: Ping ( vec ! [ ] ) ) . await . is_err ( ) {
153171 break ;
@@ -156,7 +174,9 @@ async fn handle_socket_with_user_internal(
156174 }
157175
158176 // Cleanup
159- let _ = sender. send ( Message :: Text ( "Log stream ended" . to_string ( ) ) ) . await ;
177+ let _ = sender
178+ . send ( Message :: Text ( "Log stream ended" . to_string ( ) ) )
179+ . await ;
160180 let _ = sender. send ( Message :: Close ( None ) ) . await ;
161181 info ! ( "Log stream ended for deployment: {}" , deployment_id) ;
162182 }
@@ -168,37 +188,40 @@ async fn handle_socket_with_user_internal(
168188 }
169189}
170190
171-
172-
173191// Authentication function for WebSocket using your existing JWT system
174192async fn authenticate_websocket_user (
175193 state : & AppState ,
176194 token : Option < String > ,
177195) -> Result < Uuid , AppError > {
178-
179196 let token = token. ok_or_else ( || {
180197 error ! ( "No token provided for WebSocket authentication" ) ;
181198 AppError :: auth ( "Token required" )
182199 } ) ?;
183-
200+
184201 // Remove "Bearer " prefix if present
185202 let token = token. strip_prefix ( "Bearer " ) . unwrap_or ( & token) ;
186-
187- // Use your existing JWT verification logic
188- let jwt_manager = JwtManager :: new ( & state. config . jwt_secret , state. config . jwt_expires_in ) ;
189- let claims = jwt_manager. verify_token ( token)
190- . map_err ( |e| {
203+ let user_id = if token. starts_with ( & state. config . api_key_prefix ) {
204+ // API Key authentication
205+ let user_id = verify_api_key ( & state, & token) . await ?;
206+ info ! ( "API key authenticated for user {}" , user_id) ;
207+ user_id
208+ } else {
209+ // JWT token authentication
210+ let jwt_manager = JwtManager :: new ( & state. config . jwt_secret , state. config . jwt_expires_in ) ;
211+ let claims = jwt_manager. verify_token ( token) . map_err ( |e| {
191212 error ! ( "JWT verification failed: {}" , e) ;
192213 e
193214 } ) ?;
194-
195- let user_id = Uuid :: parse_str ( & claims. sub )
196- . map_err ( |e| {
215+
216+ let user_id = Uuid :: parse_str ( & claims. sub ) . map_err ( |e| {
197217 error ! ( "Invalid UUID in token claims: {}" , e) ;
198218 AppError :: auth ( "Invalid token format" )
199219 } ) ?;
200-
201-
220+
221+ info ! ( "JWT authenticated for user {}" , user_id) ;
222+ user_id
223+ } ;
224+
202225 // Verify user exists and is active in database
203226 let user_exists = sqlx:: query!(
204227 "SELECT id FROM users WHERE id = $1 AND is_active = true" ,
@@ -210,7 +233,7 @@ async fn authenticate_websocket_user(
210233 error ! ( "Database error during user verification: {}" , e) ;
211234 AppError :: internal ( & format ! ( "Database error: {}" , e) )
212235 } ) ?;
213-
236+
214237 match user_exists {
215238 Some ( _) => {
216239 info ! ( "User {} authenticated successfully for WebSocket" , user_id) ;
@@ -248,7 +271,7 @@ pub async fn get_logs_handler(
248271 State ( state) : State < AppState > ,
249272 Path ( deployment_id) : Path < Uuid > ,
250273 Query ( query) : Query < LogsQuery > ,
251- user : AuthUser ,
274+ user : AuthUser ,
252275) -> Result < axum:: response:: Json < LogsResponse > , AppError > {
253276 // Verify deployment ownership
254277 if !verify_deployment_ownership ( & state, deployment_id, user. user_id ) . await ? {
0 commit comments