Skip to content

Commit d583564

Browse files
[*] move cache to Prometheus sink struct, closes #1176 (#1177)
* Refactor Prometheus writer to have cache in struct instead of global * move type definition to top and anonymize embedded RWMutex * rename fields --------- Co-authored-by: Pavlo Golub <pavlo.golub@gmail.com>
1 parent 05cb931 commit d583564

2 files changed

Lines changed: 39 additions & 37 deletions

File tree

internal/sinks/prometheus.go

Lines changed: 39 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,20 @@ import (
1818
"github.com/prometheus/client_golang/prometheus/promhttp"
1919
)
2020

21+
type PromMetricCache = map[string]map[string]metrics.MeasurementEnvelope // [dbUnique][metric]lastly_fetched_data
22+
2123
// PrometheusWriter is a sink that allows to expose metric measurements to Prometheus scrapper.
2224
// Prometheus collects metrics data from pgwatch by scraping metrics HTTP endpoints.
2325
type PrometheusWriter struct {
24-
logger log.Logger
25-
ctx context.Context
26-
lastScrapeErrors prometheus.Gauge
27-
totalScrapes, totalScrapeFailures prometheus.Counter
28-
PrometheusNamespace string
29-
gauges map[string]([]string) // map of metric names to their gauge names, used for Prometheus gauge metrics
26+
sync.RWMutex
27+
logger log.Logger
28+
ctx context.Context
29+
lastScrapeErrors prometheus.Gauge
30+
totalScrapes prometheus.Counter
31+
totalScrapeFailures prometheus.Counter
32+
gauges map[string]([]string) // map of metric names to their gauge names, used for Prometheus gauge metrics
33+
Namespace string
34+
Cache PromMetricCache // [dbUnique][metric]lastly_fetched_data
3035
}
3136

3237
const promInstanceUpStateMetric = "instance_up"
@@ -46,9 +51,10 @@ func NewPrometheusWriter(ctx context.Context, connstr string) (promw *Prometheus
4651
l := log.GetLogger(ctx).WithField("sink", "prometheus").WithField("address", addr)
4752
ctx = log.WithLogger(ctx, l)
4853
promw = &PrometheusWriter{
49-
ctx: ctx,
50-
logger: l,
51-
PrometheusNamespace: namespace,
54+
ctx: ctx,
55+
logger: l,
56+
Namespace: namespace,
57+
Cache: make(PromMetricCache),
5258
lastScrapeErrors: prometheus.NewGauge(prometheus.GaugeOpts{
5359
Namespace: namespace,
5460
Name: "exporter_last_scrape_errors",
@@ -108,36 +114,30 @@ func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error {
108114
return nil
109115
}
110116

111-
type PromMetricCache = map[string]map[string]metrics.MeasurementEnvelope // [dbUnique][metric]lastly_fetched_data
112-
113-
// Async Prom cache
114-
var promAsyncMetricCache = make(PromMetricCache)
115-
var promAsyncMetricCacheLock = sync.RWMutex{}
116-
117117
func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr metrics.MeasurementEnvelope) { // cache structure: [dbUnique][metric]lastly_fetched_data
118-
promAsyncMetricCacheLock.Lock()
119-
defer promAsyncMetricCacheLock.Unlock()
120-
if _, ok := promAsyncMetricCache[dbUnique]; ok {
121-
promAsyncMetricCache[dbUnique][metric] = msgArr
118+
promw.Lock()
119+
defer promw.Unlock()
120+
if _, ok := promw.Cache[dbUnique]; ok {
121+
promw.Cache[dbUnique][metric] = msgArr
122122
}
123123
}
124124

125125
func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string) { // cache structure: [dbUnique][metric]lastly_fetched_data
126-
promAsyncMetricCacheLock.Lock()
127-
defer promAsyncMetricCacheLock.Unlock()
128-
if _, ok := promAsyncMetricCache[dbUnique]; !ok {
129-
promAsyncMetricCache[dbUnique] = make(map[string]metrics.MeasurementEnvelope)
126+
promw.Lock()
127+
defer promw.Unlock()
128+
if _, ok := promw.Cache[dbUnique]; !ok {
129+
promw.Cache[dbUnique] = make(map[string]metrics.MeasurementEnvelope)
130130
}
131131
}
132132

133133
func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string) {
134-
promAsyncMetricCacheLock.Lock()
135-
defer promAsyncMetricCacheLock.Unlock()
134+
promw.Lock()
135+
defer promw.Unlock()
136136

137137
if metric == "" {
138-
delete(promAsyncMetricCache, dbUnique) // whole host removed from config
138+
delete(promw.Cache, dbUnique) // whole host removed from config
139139
} else {
140-
delete(promAsyncMetricCache[dbUnique], metric)
140+
delete(promw.Cache[dbUnique], metric)
141141
}
142142
}
143143

@@ -160,18 +160,21 @@ func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric) {
160160
promw.totalScrapes.Add(1)
161161
ch <- promw.totalScrapes
162162

163-
promAsyncMetricCacheLock.Lock()
164-
if len(promAsyncMetricCache) == 0 {
165-
promAsyncMetricCacheLock.Unlock()
163+
promw.Lock()
164+
if len(promw.Cache) == 0 {
165+
promw.Unlock()
166166
promw.logger.Warning("No dbs configured for monitoring. Check config")
167167
ch <- promw.totalScrapeFailures
168168
promw.lastScrapeErrors.Set(0)
169169
ch <- promw.lastScrapeErrors
170170
return
171171
}
172-
snapshot := promAsyncMetricCache
173-
promAsyncMetricCache = make(PromMetricCache, len(snapshot))
174-
promAsyncMetricCacheLock.Unlock()
172+
snapshot := promw.Cache
173+
promw.Cache = make(PromMetricCache, len(snapshot))
174+
for dbUnique := range snapshot {
175+
promw.Cache[dbUnique] = make(map[string]metrics.MeasurementEnvelope)
176+
}
177+
promw.Unlock()
175178

176179
t1 := time.Now()
177180
for _, metricsMessages := range snapshot {
@@ -258,12 +261,12 @@ func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.Measu
258261
fieldPromDataType = prometheus.GaugeValue
259262
}
260263
var desc *prometheus.Desc
261-
if promw.PrometheusNamespace != "" {
264+
if promw.Namespace != "" {
262265
if msg.MetricName == promInstanceUpStateMetric { // handle the special "instance_up" check
263-
desc = prometheus.NewDesc(fmt.Sprintf("%s_%s", promw.PrometheusNamespace, msg.MetricName),
266+
desc = prometheus.NewDesc(fmt.Sprintf("%s_%s", promw.Namespace, msg.MetricName),
264267
msg.MetricName, labelKeys, nil)
265268
} else {
266-
desc = prometheus.NewDesc(fmt.Sprintf("%s_%s_%s", promw.PrometheusNamespace, msg.MetricName, field),
269+
desc = prometheus.NewDesc(fmt.Sprintf("%s_%s_%s", promw.Namespace, msg.MetricName, field),
267270
msg.MetricName, labelKeys, nil)
268271
}
269272
} else {

internal/sinks/prometheus_race_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212

1313
func TestCollect_RaceCondition_Real(_ *testing.T) {
1414
// 1. Initialize the real PrometheusWriter
15-
// Note: In the current buggy code, this shares the global 'promAsyncMetricCache'
1615
promw, _ := NewPrometheusWriter(testutil.TestContext, "127.0.0.1:0/pgwatch")
1716

1817
// 2. Register a metric so Write() actually puts data into the map

0 commit comments

Comments
 (0)