Skip to content

Commit b8a9404

Browse files
fix: handle OpAMP AgentDisconnect message (#6792)
* fix: handle OpAMP AgentDisconnect message When an enrolled agent sends an AgentDisconnect message, set its last_checkin_status to "disconnected" via the bulk checkin system. Return a BadRequest error if an unenrolled agent sends a disconnect message. Closes #6784 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Use const for status --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 20b13a6 commit b8a9404

4 files changed

Lines changed: 101 additions & 8 deletions

File tree

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
kind: bug-fix
2+
summary: Handle OpAMP AgentDisconnect message by setting agent status to disconnected
3+
component: fleet-server
4+
issue: https://github.com/elastic/fleet-server/issues/6784

docs/opamp.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@ Fleet-server operates in monitoring-only mode. The `ServerToAgent` response only
2727

2828
### Agent-to-server fields ignored
2929

30-
Fleet-server reads `instance_uid`, `agent_description`, `capabilities`, `health`, `effective_config`, and `sequence_num` from `AgentToServer` messages. The following fields are ignored:
30+
The following fields are ignored:
3131

32-
- `agent_disconnect` — The spec says this MUST be set in the agent's last message.
3332
- `remote_config_status`
3433
- `package_statuses`
3534
- `flags` (e.g., `RequestInstanceUid`)

internal/pkg/api/handleOpAMP.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ import (
3535
)
3636

3737
const (
38-
kOpAMPMod = "opAMP"
38+
kOpAMPMod = "opAMP"
39+
statusDisconnected = "disconnected"
3940
)
4041

4142
type OpAMPT struct {
@@ -177,6 +178,26 @@ func (oa *OpAMPT) handleMessage(zlog zerolog.Logger, apiKey *apikey.APIKey) func
177178
Bool("is_enrolled", agent != nil).
178179
Msg("agent enrollment status")
179180

181+
// Handle agent disconnect: set status to offline for enrolled agents,
182+
// return an error for unenrolled agents.
183+
if message.AgentDisconnect != nil {
184+
if agent == nil {
185+
zlog.Debug().Msg("agent disconnect received from unenrolled agent")
186+
return &protobufs.ServerToAgent{
187+
InstanceUid: instanceUID.Bytes(),
188+
ErrorResponse: &protobufs.ServerErrorResponse{
189+
Type: protobufs.ServerErrorResponseType_ServerErrorResponseType_BadRequest,
190+
ErrorMessage: "agent is not enrolled",
191+
},
192+
}
193+
}
194+
zlog.Debug().Msg("agent disconnect received")
195+
_ = oa.bc.CheckIn(instanceUID.String(), checkin.WithStatus(statusDisconnected))
196+
return &protobufs.ServerToAgent{
197+
InstanceUid: instanceUID.Bytes(),
198+
}
199+
}
200+
180201
if agent == nil {
181202
if agent, err = oa.enrollAgent(zlog, instanceUID.String(), message, apiKey); err != nil {
182203
return &protobufs.ServerToAgent{

internal/pkg/api/handleOpAMP_test.go

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,21 @@ import (
1212
"testing"
1313
"unsafe"
1414

15+
"github.com/gofrs/uuid/v5"
16+
17+
"github.com/open-telemetry/opamp-go/protobufs"
18+
"github.com/rs/zerolog"
19+
"github.com/stretchr/testify/mock"
20+
"github.com/stretchr/testify/require"
21+
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
22+
1523
"github.com/elastic/fleet-server/v7/internal/pkg/apikey"
1624
"github.com/elastic/fleet-server/v7/internal/pkg/checkin"
1725
"github.com/elastic/fleet-server/v7/internal/pkg/config"
1826
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
1927
"github.com/elastic/fleet-server/v7/internal/pkg/es"
2028
"github.com/elastic/fleet-server/v7/internal/pkg/model"
2129
ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing"
22-
"github.com/open-telemetry/opamp-go/protobufs"
23-
"github.com/rs/zerolog"
24-
"github.com/stretchr/testify/mock"
25-
"github.com/stretchr/testify/require"
26-
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
2730
)
2831

2932
func TestFeatureFlag(t *testing.T) {
@@ -275,6 +278,72 @@ func TestUpdateAgentWithAgentToServerMessage(t *testing.T) {
275278
require.Equal(t, float64(2), config["num"])
276279
}
277280

281+
func TestHandleMessageAgentDisconnect(t *testing.T) {
282+
cases := []struct {
283+
name string
284+
getBulker func(t *testing.T) *ftesting.MockBulk
285+
wantError bool
286+
}{
287+
{
288+
name: "enrolled agent sets status to offline",
289+
getBulker: func(t *testing.T) *ftesting.MockBulk {
290+
t.Helper()
291+
bulker := ftesting.NewMockBulk()
292+
agent := model.Agent{LastCheckinStatus: "online"}
293+
agentBytes, err := json.Marshal(agent)
294+
require.NoError(t, err)
295+
bulker.On("Search", mock.Anything, dl.FleetAgents, mock.Anything, mock.Anything).
296+
Return(&es.ResultT{HitsT: es.HitsT{Hits: []es.HitT{{ID: "agent-123", Source: agentBytes}}}}, nil)
297+
return bulker
298+
},
299+
wantError: false,
300+
},
301+
{
302+
name: "unenrolled agent returns error",
303+
getBulker: func(_ *testing.T) *ftesting.MockBulk {
304+
bulker := ftesting.NewMockBulk()
305+
bulker.On("Search", mock.Anything, dl.FleetAgents, mock.Anything, mock.Anything).
306+
Return(&es.ResultT{HitsT: es.HitsT{Hits: []es.HitT{}}}, nil)
307+
return bulker
308+
},
309+
wantError: true,
310+
},
311+
}
312+
for _, tc := range cases {
313+
t.Run(tc.name, func(t *testing.T) {
314+
bulker := tc.getBulker(t)
315+
checker := &mockCheckin{}
316+
oa := &OpAMPT{bulk: bulker, bc: checker}
317+
318+
agentUID := uuid.Must(uuid.NewV7())
319+
zlog := zerolog.New(io.Discard)
320+
apiKey := &apikey.APIKey{ID: "test-key"}
321+
322+
handler := oa.handleMessage(zlog, apiKey)
323+
msg := &protobufs.AgentToServer{
324+
InstanceUid: agentUID.Bytes(),
325+
AgentDisconnect: &protobufs.AgentDisconnect{},
326+
}
327+
328+
resp := handler(t.Context(), nil, msg)
329+
require.Equal(t, agentUID.Bytes(), resp.InstanceUid)
330+
331+
if tc.wantError {
332+
require.NotNil(t, resp.ErrorResponse)
333+
require.Equal(t, protobufs.ServerErrorResponseType_ServerErrorResponseType_BadRequest, resp.ErrorResponse.Type)
334+
require.Empty(t, checker.id, "CheckIn should not be called for unenrolled agent")
335+
} else {
336+
require.Nil(t, resp.ErrorResponse)
337+
require.Equal(t, uint64(0), resp.Capabilities)
338+
require.Equal(t, agentUID.String(), checker.id)
339+
340+
pending := pendingFromOptions(t, checker.opts)
341+
require.Equal(t, statusDisconnected, getUnexportedField(pending, "status").String())
342+
}
343+
})
344+
}
345+
}
346+
278347
type mockCheckin struct {
279348
id string
280349
opts []checkin.Option

0 commit comments

Comments
 (0)