Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions internal/exporter/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -124,33 +125,55 @@ func (c *collector) Collect(ctx context.Context) (*HealthData, error) {
Timestamp: time.Now().UTC(),
}

if err := ctx.Err(); err != nil {
return data, err
}

// Collect machine info if enabled
if c.config.IncludeMachineInfo {
if err := c.collectMachineInfo(data); err != nil {
log.Logger.Errorw("Failed to collect machine info", "error", err)
}
}
if err := ctx.Err(); err != nil {
return data, err
Comment on lines 139 to +145
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Treat machine-info cancellation like the other partial-data phases.

This branch still logs context.Canceled / context.DeadlineExceeded as Errorw before returning the same partial-data error path. Metrics, events, and component data already suppress that noise; machine info should match.

🔧 Suggested adjustment
 	if c.config.IncludeMachineInfo {
 		if err := c.collectMachineInfo(ctx, data); err != nil {
+			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+				return data, err
+			}
 			log.Logger.Errorw("Failed to collect machine info", "error", err)
 		}
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if c.config.IncludeMachineInfo {
if err := c.collectMachineInfo(data); err != nil {
if err := c.collectMachineInfo(ctx, data); err != nil {
log.Logger.Errorw("Failed to collect machine info", "error", err)
}
}
if err := ctx.Err(); err != nil {
return data, err
if c.config.IncludeMachineInfo {
if err := c.collectMachineInfo(ctx, data); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return data, err
}
log.Logger.Errorw("Failed to collect machine info", "error", err)
}
}
if err := ctx.Err(); err != nil {
return data, err
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/exporter/collector/collector.go` around lines 139 - 145, The
machine-info collection currently logs all errors from c.collectMachineInfo as
Errorw and then lets the function hit the ctx.Err() return path; change this to
mirror metrics/events/component behavior: after calling
c.collectMachineInfo(ctx, data) check the returned err, and if err is
context.Canceled or context.DeadlineExceeded (use errors.Is(err,
context.Canceled) or errors.Is(err, context.DeadlineExceeded)) do NOT call
log.Logger.Errorw — instead return the partial data and the ctx error
immediately; for other non-nil errors keep the existing Errorw call and
continue. Ensure you update the logic around c.config.IncludeMachineInfo and the
ctx.Err() handling so machine-info cancellation is suppressed like the other
partial-data phases.

}

// Collect metrics if enabled
if c.config.IncludeMetrics {
if err := c.collectMetrics(ctx, data); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return data, err
}
log.Logger.Errorw("Failed to collect metrics", "error", err)
}
}
if err := ctx.Err(); err != nil {
return data, err
}

// Collect events if enabled
if c.config.IncludeEvents {
if err := c.collectEvents(ctx, data); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return data, err
}
log.Logger.Errorw("Failed to collect events", "error", err)
}
}
if err := ctx.Err(); err != nil {
return data, err
}

// Collect component data if enabled
if c.config.IncludeComponentData {
if err := c.collectComponentData(data); err != nil {
log.Logger.Errorw("Failed to collect component data", "error", err)
}
}
if err := ctx.Err(); err != nil {
return data, err
}

// Collect attestation data if provider is available
// Attestation is always enabled if manager is available
Expand Down Expand Up @@ -219,8 +242,17 @@ func (c *collector) collectEvents(ctx context.Context, data *HealthData) error {
}

for _, component := range components {
if err := ctx.Err(); err != nil {
data.Events = allEvents
return err
}

componentEvents, err := component.Events(ctx, since)
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
data.Events = allEvents
return err
}
log.Logger.Errorw("Failed to get events from component",
"component", component.Name(), "error", err)
continue
Expand Down
58 changes: 58 additions & 0 deletions internal/exporter/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,60 @@ func TestNew(t *testing.T) {
var _ = c
}

func TestCollectReturnsPartialDataWhenEventCollectionTimesOut(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

firstEvent := apiv1.Event{
Component: "cpu",
Time: metav1.NewTime(time.Now().UTC()),
Name: "test_event",
Type: apiv1.EventTypeWarning,
Message: "first event",
}

registry := &mockRegistry{
components: []components.Component{
&mockComponent{
name: "cpu",
events: []apiv1.Event{firstEvent},
},
&mockComponent{
name: "memory",
eventFunc: func(ctx context.Context, since time.Time) (apiv1.Events, error) {
cancel()
<-ctx.Done()
return nil, ctx.Err()
},
},
},
}

c := New(
&config.HealthExporterConfig{
IncludeEvents: true,
EventsLookback: metav1.Duration{
Duration: time.Minute,
},
},
nil,
nil,
nil,
&mockEventStore{},
registry,
nil,
nil,
"test-machine-id",
nil,
)

data, err := c.Collect(ctx)
require.ErrorIs(t, err, context.Canceled)
require.NotNil(t, data)
require.Len(t, data.Events, 1)
assert.Equal(t, "cpu", data.Events[0].Component)
assert.Equal(t, "test_event", data.Events[0].Name)
}

func TestCollector_Collect_BasicFlow(t *testing.T) {
ctx := context.Background()
cfg := &config.HealthExporterConfig{
Expand Down Expand Up @@ -843,13 +897,17 @@ type mockComponent struct {
events []apiv1.Event
healthStates []apiv1.HealthState
shouldError bool
eventFunc func(ctx context.Context, since time.Time) (apiv1.Events, error)
}

func (m *mockComponent) Name() string {
return m.name
}

func (m *mockComponent) Events(ctx context.Context, since time.Time) (apiv1.Events, error) {
if m.eventFunc != nil {
return m.eventFunc(ctx, since)
}
if m.shouldError {
return nil, errors.New("mock component error")
}
Expand Down
24 changes: 17 additions & 7 deletions internal/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package exporter

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -173,26 +174,35 @@ func (e *healthExporter) ExportNow(ctx context.Context) error {
// export performs the actual data export operation
func (e *healthExporter) export() error {
log.Logger.Infow("Starting health export")
ctx, cancel := context.WithTimeout(e.ctx, e.options.timeout)
defer cancel()
collectCtx, cancelCollect := context.WithTimeout(e.ctx, e.options.timeout)
defer cancelCollect()
Comment on lines +180 to +181
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify that ExportNow accepts a caller context while export() roots phase contexts in e.ctx.
rg -n -C2 --type go \
  -e 'func \(e \*healthExporter\) ExportNow' \
  -e 'func \(e \*healthExporter\) export\(' \
  -e 'context\.WithTimeout\(e\.ctx'

Repository: NVIDIA/fleet-intelligence-agent

Length of output: 1364


🏁 Script executed:

#!/bin/bash
# Find all calls to export() within the file
rg -n 'e\.export\(\)' internal/exporter/exporter.go

# Check the Start() method to understand context lifecycle
rg -n -A 20 'func \(e \*healthExporter\) Start' internal/exporter/exporter.go

# Check Stop/Close methods if they exist
rg -n -B 2 -A 10 'func \(e \*healthExporter\) Stop\|func \(e \*healthExporter\) Close' internal/exporter/exporter.go

Repository: NVIDIA/fleet-intelligence-agent

Length of output: 739


🏁 Script executed:

#!/bin/bash
# Show more of the Start() method, particularly around line 147
sed -n '123,160p' internal/exporter/exporter.go

# Search for where e.ctx is created (likely in constructor or New function)
rg -n 'e\.ctx.*=' internal/exporter/exporter.go | head -20

# Show the healthExporter struct definition
rg -n -B 5 -A 15 'type healthExporter struct' internal/exporter/exporter.go

Repository: NVIDIA/fleet-intelligence-agent

Length of output: 1737


🏁 Script executed:

#!/bin/bash
# Show the complete New() function
sed -n '61,100p' internal/exporter/exporter.go

# Show the complete Stop() method
rg -n -A 20 'func \(e \*healthExporter\) Stop\(\)' internal/exporter/exporter.go

Repository: NVIDIA/fleet-intelligence-agent

Length of output: 2028


Pass the caller context to the export method.

ExportNow(ctx) accepts a context parameter but ignores it—the export() method roots both phase contexts in e.ctx instead. This prevents caller deadlines and cancellations from taking effect for on-demand exports.

Additionally, when Stop() calls e.cancel(), it cancels e.ctx. Any in-flight export() call will then create timeout contexts from an already-canceled parent, immediately failing both collection and upload phases and defeating the intended independent timeout budgets. Thread the caller context into export() for on-demand invocations (line 171) and return early when collection is canceled to avoid propagating cancellation into the upload phase.

Also applies to: 193-198, 205-206

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/exporter/exporter.go` around lines 180 - 181, ExportNow(ctx)
currently ignores the caller context and export() roots phase contexts off
e.ctx, so thread the caller-provided ctx into export() (call export(ctx, ...))
and within export use the caller ctx as the parent for phase timeouts (use
context.WithTimeout(ctx, e.options.timeout) for collection and likewise for
upload) instead of context.WithTimeout(e.ctx,...); after the collection phase
returns, check collectCtx.Err() (or context cancellation) and return early if
cancelled/expired so you don't start upload when collection was cancelled; this
also ensures Stop() canceling e.ctx won't immediately cancel on-demand exports
that used the caller ctx.


// Refresh configuration from metadata on every export
// If the endpoints/auth token are not empty, export will continue
// If the endpoints/auth token are empty, exportHTTP will skip
e.refreshConfigFromMetadata(ctx)
e.refreshConfigFromMetadata(collectCtx)

// Collect health data
healthData, err := e.collector.Collect(ctx)
if err != nil {
healthData, err := e.collector.Collect(collectCtx)
if err != nil && healthData == nil {
return fmt.Errorf("collection failed: %w", err)
}
if err != nil {
log.Logger.Warnw("Collection completed with partial data",
"error", err,
"timed_out", errors.Is(err, context.DeadlineExceeded),
"canceled", errors.Is(err, context.Canceled))
}

// Export data based on mode
if e.options.config.OfflineMode {
return e.exportToFile(healthData)
} else {
return e.exportToHTTP(ctx, healthData)
}

uploadCtx, cancelUpload := context.WithTimeout(e.ctx, e.options.timeout)
defer cancelUpload()

return e.exportToHTTP(uploadCtx, healthData)
}

// exportToFile writes health data to files
Expand Down
58 changes: 58 additions & 0 deletions internal/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (

"github.com/NVIDIA/fleet-intelligence-agent/internal/config"
"github.com/NVIDIA/fleet-intelligence-agent/internal/exporter/collector"
"github.com/NVIDIA/fleet-intelligence-agent/internal/exporter/writer"
"github.com/NVIDIA/fleet-intelligence-agent/internal/machineinfo"
)

Expand All @@ -58,6 +59,16 @@ func (m *MockCollector) Collect(ctx context.Context) (*collector.HealthData, err
return args.Get(0).(*collector.HealthData), args.Error(1)
}

type mockHTTPWriter struct {
sendFunc func(ctx context.Context, data *collector.HealthData, metricsEndpoint string, logsEndpoint string, maxRetries int, authToken string) (string, error)
}

func (m *mockHTTPWriter) Send(ctx context.Context, data *collector.HealthData, metricsEndpoint string, logsEndpoint string, maxRetries int, authToken string) (string, error) {
return m.sendFunc(ctx, data, metricsEndpoint, logsEndpoint, maxRetries, authToken)
}

func (m *mockHTTPWriter) SetJWTRefreshFunc(refreshFunc writer.JWTRefreshFunc) {}

// MockMetricsStore is a mock implementation of pkgmetrics.Store
type MockMetricsStore struct {
mock.Mock
Expand Down Expand Up @@ -1115,6 +1126,53 @@ func TestExportWithCollectorError(t *testing.T) {
require.NoError(t, err)
}

func TestExportUsesFreshUploadContextAfterPartialCollection(t *testing.T) {
cfg := &config.HealthExporterConfig{
Interval: metav1.Duration{Duration: 1 * time.Minute},
Timeout: metav1.Duration{Duration: 20 * time.Millisecond},
MetricsEndpoint: "https://metrics.example.com",
LogsEndpoint: "https://logs.example.com",
AuthToken: "test-token",
}

ctx := context.Background()
exporter, err := New(ctx, WithConfig(cfg), WithMachineID("test-machine-id"))
require.NoError(t, err)

he := exporter.(*healthExporter)
healthData := &collector.HealthData{
CollectionID: "test-collection",
MachineID: "test-machine-id",
Timestamp: time.Now(),
}

mockCollector := &MockCollector{}
mockCollector.
On("Collect", mock.Anything).
Run(func(args mock.Arguments) {
<-args.Get(0).(context.Context).Done()
}).
Return(healthData, context.DeadlineExceeded)
he.collector = mockCollector

sendCalled := false
he.httpWriter = &mockHTTPWriter{
sendFunc: func(ctx context.Context, data *collector.HealthData, metricsEndpoint string, logsEndpoint string, maxRetries int, authToken string) (string, error) {
sendCalled = true
assert.NoError(t, ctx.Err(), "upload should use a fresh context")
assert.Equal(t, healthData, data)
return "", nil
},
}

err = he.export()
require.NoError(t, err)
assert.True(t, sendCalled, "upload should still run with partial collection data")

err = exporter.Stop()
require.NoError(t, err)
}

// TestExportHTTPWithReturnedToken tests handling of JWT token returned from server
func TestExportHTTPWithReturnedToken(t *testing.T) {
newJWT := "new-jwt-token-from-server"
Expand Down
Loading