Skip to content

Commit ca7813c

Browse files
[-] fix race condition in PrometheusWriter, fixes #1194
1 parent 93644f6 commit ca7813c

2 files changed

Lines changed: 69 additions & 0 deletions

File tree

internal/sinks/prometheus.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -99,6 +99,8 @@ func NewPrometheusWriter(ctx context.Context, connstr string) (promw *Prometheus
9999
}
100100

101101
func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error) {
102+
promw.Lock()
103+
defer promw.Unlock()
102104
promw.gauges = make(map[string]([]string))
103105
for name, m := range metrics.MetricDefs {
104106
promw.gauges[name] = m.Gauges
@@ -202,7 +204,9 @@ func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.Measu
202204
return promMetrics
203205
}
204206

207+
promw.RLock()
205208
gauges := promw.gauges[msg.MetricName]
209+
promw.RUnlock()
206210

207211
epochTime = time.Unix(0, msg.Data.GetEpoch())
208212

internal/sinks/prometheus_race_test.go

Lines changed: 65 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -68,3 +68,68 @@ func TestCollect_RaceCondition_Real(_ *testing.T) {
6868

6969
wg.Wait()
7070
}
71+
72+
func TestGaugesMap_RaceCondition(_ *testing.T) {
73+
// 1. Initialize PrometheusWriter
74+
promw, _ := NewPrometheusWriter(testutil.TestContext, "127.0.0.1:0/pgwatch")
75+
76+
// 2. Register a metric so Write() actually puts data into the map
77+
_ = promw.SyncMetric("race_db", "test_metric", AddOp)
78+
79+
// 3. Pre-fill cache so Collect has something to do
80+
_ = promw.Write(metrics.MeasurementEnvelope{
81+
DBName: "race_db",
82+
MetricName: "test_metric",
83+
Data: metrics.Measurements{
84+
{
85+
metrics.EpochColumnName: time.Now().UnixNano(),
86+
"value": int64(100),
87+
},
88+
},
89+
})
90+
91+
var wg sync.WaitGroup
92+
done := make(chan struct{})
93+
94+
// --- The Config Reloader (Simulating configuration updates) ---
95+
wg.Go(func() {
96+
for {
97+
select {
98+
case <-done:
99+
return
100+
default:
101+
// Call the REAL DefineMetrics method (Writes to gauges map)
102+
_ = promw.DefineMetrics(&metrics.Metrics{
103+
MetricDefs: metrics.MetricDefs{
104+
"test_metric": {Gauges: []string{"value"}},
105+
},
106+
})
107+
}
108+
}
109+
})
110+
111+
// --- The Collector (Simulating Prometheus Scrapes) ---
112+
wg.Go(func() {
113+
// Prometheus provides a channel to receive metrics
114+
ch := make(chan prometheus.Metric, 10000)
115+
116+
// Scrape 50 times (more than enough to trigger a race in a tight loop)
117+
for range 50 {
118+
// Call the REAL Collect method (Reads from gauges map)
119+
promw.Collect(ch)
120+
121+
// Drain the channel so it doesn't block
122+
drainLoop:
123+
for {
124+
select {
125+
case <-ch:
126+
default:
127+
break drainLoop
128+
}
129+
}
130+
}
131+
close(done) // Tell the reloader to stop
132+
})
133+
134+
wg.Wait()
135+
}

0 commit comments

Comments
 (0)