[BUG] Ingester: panic 'send on closed channel' in ActiveQueriedSeriesService — Stop races concurrent UpdateSeriesBatch senders #7531

@sandy2008

Describe the bug

ActiveQueriedSeriesService.stopping() (pkg/ingester/active_queried_series.go:424-430) closes m.updateChan while concurrent callers of ActiveQueriedSeriesService.UpdateSeriesBatch (pkg/ingester/active_queried_series.go:455-476) can still be in the middle of sending to it. The non-blocking send uses select { case m.updateChan <- ... : default: }, which avoids blocking but does NOT protect against panicking when the channel is already closed — sends on a closed channel always panic, regardless of select.
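The claim that a default branch does not guard against a closed channel can be checked in isolation. The following is a minimal standalone sketch, not Cortex code: trySend is a hypothetical helper that mirrors the select-with-default shape and uses recover() only to report the panic.

```go
package main

import "fmt"

// trySend (hypothetical helper) performs a non-blocking send and reports
// whether the value was queued and whether the send panicked.
func trySend(ch chan int, v int) (sent bool, panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			// A send on a closed channel lands here, even though the
			// select below has a default branch.
			panicked = true
		}
	}()
	select {
	case ch <- v:
		return true, false
	default:
		return false, false
	}
}

func main() {
	ch := make(chan int, 1)

	sent, panicked := trySend(ch, 1)
	fmt.Println(sent, panicked) // true false: the buffered slot was free

	sent, panicked = trySend(ch, 2)
	fmt.Println(sent, panicked) // false false: buffer full, default branch taken

	close(ch)
	sent, panicked = trySend(ch, 3)
	fmt.Println(sent, panicked) // false true: send on closed channel panicked
}
```

The default branch only covers the "would block" case; once the channel is closed, the send case is selected and panics.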

Relevant code (current master):

// stopping waits for all worker goroutines to finish.
func (m *ActiveQueriedSeriesService) stopping(_ error) error {
    // Close the channel to signal workers to stop
    close(m.updateChan)              // pkg/ingester/active_queried_series.go:426
    // Wait for all workers to finish
    m.workers.Wait()
    return nil
}

// UpdateSeriesBatch sends an update to the update channel for processing.
// This method is non-blocking and will drop updates if the channel is full.
func (m *ActiveQueriedSeriesService) UpdateSeriesBatch(activeQueriedSeries *ActiveQueriedSeries, hashes []uint64, now time.Time, userID string) {
    if len(hashes) == 0 {
        return
    }
    // Non-blocking send to centralized update channel
    select {
    case m.updateChan <- activeQueriedSeriesUpdate{...}:  // pkg/ingester/active_queried_series.go:462
        // queued
    default:
        // dropped
    }
}

When the ingester service stops while a query path is still draining (calling UpdateSeriesBatch from a worker), the close + send race produces:

panic: send on closed channel

goroutine N [running]:
github.com/cortexproject/cortex/pkg/ingester.(*ActiveQueriedSeriesService).UpdateSeriesBatch(...)
        pkg/ingester/active_queried_series.go:462

This is a crash-on-shutdown bug. Severity is operational: graceful shutdown can panic, and the panic is observable whenever an in-flight query goroutine is still calling UpdateSeriesBatch at or after the moment of close().

To Reproduce

The race is timing-sensitive. Easiest reproduction is a focused unit test that runs many concurrent senders and stops the service while they are still sending. Drop the following into pkg/ingester/active_queried_series_test.go (or any test file in the package):

func TestActiveQueriedSeriesService_StopRaceWithConcurrentSenders(t *testing.T) {
    svc := NewActiveQueriedSeriesService(log.NewNopLogger(), prometheus.NewRegistry())
    require.NoError(t, services.StartAndAwaitRunning(context.Background(), svc))

    aqs := NewActiveQueriedSeries([]time.Duration{time.Minute}, time.Second, 1.0, log.NewNopLogger())

    var wg sync.WaitGroup
    stop := make(chan struct{})
    for i := 0; i < 64; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case <-stop:
                    return
                default:
                }
                // Repeatedly send; this is the call site that races with close().
                svc.UpdateSeriesBatch(aqs, []uint64{1, 2, 3, 4, 5}, time.Now(), "user")
            }
        }()
    }

    // Give senders a moment to fill the channel; the race window opens once
    // stopping() reaches close(m.updateChan).
    time.Sleep(10 * time.Millisecond)

    // This invokes stopping(), which closes updateChan while goroutines above
    // are still inside the select-send. Sends to a closed channel panic
    // regardless of the default branch.
    require.NoError(t, services.StopAndAwaitTerminated(context.Background(), svc))

    close(stop)
    wg.Wait()
}

Run it (under -race it tends to panic faster, but it can panic without -race too):

go test -race -tags "netgo slicelabels" -run TestActiveQueriedSeriesService_StopRaceWithConcurrentSenders ./pkg/ingester/... -count=50 -timeout 120s

Within a handful of iterations you should see:

--- FAIL: TestActiveQueriedSeriesService_StopRaceWithConcurrentSenders (...)
panic: send on closed channel [recovered]
    ...
goroutine ... [running]:
github.com/cortexproject/cortex/pkg/ingester.(*ActiveQueriedSeriesService).UpdateSeriesBatch(...)
        pkg/ingester/active_queried_series.go:462

Alternative production reproduction:

  1. Start an ingester with active queried series enabled.
  2. Drive sustained query load that triggers UpdateSeriesBatch calls.
  3. Send SIGTERM while load is in flight. With sufficient concurrency, the ingester panics on shutdown rather than terminating cleanly.

Expected behavior

Stop() should never panic, even with concurrent in-flight callers of UpdateSeriesBatch. Either the producer must check a "closed" signal before sending, or the channel must be drained / closed safely (e.g., via a sync.Once on close + an atomic.Bool flag the producer checks under a read lock, or a context whose cancellation gates sends).

Suggested fix direction

Two common patterns:

  1. atomic.Bool + producer check — set a closed flag before close(m.updateChan); producers check the flag first and skip the send:

    func (m *ActiveQueriedSeriesService) UpdateSeriesBatch(...) {
        if m.closed.Load() {
            putQueriedSeriesHashesSlice(hashes)
            return
        }
        select { case m.updateChan <- ...: default: ... }
    }

    func (m *ActiveQueriedSeriesService) stopping(_ error) error {
        m.closed.Store(true)
        close(m.updateChan)
        m.workers.Wait()
        return nil
    }

    Note: this still has a small race window between the load and the send — workable if producers also recover(), OR if the load+send is wrapped in a sync.RWMutex.RLock() paired with Lock() around closed.Store(true); close(...).

  2. context.Context cancellation — drop the channel close entirely; signal shutdown via a context, have workers select on ctx.Done() AND m.updateChan (already do), have producers select on ctx.Done() AND send-with-default. This avoids close(channel) and the inherent send-on-closed-channel hazard.
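The sync.RWMutex variant mentioned in the note under option 1 could be sketched roughly as follows. This is a standalone illustration, not the Cortex implementation: guardedQueue, TrySend, and Close are hypothetical names, and a plain bool under the lock stands in for the atomic.Bool.

```go
package main

import (
	"fmt"
	"sync"
)

// guardedQueue is a hypothetical stand-in for a service that owns an
// update channel. The RWMutex makes "check closed + send" atomic with
// respect to "mark closed + close channel".
type guardedQueue struct {
	mu     sync.RWMutex
	closed bool
	ch     chan int
}

func newGuardedQueue(size int) *guardedQueue {
	return &guardedQueue{ch: make(chan int, size)}
}

// TrySend holds the read lock across both the closed check and the
// non-blocking send, so Close cannot run in between them.
func (q *guardedQueue) TrySend(v int) bool {
	q.mu.RLock()
	defer q.mu.RUnlock()
	if q.closed {
		return false // dropped: already shut down
	}
	select {
	case q.ch <- v:
		return true
	default:
		return false // dropped: buffer full
	}
}

// Close takes the write lock, which waits for in-flight TrySend calls
// to finish. Because the send never blocks, Close cannot deadlock on them.
func (q *guardedQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return // idempotent
	}
	q.closed = true
	close(q.ch)
}

func main() {
	q := newGuardedQueue(4)
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				q.TrySend(v) // races with Close below, but never panics
			}
		}(i)
	}
	q.Close()
	wg.Wait()
	fmt.Println("send after close:", q.TrySend(1)) // send after close: false
}
```

The cost is one uncontended read lock per send. Many producers can hold the read lock at once, so they only serialize against the single Close call.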

I'd suggest option 2 since processUpdates already selects on ctx.Done() — closing the channel is largely redundant.
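A minimal sketch of what option 2 might look like, assuming the channel is never closed and shutdown is signalled only through a context. The names ctxQueue, TrySend, and Stop are hypothetical and do not reflect the actual Cortex types or its services framework.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// ctxQueue is a hypothetical stand-in for the service: the channel is
// never closed, so a send can never hit a closed channel.
type ctxQueue struct {
	ctx     context.Context
	cancel  context.CancelFunc
	ch      chan int
	workers sync.WaitGroup
}

func newCtxQueue(size int) *ctxQueue {
	ctx, cancel := context.WithCancel(context.Background())
	q := &ctxQueue{ctx: ctx, cancel: cancel, ch: make(chan int, size)}
	q.workers.Add(1)
	go q.run()
	return q
}

// run is the worker loop; it exits on cancellation rather than on
// channel close.
func (q *ctxQueue) run() {
	defer q.workers.Done()
	for {
		select {
		case <-q.ctx.Done():
			return
		case <-q.ch:
			// process the update here
		}
	}
}

// TrySend drops the value if the queue is shutting down or full.
func (q *ctxQueue) TrySend(v int) bool {
	// Early check: select picks randomly among ready cases, so without
	// this a send could still be chosen after cancellation.
	if q.ctx.Err() != nil {
		return false
	}
	select {
	case <-q.ctx.Done():
		return false
	case q.ch <- v:
		return true
	default:
		return false
	}
}

// Stop cancels the context and waits for the worker; no close(q.ch).
func (q *ctxQueue) Stop() {
	q.cancel()
	q.workers.Wait()
}

func main() {
	q := newCtxQueue(4)
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				q.TrySend(v) // safe to race with Stop
			}
		}(i)
	}
	q.Stop()
	wg.Wait()
	fmt.Println("send after stop:", q.TrySend(1)) // send after stop: false
}
```

One trade-off in this sketch: a send that races with Stop may leave an item in the buffer that no worker ever processes. For best-effort updates that are already dropped when the channel is full, that is likely acceptable.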

Environment

  • Cortex master at 29b6167d6 (and reproducible on later commits).
  • Found during a static audit of CPU / memory / goroutine leak risks across pkg/.

Additional context

This is one of several lifecycle-race patterns surfaced by the audit; happy to send fixes if maintainers want.

🤖 Reported with help from Claude Code
