@@ -1495,13 +1495,15 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
14951495 Error : err .Error (),
14961496 })
14971497
1498+ addFunctionResult (& wholeMetric , "timeout" )
14981499 break
14991500 }
15001501 dcsEvent , err = stream .Recv ()
15011502 if errors .Is (err , io .EOF ) { // correct stream termination
15021503 log .Debug ("DCS EOR event stream was closed from the DCS side (EOF)" )
15031504 err = nil
15041505
1506+ addFunctionResult (& wholeMetric , "ok" )
15051507 break // no more data
15061508 }
15071509 if errors .Is (err , context .DeadlineExceeded ) {
@@ -1521,6 +1523,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
15211523 Error : err .Error (),
15221524 })
15231525
1526+ addFunctionResult (& wholeMetric , "timeout" )
15241527 break
15251528 }
15261529 if err != nil { // stream termination in case of unknown or gRPC error
@@ -1542,6 +1545,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
15421545 Payload : string (payloadJsonForKafka [:]),
15431546 Error : err .Error (),
15441547 })
1548+ addFunctionResult (& wholeMetric , "gRPC_timeout" )
15451549
15461550 } else if got == codes .Unknown { // unknown error, likely not a gRPC code
15471551 logMsg := "bad DCS EOR event received, any future DCS events are ignored"
@@ -1558,6 +1562,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
15581562 Payload : string (payloadJsonForKafka [:]),
15591563 Error : logMsg ,
15601564 })
1565+ addFunctionResult (& wholeMetric , "gRPC_unknown" )
15611566 } else { // some other gRPC error code
15621567 log .WithError (err ).
15631568 Debug ("DCS EOR call error" )
@@ -1573,6 +1578,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
15731578 Payload : string (payloadJsonForKafka [:]),
15741579 Error : err .Error (),
15751580 })
1581+ addFunctionResult (& wholeMetric , "gRPC_error" )
15761582 }
15771583
15781584 break
@@ -1736,13 +1742,15 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
17361742 Error : err .Error (),
17371743 })
17381744
1745+ addFunctionResult (& wholeMetric , "timeout" )
17391746 break
17401747 }
17411748 dcsEvent , err = stream .Recv ()
17421749 if errors .Is (err , io .EOF ) { // correct stream termination
17431750 log .Debug ("DCS SOR event stream was closed from the DCS side (EOF)" )
17441751 err = nil
17451752
1753+ addFunctionResult (& wholeMetric , "ok" )
17461754 break // no more data
17471755 }
17481756 if errors .Is (err , context .DeadlineExceeded ) {
@@ -1762,6 +1770,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
17621770 Error : err .Error (),
17631771 })
17641772
1773+ addFunctionResult (& wholeMetric , "timeout" )
17651774 break
17661775 }
17671776 if err != nil { // stream termination in case of unknown or gRPC error
@@ -1784,6 +1793,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
17841793 Error : err .Error (),
17851794 })
17861795
1796+ addFunctionResult (& wholeMetric , "gRPC_timeout" )
17871797 } else if got == codes .Unknown { // unknown error, likely not a gRPC code
17881798 logMsg := "bad DCS SOR event received, any future DCS events are ignored"
17891799 log .WithError (err ).
@@ -1799,6 +1809,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
17991809 Payload : string (payloadJsonForKafka [:]),
18001810 Error : logMsg ,
18011811 })
1812+ addFunctionResult (& wholeMetric , "gRPC_unknown" )
18021813 } else { // some other gRPC error code
18031814 log .WithError (err ).
18041815 Debug ("DCS SOR call error" )
@@ -1814,6 +1825,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
18141825 Payload : string (payloadJsonForKafka [:]),
18151826 Error : err .Error (),
18161827 })
1828+ addFunctionResult (& wholeMetric , "gRPC_error" )
18171829 }
18181830
18191831 break
@@ -2002,6 +2014,10 @@ func convertAndSendDetectorDurationsAndStates(method string, detectorStatusMap m
20022014 }
20032015}
20042016
2017+ func addFunctionResult (metric * monitoring.Metric , result string ) {
2018+ metric .AddTag ("result" , result )
2019+ }
2020+
20052021func PFRgRPCCommunicationLoop (ctx context.Context , timeout time.Duration , call * callable.Call , envId string ,
20062022 payloadJsonForKafka []byte , stream dcspb.Configurator_StartOfRunClient , detectorStatusMap map [dcspb.Detector ]dcspb.DetectorState ,
20072023 callFailedStr string , payload map [string ]interface {}, runType string ,
@@ -2032,13 +2048,15 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
20322048 Error : err .Error (),
20332049 })
20342050
2051+ addFunctionResult (& wholeMetric , "timeout" )
20352052 break
20362053 }
20372054 dcsEvent , err = stream .Recv ()
20382055 if errors .Is (err , io .EOF ) { // correct stream termination
20392056 log .Debug ("DCS PFR event stream was closed from the DCS side (EOF)" )
20402057 err = nil
20412058
2059+ addFunctionResult (& wholeMetric , "ok" )
20422060 break // no more data
20432061 }
20442062 if errors .Is (err , context .DeadlineExceeded ) {
@@ -2058,6 +2076,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
20582076 Error : err .Error (),
20592077 })
20602078
2079+ addFunctionResult (& wholeMetric , "timeout" )
20612080 break
20622081 }
20632082 if err != nil { // stream termination in case of unknown or gRPC error
@@ -2079,6 +2098,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
20792098 Payload : string (payloadJsonForKafka [:]),
20802099 Error : err .Error (),
20812100 })
2101+ addFunctionResult (& wholeMetric , "gRPC_timeout" )
20822102 } else if got == codes .Unknown { // unknown error, likely not a gRPC code
20832103 logMsg := "bad DCS PFR event received, any future DCS events are ignored"
20842104 log .WithError (err ).
@@ -2095,6 +2115,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
20952115 Payload : string (payloadJsonForKafka [:]),
20962116 Error : logMsg ,
20972117 })
2118+ addFunctionResult (& wholeMetric , "gRPC_unknown" )
20982119 } else { // some other gRPC error code
20992120 log .WithError (err ).
21002121 Error ("DCS PFR call error" )
@@ -2110,6 +2131,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
21102131 Payload : string (payloadJsonForKafka [:]),
21112132 Error : err .Error (),
21122133 })
2134+ addFunctionResult (& wholeMetric , "gRPC_error" )
21132135 }
21142136
21152137 break
0 commit comments