Skip to content

Commit 5c19aba

Browse files
committed
Expose more metrics
Expose "lag as seen by konsumerator" metric. Expose number of samples received from prometheus for production/consumption/offsets to know if some timeseries are missing.
1 parent 2212918 commit 5c19aba

2 files changed

Lines changed: 27 additions & 4 deletions

File tree

pkg/providers/metrics.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,18 +67,36 @@ var (
6767
},
6868
[]string{"consumer", "addr"},
6969
)
70+
lagObserved = prometheus.NewGaugeVec(
71+
prometheus.GaugeOpts{
72+
Namespace: namespace,
73+
Subsystem: subsystem,
74+
Name: "lag_observed_seconds",
75+
Help: "Lag in seconds as seen by konsumerator (uses production rate and messages behind",
76+
},
77+
[]string{"consumer", "partition"},
78+
)
79+
samplesReceivedTotal = prometheus.NewGaugeVec(
80+
prometheus.GaugeOpts{
81+
Namespace: namespace,
82+
Subsystem: subsystem,
83+
Name: "samples_received_total",
84+
Help: "Number of timeseries samples received from the prometheus",
85+
},
86+
[]string{"consumer", "type"},
87+
)
7088
zeroValuesTotal = prometheus.NewCounterVec(
7189
prometheus.CounterOpts{
7290
Namespace: namespace,
7391
Subsystem: subsystem,
7492
Name: "zero_values_total",
7593
Help: "Total number of estimation queries that returned 0 - NO DATA(metrics are missing, not even a cache)",
7694
},
77-
[]string{"consumer", "partitions", "type"},
95+
[]string{"consumer", "partition", "type"},
7896
)
7997
)
8098

8199
func initMetrics() {
82100
metrics.Registry.MustRegister(requestsTotal, requestErrors, requestDuration,
83-
subRequestTotal, subRequestErrors, subRequestDuration, zeroValuesTotal)
101+
subRequestTotal, subRequestErrors, subRequestDuration, lagObserved, zeroValuesTotal)
84102
}

pkg/providers/prometheus.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,10 @@ func (l *PrometheusMP) GetLagByPartition(partition int32) time.Duration {
127127
if production == 0 {
128128
return 0
129129
}
130-
lag := float64(behind) / float64(production)
131-
return time.Duration(lag) * time.Second
130+
lagM := float64(behind) / float64(production)
131+
lagT := time.Duration(lagM) * time.Second
132+
lagObserved.WithLabelValues(l.consumer, strconv.Itoa(int(partition))).Set(lagT.Seconds())
133+
return lagT
132134
}
133135

134136
// Update updates metrics values by querying Prometheus
@@ -233,6 +235,7 @@ func (l *PrometheusMP) queryOffset() (metricsMap, error) {
233235
return nil, errors.New("failed to get offset metrics from prometheus")
234236
}
235237
metrics := l.parse(value, l.offsetPartitionLabel)
238+
samplesReceivedTotal.WithLabelValues(l.consumer, labelOffset).Set(float64(len(metrics)))
236239
return metrics, nil
237240
}
238241

@@ -250,6 +253,7 @@ func (l *PrometheusMP) queryProductionRate() (metricsMap, error) {
250253
return nil, errors.New("failed to get production metrics from prometheus")
251254
}
252255
metrics := l.parse(value, l.productionPartitionLabel)
256+
samplesReceivedTotal.WithLabelValues(l.consumer, labelProduction).Set(float64(len(metrics)))
253257
return metrics, nil
254258
}
255259

@@ -266,6 +270,7 @@ func (l *PrometheusMP) queryConsumptionRate() (metricsMap, error) {
266270
return nil, errors.New("failed to get consumption metrics from prometheus")
267271
}
268272
metrics := l.parse(value, l.consumptionPartitionLabel)
273+
samplesReceivedTotal.WithLabelValues(l.consumer, labelConsumption).Set(float64(len(metrics)))
269274
return metrics, nil
270275
}
271276

0 commit comments

Comments
 (0)