Skip to content

Commit 4256426

Browse files
committed
fix: node_status exclusion check in SourceReaper.Run() for special metrics
Move the `isRoleExcluded` check earlier in the metric fetch loop to apply consistently to all metric types, including special metrics like `instance_up` and `change_events`. Previously, special metrics weren't excluded if they have a required `node_status` different than the server's
1 parent 7263ecb commit 4256426

3 files changed

Lines changed: 115 additions & 71 deletions

File tree

internal/reaper/source_reaper.go

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,12 @@ func (sr *SourceReaper) Run(ctx context.Context) {
145145
if lf := sr.lastFetch[name]; !lf.IsZero() && now.Sub(lf) < interval {
146146
continue
147147
}
148+
149+
metric, ok := metricDefs.GetMetricDef(name)
150+
if !ok || sr.isRoleExcluded(metric) {
151+
continue
152+
}
153+
148154
switch {
149155
case name == specialMetricServerLogEventCounts:
150156
if sr.lastFetch[name].IsZero() {
@@ -157,16 +163,8 @@ func (sr *SourceReaper) Run(ctx context.Context) {
157163
case IsDirectlyFetchableMetric(sr.md, name):
158164
err = sr.fetchOSMetric(ctx, name)
159165
case name == specialMetricChangeEvents || name == specialMetricInstanceUp:
160-
err = sr.fetchSpecialMetric(ctx, name)
166+
err = sr.fetchSpecialMetric(ctx, name, metric.StorageName)
161167
default:
162-
metric, ok := metricDefs.GetMetricDef(name)
163-
if !ok {
164-
l.WithField("metric", name).Warning("metric definition not found")
165-
continue
166-
}
167-
if sr.isRoleExcluded(metric) {
168-
continue
169-
}
170168
if cached := sr.reaper.GetMeasurementCache(sr.cacheKey(metric, name)); len(cached) > 0 {
171169
l.WithField("metric", name).Info("instance level cache hit")
172170
sr.sendEnvelope(ctx, name, metric.StorageName, cached)
@@ -264,14 +262,7 @@ func (sr *SourceReaper) fetchOSMetric(ctx context.Context, name string) error {
264262
}
265263

266264
// fetchSpecialMetric handles change_events and instance_up metrics.
267-
func (sr *SourceReaper) fetchSpecialMetric(ctx context.Context, name string) error {
268-
metric, ok := metricDefs.GetMetricDef(name)
269-
if !ok {
270-
return fmt.Errorf("metric definition not found for %s", name)
271-
}
272-
if sr.isRoleExcluded(metric) {
273-
return nil
274-
}
265+
func (sr *SourceReaper) fetchSpecialMetric(ctx context.Context, name, storageName string) error {
275266
var (
276267
data metrics.Measurements
277268
err error
@@ -286,7 +277,7 @@ func (sr *SourceReaper) fetchSpecialMetric(ctx context.Context, name string) err
286277
return fmt.Errorf("failed to fetch special metric: %v", err)
287278
}
288279
if len(data) > 0 {
289-
sr.sendEnvelope(ctx, name, metric.StorageName, data)
280+
sr.sendEnvelope(ctx, name, storageName, data)
290281
}
291282
return err
292283
}

internal/reaper/source_reaper_integration_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,3 +180,103 @@ func TestIntegration_SourceReaper_RunCollectsMetrics(t *testing.T) {
180180
assert.Equal(t, "integration_test", sMsg.DBName)
181181
assert.NotEmpty(t, sMsg.Data)
182182
}
183+
184+
func TestIntegration_SourceReaper_RunExcludesMetricsByNodeStatus(t *testing.T) {
185+
md, tearDown := setupIntegrationDB(t)
186+
defer tearDown()
187+
188+
helperSetNodeStatus := func(status string) {
189+
metricDefs.MetricDefs["test_metric"] = metrics.Metric{
190+
SQLs: metrics.SQLs{0: "SELECT 1 AS value"},
191+
NodeStatus: status,
192+
}
193+
metricDefs.MetricDefs["server_log_event_counts"] = metrics.Metric{
194+
SQLs: metrics.SQLs{0: "SELECT 1 AS value"},
195+
NodeStatus: status,
196+
}
197+
metricDefs.MetricDefs["psutil_cpu"] = metrics.Metric{
198+
SQLs: metrics.SQLs{0: "SELECT 1 AS value"},
199+
NodeStatus: status,
200+
}
201+
metricDefs.MetricDefs[specialMetricInstanceUp] = metrics.Metric{
202+
SQLs: metrics.SQLs{0: "SELECT 1 AS value"},
203+
NodeStatus: status,
204+
}
205+
}
206+
207+
r := &Reaper{
208+
Options: &cmdopts.Options{
209+
Metrics: metrics.CmdOpts{},
210+
Sinks: sinks.CmdOpts{},
211+
},
212+
measurementCh: make(chan metrics.MeasurementEnvelope, 10),
213+
measurementCache: NewInstanceMetricCache(),
214+
}
215+
216+
// using psutil_*, server_log_event_counts, instance_up
217+
// to ensure specially-handled metrics have the same behaviour
218+
md.Metrics = metrics.MetricIntervals{
219+
"test_metric": 5,
220+
"server_log_event_counts": 5,
221+
"psutil_cpu": 5,
222+
specialMetricInstanceUp: 5,
223+
}
224+
225+
t.Run("primary-only/standby-only metrics get excluded when node is standby/primary", func(t *testing.T) {
226+
states := []string{"primary", "standby"}
227+
for _, state := range states {
228+
ctx, cancel := context.WithCancel(log.WithLogger(context.Background(), log.NewNoopLogger()))
229+
230+
md.IsInRecovery = true
231+
if state == "standby" {
232+
md.IsInRecovery = false
233+
}
234+
235+
helperSetNodeStatus(state)
236+
237+
sr := NewSourceReaper(r, md)
238+
go func() {
239+
sr.Run(ctx)
240+
}()
241+
242+
select {
243+
case msg := <-r.measurementCh:
244+
t.Errorf("Expected no measurement for primary-only metrics on standby, but got: %s", msg.MetricName)
245+
case <-time.After(2 * time.Second):
246+
}
247+
248+
cancel()
249+
}
250+
})
251+
252+
t.Run("primary-only/standby-only metrics get executed when node is primary/standby", func(t *testing.T) {
253+
states := []string{"primary", "standby", ""} // "" => should fetch all as well
254+
for _, state := range states {
255+
ctx, cancel := context.WithCancel(log.WithLogger(context.Background(), log.NewNoopLogger()))
256+
257+
md.IsInRecovery = false
258+
if state == "standby" {
259+
md.IsInRecovery = true
260+
}
261+
262+
helperSetNodeStatus(state)
263+
264+
sr := NewSourceReaper(r, md)
265+
go func() {
266+
sr.Run(ctx)
267+
}()
268+
269+
time.Sleep(2 * time.Second)
270+
assert.GreaterOrEqual(t, len(r.measurementCh), 3)
271+
cancel()
272+
273+
for range len(r.measurementCh) {
274+
// empty channel to ensure correctness in subsequent runs
275+
select {
276+
case <-r.measurementCh:
277+
default:
278+
}
279+
}
280+
}
281+
})
282+
}

internal/reaper/source_reaper_test.go

Lines changed: 6 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -307,52 +307,12 @@ func TestSourceReaper_FetchSpecialMetric(t *testing.T) {
307307
return NewSourceReaper(r, md), md, mock
308308
}
309309

310-
t.Run("metric not found in definitions", func(t *testing.T) {
311-
sr, _, mock := newSR(t)
312-
defer mock.Close()
313-
assert.Error(t, sr.fetchSpecialMetric(ctx, "no_such_special_xyz"))
314-
select {
315-
case <-sr.reaper.measurementCh:
316-
t.Error("expected no measurement")
317-
default:
318-
}
319-
assert.NoError(t, mock.ExpectationsWereMet())
320-
})
321-
322-
t.Run("primary-only metric skipped on standby", func(t *testing.T) {
323-
sr, md, mock := newSR(t)
324-
defer mock.Close()
325-
metricDefs.MetricDefs["sp_primary_only"] = metrics.Metric{NodeStatus: "primary"}
326-
md.IsInRecovery = true
327-
assert.NoError(t, sr.fetchSpecialMetric(ctx, "sp_primary_only"))
328-
select {
329-
case <-sr.reaper.measurementCh:
330-
t.Error("expected no measurement for primary-only on standby")
331-
default:
332-
}
333-
assert.NoError(t, mock.ExpectationsWereMet())
334-
})
335-
336-
t.Run("standby-only metric skipped on primary", func(t *testing.T) {
337-
sr, md, mock := newSR(t)
338-
defer mock.Close()
339-
metricDefs.MetricDefs["sp_standby_only"] = metrics.Metric{NodeStatus: "standby"}
340-
md.IsInRecovery = false
341-
assert.NoError(t, sr.fetchSpecialMetric(ctx, "sp_standby_only"))
342-
select {
343-
case <-sr.reaper.measurementCh:
344-
t.Error("expected no measurement for standby-only on primary")
345-
default:
346-
}
347-
assert.NoError(t, mock.ExpectationsWereMet())
348-
})
310+
sr, _, mock := newSR(t)
311+
defer mock.Close()
349312

350313
t.Run("instance_up dispatches measurement on ping success", func(t *testing.T) {
351-
sr, _, mock := newSR(t)
352-
defer mock.Close()
353-
metricDefs.MetricDefs[specialMetricInstanceUp] = metrics.Metric{}
354314
mock.ExpectPing()
355-
assert.NoError(t, sr.fetchSpecialMetric(ctx, specialMetricInstanceUp))
315+
assert.NoError(t, sr.fetchSpecialMetric(ctx, specialMetricInstanceUp, ""))
356316
select {
357317
case msg := <-sr.reaper.measurementCh:
358318
assert.Equal(t, specialMetricInstanceUp, msg.MetricName)
@@ -365,11 +325,8 @@ func TestSourceReaper_FetchSpecialMetric(t *testing.T) {
365325
})
366326

367327
t.Run("instance_up uses storage name when set", func(t *testing.T) {
368-
sr, _, mock := newSR(t)
369-
defer mock.Close()
370-
metricDefs.MetricDefs[specialMetricInstanceUp] = metrics.Metric{StorageName: "infra_up"}
371328
mock.ExpectPing()
372-
assert.NoError(t, sr.fetchSpecialMetric(ctx, specialMetricInstanceUp))
329+
assert.NoError(t, sr.fetchSpecialMetric(ctx, specialMetricInstanceUp, "infra_up"))
373330
select {
374331
case msg := <-sr.reaper.measurementCh:
375332
assert.Equal(t, "infra_up", msg.MetricName)
@@ -380,13 +337,9 @@ func TestSourceReaper_FetchSpecialMetric(t *testing.T) {
380337
})
381338

382339
t.Run("change_events dispatches no measurement when no hash defs present", func(t *testing.T) {
383-
sr, _, mock := newSR(t)
384-
defer mock.Close()
340+
// Doesn't contain additional defs for any of {"sproc_hashes", "table_hashes", "index_hashes", "configuration_hashes", "privilege_hashes"}
385341
metricDefs.MetricDefs[specialMetricChangeEvents] = metrics.Metric{}
386-
for _, name := range []string{"sproc_hashes", "table_hashes", "index_hashes", "configuration_hashes", "privilege_hashes"} {
387-
delete(metricDefs.MetricDefs, name)
388-
}
389-
assert.NoError(t, sr.fetchSpecialMetric(ctx, specialMetricChangeEvents))
342+
assert.NoError(t, sr.fetchSpecialMetric(ctx, specialMetricChangeEvents, ""))
390343
select {
391344
case <-sr.reaper.measurementCh:
392345
t.Error("expected no measurement when no changes detected")

0 commit comments

Comments
 (0)