feat(fleetnode): carry concurrent commands over the ControlStream#389
Conversation
Groundwork for routing operator miner commands to fleet-node-paired miners (RFC 0001's remote-node Miner adapter). No behavior change to discovery. - Wrap the ControlStream payload in a typed pairing.v1.AgentCommand envelope so the node can tell command kinds apart; discovery migrates into the `discover` arm. Fields 2 and 3 are left for the upcoming miner-command and pairing arms so the discovery payload is migrated exactly once. - Registry: replace the single-in-flight-command-per-node model with a command_id-keyed map. Report-bearing commands (discovery, later pairing) stream batches + a terminal ack on an events channel; ack-only commands take the terminal ack on a per-command channel via a new blocking SendCommand. The outbound queue is buffered and teardown frees every in-flight command. - Node: replace the single command worker with a bounded worker pool, so a long discovery no longer head-of-line-blocks quick commands; BUSY is acked only at the pool ceiling. handleCommand dispatches on envelope kind. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
🔐 Codex Security Review
Review SummaryOverall Risk: HIGH Findings[HIGH] ControlCommand payload change breaks mixed-version fleetd/fleetnode deployments
[MEDIUM] Commands are registered before enqueueing, allowing unbounded per-node inflight growth
NotesNo auth bypass, SQL injection, command injection, cryptostealing/pool hijack, or frontend XSS issues were apparent in the scoped diff. Generated by Codex Security Review | |
Condense the AgentCommand envelope doc (drop the future-arm field list), trim a speculative line on reportBearing, and remove changelog-style "unlike before" narration on Send. Regenerate pairing proto output for the reworded AgentCommand doc. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Send and SendCommand duplicated the outbound-queue select (enqueue, or fail on disconnect/ctx). Extract Registry.enqueue so both share one definition of the enqueue-or-fail semantics and error mapping. No behavior change. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Reconcile with #365 (fan-out nmap discovery), which refactored fleetnode discovery: - registry.go / registry_test.go: keep the concurrent-command generalization and re-add ConnectedFleetNodeIDs (+ its test) from main. - admin/handler.go: take main's slimmed handler; DiscoverOnFleetNode moved into the new fleetnode/discovery service. - discovery/service.go: wrap the discovery payload in the AgentCommand envelope (the migration follows the handler here) so the node decodes it; updated its test. - control_test.go: stub discoverer clones reports, since the worker pool runs probes concurrently. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Bring #390 onto the reconciled #389 (which now includes main + #365). Only the shared pairing proto conflicted: keep the MinerCommand additions and the AgentCommand envelope (field 2 = miner_command now implemented, 3 = pair still reserved), then regenerate pairing Go/TS with the pinned toolchain. All other files merged cleanly; #365's discovery refactor and the AgentCommand migration arrive via #389. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…mands Discovery is a heavy network scan; running two concurrently on one node doubles the load for no benefit. Give report-bearing commands (discovery, and the pairing effort's future pair) an exclusive single-flight slot per node, so a second concurrent discovery is acked BUSY. Quick per-miner commands keep the broader worker pool, so a long discovery never head-of-line-blocks them. The node classifies by AgentCommand arm (it has no registry). Adds a regression test asserting a second concurrent discovery gets ACK_CODE_BUSY. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… discovery ack - decodeAgentCommand parses ControlCommand.payload once in the receive loop and hands it to handleCommand, removing the redundant second unmarshal. - deliverAck evicts one best-effort batch to deliver the terminal ack when a report-bearing event buffer is full, so RunOnNode no longer strands until DiscoverCommandTimeout. Safe under Registry.mu (every events producer holds it). - Add TestRegistry_TerminalAckDeliveredWhenEventBufferFull. - Drop em-dashes from comments. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c7aade407a
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
There was a problem hiding this comment.
Pull request overview
This PR lays the groundwork for routing multiple concurrent server→fleet-node commands over the existing ControlStream by (1) introducing a typed AgentCommand envelope for the payload, (2) refactoring the server-side ControlStream registry to support many in-flight commands keyed by command_id, and (3) updating the fleet node to process commands concurrently (with a bounded pool) so long-running discovery doesn’t block other commands.
Changes:
- Wrap discovery payloads in a new
pairing.v1.AgentCommandoneof envelope and update decode paths/tests accordingly. - Replace the single in-flight command model with a
command_id→inflight map, addingSendCommandfor ack-only commands and improving teardown/unblock behavior. - Update fleet node control loop to a two-lane concurrency model (single-flight discovery + pooled quick commands), plus new/updated tests for concurrency and error cases.
Reviewed changes
Copilot reviewed 11 out of 13 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| server/internal/handlers/fleetnode/admin/handler_discover_test.go | Updates admin discovery tests to decode AgentCommand envelope. |
| server/internal/domain/fleetnode/discovery/service.go | Sends discovery as AgentCommand{discover: ...} and updates comments. |
| server/internal/domain/fleetnode/discovery/service_test.go | Adjusts discovery service tests to decode the new envelope. |
| server/internal/domain/fleetnode/control/stream.go | Registry stream now supports multiple in-flight commands; routes ack by command kind. |
| server/internal/domain/fleetnode/control/session.go | Refactors report-bearing Send to use new in-flight command map and enqueue helper. |
| server/internal/domain/fleetnode/control/registry.go | Introduces per-connection outgoing buffer + in-flight map + add/remove helpers. |
| server/internal/domain/fleetnode/control/registry_test.go | Adds coverage for concurrent commands, ack routing, teardown, buffer-full ack delivery. |
| server/internal/domain/fleetnode/control/command.go | Adds SendCommand API for ack-only commands (blocking until terminal ack). |
| server/cmd/fleetnode/control.go | Replaces single worker with concurrency lanes + envelope decoding + dispatch by command kind. |
| server/cmd/fleetnode/control_test.go | Updates payload generation to AgentCommand envelope; adds new behavior tests. |
| proto/pairing/v1/pairing.proto | Adds AgentCommand message (oneof envelope). |
| server/generated/grpc/pairing/v1/pairing.pb.go | Generated Go output for the new proto message. |
| client/src/protoFleet/api/generated/pairing/v1/pairing_pb.ts | Generated TS output for the new proto message. |
…plicitly slot is already per-iteration (declared in the loop body), so the closure capture was correct, but cmd/env/parseErr were passed as args while the lane was captured. Pass the lane too, so each handler provably releases the lane it acquired and the pattern is uniform. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Summary
Groundwork for sending commands to miners paired via fleet nodes. Today the
fleet-node
ControlStreamcarries exactly one thing: server-initiated discovery(#235). This PR generalizes the control plane so it can carry many concurrent
commands per node, which is the prerequisite for routing operator miner commands
(reboot, curtail, …) down a node's stream via RFC 0001's remote-node
Mineradapter (a later PR).
No behavior change to discovery. This is Phase 1 of a 3-phase effort; it ships
no user-visible capability on its own.
What changed
AgentCommandenvelope (proto/pairing/v1): theControlCommand.payloadis now a
pairing.v1.AgentCommandoneof so the node can tell command kinds apart.Discovery migrates into the
discoverarm (done exactly once, on both the adminsend side and the node decode side). Fields
2/3are left for the upcomingminer-command and pairing arms, coordinated with the parallel pairing effort so the
discovery payload is never migrated twice.
internal/domain/fleetnode/control): replaces thesingle-in-flight-command-per-node model with a
command_id-keyed map.Report-bearing commands (discovery, and later pairing) stream batches + a terminal
ack on an
eventschannel and admit device reports against the existing scope/quota;ack-only commands (per-miner commands) take their terminal ack on a per-command
channel via a new blocking
SendCommand. The outbound queue is buffered, and teardownfrees every in-flight command. Channel-ownership invariants are preserved.
cmd/fleetnode/control.go): replaces the single command worker with abounded (16) worker pool, so a long (up to 10-min) discovery no longer
head-of-line-blocks quick commands and many commands run concurrently;
BUSYis ackedonly at the pool ceiling.
handleCommanddispatches on envelope kind.Test plan
go test -race ./internal/domain/fleetnode/control/... ./cmd/fleetnode/...— green.New/updated coverage: many concurrent commands not rejected, ack-routing by kind,
SendCommandblock/disconnect/ctx-cancel/teardown, node worker pool runs a quickcommand while a long discovery is in flight,
BUSYpast the pool ceiling, unknownenvelope kind →
BAD_REQUEST.DiscoverOnFleetNodeintegration suite — green (end-to-end enveloperound-trip through the real handler + registry).
serverbuild,golangci-lint, and pre-pushtsc— clean.Follow-ups (later PRs)
MinerCommandproto + remote-nodeMineradapter +fleet_node_deviceresolution branch + node-side executor (end-to-end for the
virtualdriver).🤖 Generated with Claude Code