@@ -104,7 +104,7 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
104104 h .logger .Debug ("received request" , "api_key" , header .APIKey , "api_version" , header .APIVersion , "correlation" , header .CorrelationID , "client_id" , header .ClientID )
105105 }
106106 principal := principalFromContext (ctx , header )
107- switch req .(type ) {
107+ switch req := req .(type ) {
108108 case * protocol.ApiVersionsRequest :
109109 errorCode := protocol .NONE
110110 responseVersion := header .APIVersion
@@ -122,7 +122,7 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
122122 }
123123 return protocol .EncodeApiVersionsResponse (resp , responseVersion )
124124 case * protocol.MetadataRequest :
125- metaReq := req .( * protocol. MetadataRequest )
125+ metaReq := req
126126 if h .traceKafka {
127127 h .logger .Debug ("metadata request" , "topics" , metaReq .Topics , "topic_ids" , len (metaReq .TopicIDs ))
128128 }
@@ -201,9 +201,9 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
201201 }
202202 return protocol .EncodeMetadataResponse (resp , header .APIVersion )
203203 case * protocol.ProduceRequest :
204- return h .handleProduce (ctx , header , req .( * protocol. ProduceRequest ) )
204+ return h .handleProduce (ctx , header , req )
205205 case * protocol.FetchRequest :
206- return h .handleFetch (ctx , header , req .( * protocol. FetchRequest ) )
206+ return h .handleFetch (ctx , header , req )
207207 case * protocol.FindCoordinatorRequest :
208208 coord := h .coordinatorBroker (ctx )
209209 resp := & protocol.FindCoordinatorResponse {
@@ -216,7 +216,6 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
216216 }
217217 return protocol .EncodeFindCoordinatorResponse (resp , header .APIVersion )
218218 case * protocol.JoinGroupRequest :
219- req := req .(* protocol.JoinGroupRequest )
220219 if ! h .allowGroup (principal , req .GroupID , acl .ActionGroupWrite ) {
221220 h .recordAuthzDeniedWithPrincipal (principal , acl .ActionGroupWrite , acl .ResourceGroup , req .GroupID )
222221 return protocol .EncodeJoinGroupResponse (& protocol.JoinGroupResponse {
@@ -243,7 +242,6 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
243242 }
244243 return protocol .EncodeJoinGroupResponse (resp , header .APIVersion )
245244 case * protocol.SyncGroupRequest :
246- req := req .(* protocol.SyncGroupRequest )
247245 if ! h .allowGroup (principal , req .GroupID , acl .ActionGroupWrite ) {
248246 h .recordAuthzDeniedWithPrincipal (principal , acl .ActionGroupWrite , acl .ResourceGroup , req .GroupID )
249247 return protocol .EncodeSyncGroupResponse (& protocol.SyncGroupResponse {
@@ -270,7 +268,6 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
270268 }
271269 return protocol .EncodeSyncGroupResponse (resp , header .APIVersion )
272270 case * protocol.DescribeGroupsRequest :
273- req := req .(* protocol.DescribeGroupsRequest )
274271 return h .withAdminMetrics (header .APIKey , func () ([]byte , error ) {
275272 allowed := make ([]string , 0 , len (req .Groups ))
276273 denied := make (map [string ]struct {})
@@ -361,14 +358,13 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
361358 Groups : nil ,
362359 }, header .APIVersion )
363360 }
364- resp , err := h .coordinator .ListGroups (ctx , req .( * protocol. ListGroupsRequest ) , header .CorrelationID )
361+ resp , err := h .coordinator .ListGroups (ctx , req , header .CorrelationID )
365362 if err != nil {
366363 return nil , err
367364 }
368365 return protocol .EncodeListGroupsResponse (resp , header .APIVersion )
369366 })
370367 case * protocol.HeartbeatRequest :
371- req := req .(* protocol.HeartbeatRequest )
372368 if ! h .allowGroup (principal , req .GroupID , acl .ActionGroupWrite ) {
373369 h .recordAuthzDeniedWithPrincipal (principal , acl .ActionGroupWrite , acl .ResourceGroup , req .GroupID )
374370 return protocol .EncodeHeartbeatResponse (& protocol.HeartbeatResponse {
@@ -392,7 +388,6 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
392388 resp := h .coordinator .Heartbeat (ctx , req , header .CorrelationID )
393389 return protocol .EncodeHeartbeatResponse (resp , header .APIVersion )
394390 case * protocol.LeaveGroupRequest :
395- req := req .(* protocol.LeaveGroupRequest )
396391 if ! h .allowGroup (principal , req .GroupID , acl .ActionGroupWrite ) {
397392 h .recordAuthzDeniedWithPrincipal (principal , acl .ActionGroupWrite , acl .ResourceGroup , req .GroupID )
398393 return protocol .EncodeLeaveGroupResponse (& protocol.LeaveGroupResponse {
@@ -415,7 +410,6 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
415410 resp := h .coordinator .LeaveGroup (ctx , req , header .CorrelationID )
416411 return protocol .EncodeLeaveGroupResponse (resp )
417412 case * protocol.OffsetCommitRequest :
418- req := req .(* protocol.OffsetCommitRequest )
419413 if ! h .allowGroup (principal , req .GroupID , acl .ActionGroupWrite ) {
420414 h .recordAuthzDeniedWithPrincipal (principal , acl .ActionGroupWrite , acl .ResourceGroup , req .GroupID )
421415 topics := make ([]protocol.OffsetCommitTopicResponse , 0 , len (req .Topics ))
@@ -485,7 +479,6 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
485479 }
486480 return protocol .EncodeOffsetCommitResponse (resp )
487481 case * protocol.OffsetFetchRequest :
488- req := req .(* protocol.OffsetFetchRequest )
489482 if ! h .allowGroup (principal , req .GroupID , acl .ActionGroupRead ) {
490483 h .recordAuthzDeniedWithPrincipal (principal , acl .ActionGroupRead , acl .ResourceGroup , req .GroupID )
491484 topics := make ([]protocol.OffsetFetchTopicResponse , 0 , len (req .Topics ))
@@ -565,35 +558,35 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
565558 return protocol .EncodeOffsetFetchResponse (resp , header .APIVersion )
566559 case * protocol.OffsetForLeaderEpochRequest :
567560 return h .withAdminMetrics (header .APIKey , func () ([]byte , error ) {
568- offsetReq := req .( * protocol. OffsetForLeaderEpochRequest )
561+ offsetReq := req
569562 if ! h .allowTopics (principal , topicsFromOffsetForLeaderEpoch (offsetReq ), acl .ActionFetch ) {
570563 return h .unauthorizedOffsetForLeaderEpoch (principal , header , offsetReq )
571564 }
572565 return h .handleOffsetForLeaderEpoch (ctx , header , offsetReq )
573566 })
574567 case * protocol.DescribeConfigsRequest :
575568 return h .withAdminMetrics (header .APIKey , func () ([]byte , error ) {
576- return h .handleDescribeConfigs (ctx , header , req .( * protocol. DescribeConfigsRequest ) )
569+ return h .handleDescribeConfigs (ctx , header , req )
577570 })
578571 case * protocol.AlterConfigsRequest :
579572 return h .withAdminMetrics (header .APIKey , func () ([]byte , error ) {
580- alterReq := req .( * protocol. AlterConfigsRequest )
573+ alterReq := req
581574 if ! h .allowAdmin (principal ) {
582575 return h .unauthorizedAlterConfigs (principal , header , alterReq )
583576 }
584577 return h .handleAlterConfigs (ctx , header , alterReq )
585578 })
586579 case * protocol.CreatePartitionsRequest :
587580 return h .withAdminMetrics (header .APIKey , func () ([]byte , error ) {
588- createReq := req .( * protocol. CreatePartitionsRequest )
581+ createReq := req
589582 if ! h .allowAdmin (principal ) {
590583 return h .unauthorizedCreatePartitions (principal , header , createReq )
591584 }
592585 return h .handleCreatePartitions (ctx , header , createReq )
593586 })
594587 case * protocol.DeleteGroupsRequest :
595588 return h .withAdminMetrics (header .APIKey , func () ([]byte , error ) {
596- deleteReq := req .( * protocol. DeleteGroupsRequest )
589+ deleteReq := req
597590 allowed := make ([]string , 0 , len (deleteReq .Groups ))
598591 denied := make (map [string ]struct {})
599592 for _ , groupID := range deleteReq .Groups {
@@ -653,19 +646,19 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re
653646 }, header .APIVersion )
654647 })
655648 case * protocol.CreateTopicsRequest :
656- createReq := req .( * protocol. CreateTopicsRequest )
649+ createReq := req
657650 if ! h .allowAdmin (principal ) {
658651 return h .unauthorizedCreateTopics (principal , header , createReq )
659652 }
660653 return h .handleCreateTopics (ctx , header , createReq )
661654 case * protocol.DeleteTopicsRequest :
662- deleteReq := req .( * protocol. DeleteTopicsRequest )
655+ deleteReq := req
663656 if ! h .allowAdmin (principal ) {
664657 return h .unauthorizedDeleteTopics (principal , header , deleteReq )
665658 }
666659 return h .handleDeleteTopics (ctx , header , deleteReq )
667660 case * protocol.ListOffsetsRequest :
668- listReq := req .( * protocol. ListOffsetsRequest )
661+ listReq := req
669662 if ! h .allowTopics (principal , topicsFromListOffsets (listReq ), acl .ActionFetch ) {
670663 return h .unauthorizedListOffsets (principal , header , listReq )
671664 }
@@ -713,32 +706,32 @@ func (h *handler) backpressureErrorCode() int16 {
713706func (h * handler ) metricsHandler (w http.ResponseWriter , r * http.Request ) {
714707 snap := h .s3Health .Snapshot ()
715708 w .Header ().Set ("Content-Type" , "text/plain; version=0.0.4" )
716- fmt .Fprintln (w , "# HELP kafscale_s3_health_state Current broker view of S3 health." )
717- fmt .Fprintln (w , "# TYPE kafscale_s3_health_state gauge" )
709+ _ , _ = fmt .Fprintln (w , "# HELP kafscale_s3_health_state Current broker view of S3 health." )
710+ _ , _ = fmt .Fprintln (w , "# TYPE kafscale_s3_health_state gauge" )
718711 for _ , state := range []broker.S3HealthState {broker .S3StateHealthy , broker .S3StateDegraded , broker .S3StateUnavailable } {
719712 value := 0
720713 if snap .State == state {
721714 value = 1
722715 }
723- fmt .Fprintf (w , "kafscale_s3_health_state{state=%q} %d\n " , state , value )
716+ _ , _ = fmt .Fprintf (w , "kafscale_s3_health_state{state=%q} %d\n " , state , value )
724717 }
725- fmt .Fprintln (w , "# HELP kafscale_s3_latency_ms_avg Average S3 latency in the sliding window." )
726- fmt .Fprintln (w , "# TYPE kafscale_s3_latency_ms_avg gauge" )
727- fmt .Fprintf (w , "kafscale_s3_latency_ms_avg %f\n " , float64 (snap .AvgLatency )/ float64 (time .Millisecond ))
728- fmt .Fprintln (w , "# HELP kafscale_s3_error_rate Fraction of S3 operations that failed in the sliding window." )
729- fmt .Fprintln (w , "# TYPE kafscale_s3_error_rate gauge" )
730- fmt .Fprintf (w , "kafscale_s3_error_rate %f\n " , snap .ErrorRate )
718+ _ , _ = fmt .Fprintln (w , "# HELP kafscale_s3_latency_ms_avg Average S3 latency in the sliding window." )
719+ _ , _ = fmt .Fprintln (w , "# TYPE kafscale_s3_latency_ms_avg gauge" )
720+ _ , _ = fmt .Fprintf (w , "kafscale_s3_latency_ms_avg %f\n " , float64 (snap .AvgLatency )/ float64 (time .Millisecond ))
721+ _ , _ = fmt .Fprintln (w , "# HELP kafscale_s3_error_rate Fraction of S3 operations that failed in the sliding window." )
722+ _ , _ = fmt .Fprintln (w , "# TYPE kafscale_s3_error_rate gauge" )
723+ _ , _ = fmt .Fprintf (w , "kafscale_s3_error_rate %f\n " , snap .ErrorRate )
731724 if ! snap .Since .IsZero () {
732- fmt .Fprintln (w , "# HELP kafscale_s3_state_duration_seconds Seconds spent in the current S3 state." )
733- fmt .Fprintln (w , "# TYPE kafscale_s3_state_duration_seconds gauge" )
734- fmt .Fprintf (w , "kafscale_s3_state_duration_seconds %f\n " , time .Since (snap .Since ).Seconds ())
735- }
736- fmt .Fprintln (w , "# HELP kafscale_produce_rps Broker ingest throughput measured over the sliding window." )
737- fmt .Fprintln (w , "# TYPE kafscale_produce_rps gauge" )
738- fmt .Fprintf (w , "kafscale_produce_rps %f\n " , h .produceRate .rate ())
739- fmt .Fprintln (w , "# HELP kafscale_fetch_rps Broker fetch throughput measured over the sliding window." )
740- fmt .Fprintln (w , "# TYPE kafscale_fetch_rps gauge" )
741- fmt .Fprintf (w , "kafscale_fetch_rps %f\n " , h .fetchRate .rate ())
725+ _ , _ = fmt .Fprintln (w , "# HELP kafscale_s3_state_duration_seconds Seconds spent in the current S3 state." )
726+ _ , _ = fmt .Fprintln (w , "# TYPE kafscale_s3_state_duration_seconds gauge" )
727+ _ , _ = fmt .Fprintf (w , "kafscale_s3_state_duration_seconds %f\n " , time .Since (snap .Since ).Seconds ())
728+ }
729+ _ , _ = fmt .Fprintln (w , "# HELP kafscale_produce_rps Broker ingest throughput measured over the sliding window." )
730+ _ , _ = fmt .Fprintln (w , "# TYPE kafscale_produce_rps gauge" )
731+ _ , _ = fmt .Fprintf (w , "kafscale_produce_rps %f\n " , h .produceRate .rate ())
732+ _ , _ = fmt .Fprintln (w , "# HELP kafscale_fetch_rps Broker fetch throughput measured over the sliding window." )
733+ _ , _ = fmt .Fprintln (w , "# TYPE kafscale_fetch_rps gauge" )
734+ _ , _ = fmt .Fprintf (w , "kafscale_fetch_rps %f\n " , h .fetchRate .rate ())
742735 if h .produceLatency != nil {
743736 h .produceLatency .WritePrometheus (w , "kafscale_produce_latency_ms" , "Produce request latency in milliseconds." )
744737 }
@@ -841,6 +834,7 @@ func (h *handler) allowGroup(principal string, group string, action acl.Action)
841834 return h .authorizer .Allows (principal , action , acl .ResourceGroup , group )
842835}
843836
837+ //nolint:unused // kept for future ACL enforcement
844838func (h * handler ) allowGroups (principal string , groups []string , action acl.Action ) bool {
845839 for _ , group := range groups {
846840 if ! h .allowGroup (principal , group , action ) {
@@ -955,6 +949,7 @@ func (h *handler) unauthorizedCreatePartitions(principal string, header *protoco
955949 }, header .APIVersion )
956950}
957951
952+ //nolint:unused // kept for future ACL enforcement
958953func (h * handler ) unauthorizedDeleteGroups (principal string , header * protocol.RequestHeader , req * protocol.DeleteGroupsRequest ) ([]byte , error ) {
959954 h .recordAuthzDeniedWithPrincipal (principal , acl .ActionGroupAdmin , acl .ResourceGroup , strings .Join (req .Groups , "," ))
960955 results := make ([]protocol.DeleteGroupsResponseGroup , 0 , len (req .Groups ))
@@ -1969,10 +1964,8 @@ func (h *handler) handleFetch(ctx context.Context, header *protocol.RequestHeade
19691964 }
19701965 if errors .Is (offsetErr , context .Canceled ) || errors .Is (offsetErr , context .DeadlineExceeded ) {
19711966 // Treat request timeouts as empty fetches, not server errors.
1972- offsetErr = nil
19731967 } else if ! h .etcdAvailable () {
19741968 nextOffset = part .FetchOffset
1975- offsetErr = nil
19761969 } else {
19771970 h .logger .Error ("fetch wait failed" , "topic" , topicName , "partition" , part .Partition , "error" , offsetErr , "etcd_available" , h .etcdAvailable ())
19781971 nextOffset = 0
@@ -2471,11 +2464,9 @@ func buildConnContextFunc(logger *slog.Logger) broker.ConnContextFunc {
24712464 conn = wrapped
24722465 }
24732466 proxyInfo = parsed
2474- if proxyInfo != nil {
2475- if ! proxyInfo .Local && proxyInfo .SourceAddr != "" {
2476- info .ProxyAddr = proxyInfo .SourceAddr
2477- info .RemoteAddr = proxyInfo .SourceAddr
2478- }
2467+ if ! proxyInfo .Local && proxyInfo .SourceAddr != "" {
2468+ info .ProxyAddr = proxyInfo .SourceAddr
2469+ info .RemoteAddr = proxyInfo .SourceAddr
24792470 }
24802471 }
24812472 if info .RemoteAddr == "" {
@@ -2631,15 +2622,15 @@ func startMetricsServer(ctx context.Context, addr string, h *handler, logger *sl
26312622 mux .HandleFunc ("/metrics" , h .metricsHandler )
26322623 mux .HandleFunc ("/healthz" , func (w http.ResponseWriter , r * http.Request ) {
26332624 w .Header ().Set ("Content-Type" , "text/plain; charset=utf-8" )
2634- fmt .Fprintf (w , "ok state=%s\n " , h .s3Health .State ())
2625+ _ , _ = fmt .Fprintf (w , "ok state=%s\n " , h .s3Health .State ())
26352626 })
26362627 mux .HandleFunc ("/readyz" , func (w http.ResponseWriter , r * http.Request ) {
26372628 w .Header ().Set ("Content-Type" , "text/plain; charset=utf-8" )
26382629 if ready , state := h .readiness (); ! ready {
26392630 w .WriteHeader (http .StatusServiceUnavailable )
2640- fmt .Fprintf (w , "not ready state=%s\n " , state )
2631+ _ , _ = fmt .Fprintf (w , "not ready state=%s\n " , state )
26412632 } else {
2642- fmt .Fprintf (w , "ready state=%s\n " , state )
2633+ _ , _ = fmt .Fprintf (w , "ready state=%s\n " , state )
26432634 }
26442635 })
26452636 srv := & http.Server {
0 commit comments