Add row miner refresh RPC#437
Conversation
…rs-rpc # Conflicts: # server/internal/domain/telemetry/service.go
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7b4286cf2a
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| snapshots, _, _, err := s.buildSnapshotsFromUnifiedQuery(ctx, orgID, "", int32(len(deviceIDs)), &interfaces.MinerFilter{ | ||
| DeviceIdentifiers: deviceIDs, | ||
| }, nil) |
There was a problem hiding this comment.
Filter refreshed snapshots by requested device IDs
When RefreshMiners asks for snapshots for a specific ID, this filter does not actually constrain the dynamic list query: buildSnapshotsFromUnifiedQuery goes through SQLDeviceStore.buildListQuerySQL, and appendFilterSQL never emits a device_identifier = ANY(...) predicate for MinerFilter.DeviceIdentifiers. As a result, refreshing any miner other than the first miner in the default ordering returns an unrelated snapshot (and useFleet.mergeMiners updates that unrelated row) while reporting no error for the requested device.
Useful? React with 👍 / 👎.
| processErr := s.processDevice(ctx, device) | ||
| flushErr := s.FlushStatusNow(ctx) |
There was a problem hiding this comment.
Flush metrics before reading refreshed snapshots
For successful metric polls, processDevice only enqueues the new metrics on s.metricsResults; this path flushes only status before RefreshMiners immediately calls GetLatestDeviceMetrics while building the response. In normal operation the metrics writer may not persist the sample until its next tick, so the returned refreshed snapshot can still contain stale hashrate/power/temperature values even though the RPC just fetched fresh telemetry.
Useful? React with 👍 / 👎.
🔐 Codex Security Review
Review SummaryOverall Risk: MEDIUM Findings[MEDIUM] Refresh response can return stale telemetry metrics
[MEDIUM] Refresh fan-out bypasses per-device in-flight protection
[LOW] Raw backend/plugin errors are returned as response data
NotesReviewed only Generated by Codex Security Review | |
There was a problem hiding this comment.
Pull request overview
Adds a row-scoped “Refresh” flow that lets ProtoFleet request an immediate telemetry/status collection for explicit device IDs, flush status updates, and return updated miner snapshots with per-device errors—so the UI can reflect remediation quickly without waiting for the normal polling cycle.
Changes:
- Introduces
RefreshMinersConnect-RPC (+ proto + permissions) and server-side fan-out/timeout/concurrency handling. - Extends the telemetry domain with a synchronous status flush mechanism and a refresh entry point.
- Wires the Fleet UI row action to call refresh, show in-flight indicators, and merge refreshed snapshots into the visible list; adds a small dev fault-proxy script for offline/recovery testing.
Reviewed changes
Copilot reviewed 24 out of 27 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| server/internal/handlers/middleware/rpc_permissions.go | Grants RefreshMiners RPC the miner-read permission gate. |
| server/internal/handlers/fleetmanagement/handler.go | Adds the RefreshMiners RPC handler wiring to the domain service. |
| server/internal/domain/telemetry/service.go | Adds status flush requests + RefreshDevice/FlushStatusNow; updates in-flight handling and status writer flushing. |
| server/internal/domain/fleetmanagement/telemetry.go | Extends TelemetryCollector interface with RefreshDevice for fleet mgmt. |
| server/internal/domain/fleetmanagement/service.go | Implements RefreshMiners server logic + helper to fetch snapshots by explicit IDs. |
| server/generated/grpc/fleetmanagement/v1/fleetmanagementv1connect/fleetmanagement.connect.go | Regenerated Connect client/server bindings for new RPC. |
| scripts/dev/miner_fault_proxy.py | Adds a tiny TCP fault-injection proxy for local refresh/offline testing. |
| proto/fleetmanagement/v1/fleetmanagement.proto | Adds RefreshMiners RPC and request/response messages. |
| docs/plans/2026-06-11-refresh-miners-rpc-and-row-bulk-actions-tdd.md | Adds TDD describing the refresh RPC and row action behavior. |
| client/src/protoFleet/features/fleetManagement/components/RowActionsMenu/RowActionsMenu.tsx | Adds per-action disabled support for row action items. |
| client/src/protoFleet/features/fleetManagement/components/MinerList/types.ts | Adds per-row isRefreshing flag to list item type. |
| client/src/protoFleet/features/fleetManagement/components/MinerList/MinerStatusCell.tsx | Plumbs refresh state through to status rendering. |
| client/src/protoFleet/features/fleetManagement/components/MinerList/MinerStatus.tsx | Adds a “Refreshing” UI state with spinner. |
| client/src/protoFleet/features/fleetManagement/components/MinerList/MinerStatus.test.tsx | Adds unit test coverage for refreshing UI state. |
| client/src/protoFleet/features/fleetManagement/components/MinerList/MinerName.tsx | Passes merge/refresh-state callbacks down to action menu. |
| client/src/protoFleet/features/fleetManagement/components/MinerList/MinerList.tsx | Tracks refreshing IDs and threads merge/refresh-state callbacks into column config. |
| client/src/protoFleet/features/fleetManagement/components/MinerList/minerColConfig.tsx | Shows skeleton telemetry cells while a row is refreshing. |
| client/src/protoFleet/features/fleetManagement/components/MinerActionsMenu/SingleMinerActionsMenu.tsx | Adds “Refresh” row action that calls the RPC, merges snapshots, and toggles refresh state. |
| client/src/protoFleet/features/fleetManagement/components/Fleet/Fleet.tsx | Exposes mergeMiners and ensures error refetch on global refetch. |
| client/src/protoFleet/features/fleetManagement/components/Fleet/Fleet.test.tsx | Updates mock shape for new mergeMiners hook return. |
| client/src/protoFleet/features/auth/components/AuthenticateMiners/AuthenticateMiners.test.tsx | Updates mocked useFleet return shape to include mergeMiners. |
| client/src/protoFleet/api/useRefreshMiners.ts | Adds a client hook for calling RefreshMiners and tracking in-flight IDs. |
| client/src/protoFleet/api/useRefreshMiners.test.ts | Adds unit tests for hook validation, return shape, and refreshing tracking. |
| client/src/protoFleet/api/useFleet.ts | Adds mergeMiners upsert helper for optimistic snapshot merges. |
| client/src/protoFleet/api/useAuthNeededMiners.test.ts | Updates mocked useFleet return shape to include mergeMiners. |
| client/src/protoFleet/api/generated/fleetmanagement/v1/fleetmanagement_pb.ts | Regenerated TS protobuf types including the new refresh messages/service method. |
| // RefreshDevice runs the same collection path used by scheduled telemetry for | ||
| // one device, then asks the status writer to flush pending status updates. | ||
| func (s *TelemetryService) RefreshDevice(ctx context.Context, device models.Device) error { | ||
| processErr := s.processDevice(ctx, device) | ||
| flushErr := s.FlushStatusNow(ctx) | ||
| if processErr != nil { | ||
| return processErr | ||
| } | ||
| return flushErr | ||
| } |
| // RefreshDevice runs the same collection path used by scheduled telemetry for | ||
| // one device, then asks the status writer to flush pending status updates. | ||
| func (s *TelemetryService) RefreshDevice(ctx context.Context, device models.Device) error { | ||
| processErr := s.processDevice(ctx, device) | ||
| flushErr := s.FlushStatusNow(ctx) | ||
| if processErr != nil { | ||
| return processErr | ||
| } | ||
| return flushErr | ||
| } | ||
|
|
||
| func (s *TelemetryService) FlushStatusNow(ctx context.Context) error { | ||
| req := statusFlushRequest{done: make(chan error, 1)} | ||
|
|
||
| select { | ||
| case s.statusFlushRequests <- req: | ||
| case <-ctx.Done(): | ||
| return ctx.Err() | ||
| } | ||
|
|
||
| select { | ||
| case err := <-req.done: | ||
| return err | ||
| case <-ctx.Done(): | ||
| return ctx.Err() | ||
| } | ||
| } |
| for _, id := range req.DeviceIds { | ||
| if strings.TrimSpace(id) == "" { | ||
| return nil, fleeterror.NewInvalidArgumentError("device_ids cannot contain empty device identifiers") | ||
| } | ||
| } |
| for result := range results { | ||
| if result.errMsg != "" { | ||
| resp.Errors[result.id] = result.errMsg | ||
| } | ||
| if result.snapshot != nil { | ||
| resp.Snapshots = append(resp.Snapshots, result.snapshot) | ||
| } | ||
| } |
| func (s *Service) RefreshMiners(ctx context.Context, req *pb.RefreshMinersRequest) (*pb.RefreshMinersResponse, error) { | ||
| if len(req.DeviceIds) == 0 { | ||
| return nil, fleeterror.NewInvalidArgumentError("device_ids must contain at least one device identifier") | ||
| } | ||
| if len(req.DeviceIds) > refreshMinersMaxDevices { | ||
| return nil, fleeterror.NewInvalidArgumentErrorf("device_ids must contain at most %d device identifiers", refreshMinersMaxDevices) | ||
| } | ||
| for _, id := range req.DeviceIds { | ||
| if strings.TrimSpace(id) == "" { | ||
| return nil, fleeterror.NewInvalidArgumentError("device_ids cannot contain empty device identifiers") | ||
| } | ||
| } |
| const mergeMiners = useCallback((snapshots: MinerStateSnapshot[]) => { | ||
| if (snapshots.length === 0) { | ||
| return; | ||
| } | ||
|
|
||
| setMiners((prev) => { | ||
| let next: Record<string, MinerStateSnapshot> | undefined; | ||
|
|
||
| snapshots.forEach((snapshot) => { | ||
| const existingMiner = prev[snapshot.deviceIdentifier]; | ||
| if (existingMiner && equals(MinerStateSnapshotSchema, existingMiner, snapshot)) { | ||
| return; | ||
| } | ||
|
|
||
| next ??= { ...prev }; | ||
| next[snapshot.deviceIdentifier] = snapshot; | ||
| }); | ||
|
|
||
| return next ?? prev; | ||
| }); | ||
| }, []); |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 2e215d417b
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| func (s *TelemetryService) RefreshDevice(ctx context.Context, device models.Device) error { | ||
| processErr := s.processDevice(ctx, device) |
There was a problem hiding this comment.
Prevent duplicate refresh collections
When a row refresh is requested while the scheduled worker is already polling the same device, this path bypasses the inFlight guard used by workers, so processDevice can run concurrently for that miner. That issues duplicate plugin calls and can race the queued status/metrics writes, and the TDD for this change explicitly relies on the in-flight collection being reused rather than starting another one. Please claim/wait on s.inFlight in RefreshDevice before calling processDevice.
Useful? React with 👍 / 👎.
This PR adds a row-scoped miner refresh path so operators can ask Fleet to re-read one miner immediately instead of waiting for the normal telemetry poll. Architecturally, it introduces
FleetManagementService.RefreshMiners, which accepts explicit device IDs, validates org scope, invokes the telemetry collector for each device with bounded concurrency, flushes pending status writes, and returns the latestMinerStateSnapshotrows alongside per-device errors.The data flow is: row action ->
useRefreshMiners-> generated Connect client -> fleetmanagement handler/service -> telemetryRefreshDevice-> plugin-backed miner collection/status update -> snapshot reload -> client snapshot merge plus current-page/error refetch. The UI adds a rowRefreshaction, status spinner, telemetry skeletons, and preserves failure visibility by showing per-device refresh errors while still merging any fresh snapshot returned by the server.The telemetry layer now has an explicit flush path for status results so row refreshes can reflect offline/recovered state without waiting for the writer batch interval, while normal polling and scheduler behavior remain intact. The PR also includes a tiny dev TCP fault proxy for validating miner offline/recovery behavior against simulator miners.
Non-goals: this does not add bulk/select-all refresh, does not change command dispatch semantics, and does not implement post-command verification loops; those are deferred follow-up work.