-
Notifications
You must be signed in to change notification settings - Fork 1
fix: isolate exporter timeout phases #154
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,7 +20,9 @@ import ( | |
| "context" | ||
| "crypto/rand" | ||
| "encoding/hex" | ||
| "errors" | ||
| "fmt" | ||
| "sync" | ||
| "time" | ||
|
|
||
| apiv1 "github.com/NVIDIA/fleet-intelligence-sdk/api/v1" | ||
|
|
@@ -72,6 +74,10 @@ type collector struct { | |
| lastAttestationCollection time.Time | ||
| machineID string // Agent's stable identity from server initialization | ||
| dcgmGPUIndexes map[string]string // UUID → DCGM device ID override for GPU indices | ||
| machineInfoMu sync.RWMutex | ||
| cachedMachineInfo *machineinfo.MachineInfo | ||
| machineInfoCollecting bool | ||
| machineInfoFetcher func(nvidianvml.Instance, ...machineinfo.MachineInfoOption) (*machineinfo.MachineInfo, error) | ||
| } | ||
|
|
||
| // New creates a new health data collector | ||
|
|
@@ -104,6 +110,7 @@ func New( | |
| attestationManager: attestationManager, | ||
| machineID: machineID, | ||
| dcgmGPUIndexes: dcgmGPUIndexes, | ||
| machineInfoFetcher: machineinfo.GetMachineInfo, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -124,33 +131,58 @@ func (c *collector) Collect(ctx context.Context) (*HealthData, error) { | |
| Timestamp: time.Now().UTC(), | ||
| } | ||
|
|
||
| if err := ctx.Err(); err != nil { | ||
| return data, err | ||
| } | ||
|
|
||
| // Collect machine info if enabled | ||
| if c.config.IncludeMachineInfo { | ||
| if err := c.collectMachineInfo(data); err != nil { | ||
| if err := c.collectMachineInfo(ctx, data); err != nil { | ||
| log.Logger.Errorw("Failed to collect machine info", "error", err) | ||
| } | ||
| } | ||
| if err := ctx.Err(); err != nil { | ||
| return data, err | ||
| } | ||
|
|
||
| // Collect metrics if enabled | ||
| if c.config.IncludeMetrics { | ||
| if err := c.collectMetrics(ctx, data); err != nil { | ||
| if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { | ||
| return data, err | ||
| } | ||
| log.Logger.Errorw("Failed to collect metrics", "error", err) | ||
| } | ||
| } | ||
| if err := ctx.Err(); err != nil { | ||
| return data, err | ||
| } | ||
|
|
||
| // Collect events if enabled | ||
| if c.config.IncludeEvents { | ||
| if err := c.collectEvents(ctx, data); err != nil { | ||
| if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { | ||
| return data, err | ||
| } | ||
| log.Logger.Errorw("Failed to collect events", "error", err) | ||
| } | ||
| } | ||
| if err := ctx.Err(); err != nil { | ||
| return data, err | ||
| } | ||
|
|
||
| // Collect component data if enabled | ||
| if c.config.IncludeComponentData { | ||
| if err := c.collectComponentData(data); err != nil { | ||
| if err := c.collectComponentData(ctx, data); err != nil { | ||
| if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { | ||
| return data, err | ||
| } | ||
| log.Logger.Errorw("Failed to collect component data", "error", err) | ||
| } | ||
| } | ||
| if err := ctx.Err(); err != nil { | ||
| return data, err | ||
| } | ||
|
|
||
| // Collect attestation data if provider is available | ||
| // Attestation is always enabled if manager is available | ||
|
|
@@ -166,27 +198,67 @@ func (c *collector) Collect(ctx context.Context) (*HealthData, error) { | |
| return data, nil | ||
| } | ||
|
|
||
| // collectMachineInfo collects machine hardware information | ||
| func (c *collector) collectMachineInfo(data *HealthData) error { | ||
| // collectMachineInfo attaches cached machine info when available and triggers | ||
| // a background refresh only until the cache has been populated. | ||
| func (c *collector) collectMachineInfo(ctx context.Context, data *HealthData) error { | ||
| if c.nvmlInstance == nil { | ||
| return fmt.Errorf("NVML instance not available") | ||
| } | ||
|
|
||
| var opts []machineinfo.MachineInfoOption | ||
| if len(c.dcgmGPUIndexes) > 0 { | ||
| opts = append(opts, machineinfo.WithDCGMGPUIndexes(c.dcgmGPUIndexes)) | ||
| if cached := c.getCachedMachineInfo(); cached != nil { | ||
| data.MachineInfo = cached | ||
| log.Logger.Debugw("Using cached machine info") | ||
| } | ||
|
|
||
| machineInfo, err := machineinfo.GetMachineInfo(c.nvmlInstance, opts...) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to get machine info: %w", err) | ||
| c.startMachineInfoRefresh() | ||
| if err := ctx.Err(); err != nil { | ||
| return err | ||
|
Comment on lines
+203
to
+215
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Don't enqueue uncancellable machine-info work from a dead collection.
Also applies to: 221-252 🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| data.MachineInfo = machineInfo | ||
| log.Logger.Debugw("Collected machine info", "machine_info", data.MachineInfo) | ||
| return nil | ||
| } | ||
|
|
||
| func (c *collector) startMachineInfoRefresh() { | ||
| c.machineInfoMu.Lock() | ||
| if c.machineInfoCollecting || c.cachedMachineInfo != nil { | ||
| c.machineInfoMu.Unlock() | ||
| return | ||
| } | ||
| c.machineInfoCollecting = true | ||
| c.machineInfoMu.Unlock() | ||
|
|
||
| go func() { | ||
| defer func() { | ||
| c.machineInfoMu.Lock() | ||
| c.machineInfoCollecting = false | ||
| c.machineInfoMu.Unlock() | ||
| }() | ||
|
|
||
| var opts []machineinfo.MachineInfoOption | ||
| if len(c.dcgmGPUIndexes) > 0 { | ||
| opts = append(opts, machineinfo.WithDCGMGPUIndexes(c.dcgmGPUIndexes)) | ||
| } | ||
|
|
||
| machineInfo, err := c.machineInfoFetcher(c.nvmlInstance, opts...) | ||
| if err != nil { | ||
| log.Logger.Errorw("Failed to refresh machine info", "error", err) | ||
| return | ||
| } | ||
|
|
||
| c.machineInfoMu.Lock() | ||
| c.cachedMachineInfo = machineInfo | ||
| c.machineInfoMu.Unlock() | ||
| log.Logger.Debugw("Refreshed machine info", "machine_info", machineInfo) | ||
| }() | ||
| } | ||
|
|
||
| func (c *collector) getCachedMachineInfo() *machineinfo.MachineInfo { | ||
| c.machineInfoMu.RLock() | ||
| defer c.machineInfoMu.RUnlock() | ||
|
|
||
| return c.cachedMachineInfo | ||
| } | ||
|
|
||
| // collectMetrics collects metrics data from the metrics store | ||
| func (c *collector) collectMetrics(ctx context.Context, data *HealthData) error { | ||
| if c.metricsStore == nil { | ||
|
|
@@ -219,8 +291,17 @@ func (c *collector) collectEvents(ctx context.Context, data *HealthData) error { | |
| } | ||
|
|
||
| for _, component := range components { | ||
| if err := ctx.Err(); err != nil { | ||
| data.Events = allEvents | ||
| return err | ||
| } | ||
|
|
||
| componentEvents, err := component.Events(ctx, since) | ||
| if err != nil { | ||
| if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { | ||
| data.Events = allEvents | ||
| return err | ||
| } | ||
| log.Logger.Errorw("Failed to get events from component", | ||
| "component", component.Name(), "error", err) | ||
| continue | ||
|
|
@@ -250,7 +331,7 @@ func (c *collector) collectEvents(ctx context.Context, data *HealthData) error { | |
| } | ||
|
|
||
| // collectComponentData collects health states from all components | ||
| func (c *collector) collectComponentData(data *HealthData) error { | ||
| func (c *collector) collectComponentData(ctx context.Context, data *HealthData) error { | ||
| if c.componentsRegistry == nil { | ||
| return fmt.Errorf("components registry not available") | ||
| } | ||
|
|
@@ -259,10 +340,19 @@ func (c *collector) collectComponentData(data *HealthData) error { | |
| components := c.componentsRegistry.All() | ||
|
|
||
| for _, component := range components { | ||
| if err := ctx.Err(); err != nil { | ||
| data.ComponentData = componentData | ||
| return err | ||
| } | ||
|
|
||
| componentName := component.Name() | ||
|
|
||
| // Get health states | ||
| healthStates := component.LastHealthStates() | ||
| if err := ctx.Err(); err != nil { | ||
| data.ComponentData = componentData | ||
| return err | ||
| } | ||
| log.Logger.Debugw("Collecting health states", | ||
| "component", componentName, "health_states", healthStates) | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Treat machine-info cancellation like the other partial-data phases.
This branch still logs
context.Canceled/context.DeadlineExceededasErrorwbefore returning the same partial-data error path. Metrics, events, and component data already suppress that noise; machine info should match.🔧 Suggested adjustment
```diff
 if c.config.IncludeMachineInfo {
 	if err := c.collectMachineInfo(ctx, data); err != nil {
+		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+			return data, err
+		}
 		log.Logger.Errorw("Failed to collect machine info", "error", err)
 	}
 }
```

📝 Committable suggestion
🤖 Prompt for AI Agents