Skip to content

Commit 88d0d7d

Browse files
committed
feat(agent): add native auditd collector for Linux
- Implement native auditd collector using go-libaudit v2 with netlink multicast - Add enterprise-ready auditd configuration (50-utmstack.rules) - Respect existing customer audit rules (additive approach) - Add cleanup on agent uninstall (removes UTMStack rules only) - Support automatic auditd installation on Debian/Ubuntu/RHEL/Fedora - Handle migration path for existing auditd installations - Add distro detection for package manager selection - Remove legacy beats/filebeat commented code
1 parent 36d07d7 commit 88d0d7d

22 files changed

Lines changed: 1267 additions & 111 deletions

agent/cmd/uninstall.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,15 @@ var uninstallCmd = &cobra.Command{
4747
if err = pb.DeleteAgent(cnf); err != nil {
4848
utils.Logger.ErrorF("error deleting agent: %v", err)
4949
}
50+
51+
// Uninstall dependencies (cleanup auditd rules, etc.)
52+
fmt.Print("Cleaning up dependencies... ")
53+
if err = dependency.UninstallAll(); err != nil {
54+
fmt.Printf("Warning: %v\n", err)
55+
} else {
56+
fmt.Println("[OK]")
57+
}
58+
5059
if err = collector.UninstallAll(); err != nil {
5160
fmt.Printf("error uninstalling collectors: %v\n", err)
5261
os.Exit(1)

agent/collector/auditd/auditd.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Package auditd provides a native collector for Linux Audit Framework events.
2+
// It uses go-libaudit to receive events via netlink multicast and reassembles
3+
// them before sending to the log queue.
4+
package auditd
5+
6+
import "time"
7+
8+
const (
9+
// auditdRestartDelay is the initial delay between reconnection attempts
10+
auditdRestartDelay = 5 * time.Second
11+
12+
// auditdMaxRestartDelay is the maximum backoff delay for reconnection
13+
auditdMaxRestartDelay = 5 * time.Minute
14+
15+
// reassemblerMaxInFlight is the maximum number of events held for reassembly
16+
reassemblerMaxInFlight = 50
17+
18+
// reassemblerTimeout is how long to wait for related messages before flushing
19+
reassemblerTimeout = 2 * time.Second
20+
21+
// maintainInterval is how often to run Reassembler.Maintain() to flush stale events
22+
maintainInterval = 500 * time.Millisecond
23+
)
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
//go:build linux
2+
// +build linux
3+
4+
package auditd
5+
6+
import (
7+
"context"
8+
"os"
9+
"sync"
10+
"time"
11+
12+
libaudit "github.com/elastic/go-libaudit/v2"
13+
"github.com/elastic/go-libaudit/v2/auparse"
14+
"github.com/threatwinds/go-sdk/plugins"
15+
"github.com/utmstack/UTMStack/agent/utils"
16+
)
17+
18+
// AuditdCollector collects Linux Audit events via netlink multicast
19+
type AuditdCollector struct {
20+
client auditReceiver
21+
reassembler *libaudit.Reassembler
22+
cancel context.CancelFunc
23+
mu sync.Mutex
24+
}
25+
26+
// New creates a new AuditdCollector
27+
func New() *AuditdCollector {
28+
return &AuditdCollector{}
29+
}
30+
31+
// Name returns the collector name
32+
func (a *AuditdCollector) Name() string {
33+
return "auditd"
34+
}
35+
36+
// Start begins collecting audit events and sending them to the queue
37+
func (a *AuditdCollector) Start(ctx context.Context, queue chan *plugins.Log) {
38+
// Preflight check for audit capability
39+
if err := checkAuditCapability(); err != nil {
40+
utils.Logger.ErrorF("auditd: preflight check failed: %v", err)
41+
return
42+
}
43+
44+
host, err := os.Hostname()
45+
if err != nil {
46+
utils.Logger.ErrorF("auditd: error getting hostname: %v", err)
47+
host = "unknown"
48+
}
49+
50+
restartDelay := auditdRestartDelay
51+
52+
for {
53+
select {
54+
case <-ctx.Done():
55+
utils.Logger.Info("auditd collector stopping due to context cancellation")
56+
return
57+
default:
58+
}
59+
60+
exitCode := a.runAuditClient(ctx, host, queue)
61+
62+
if exitCode == 0 {
63+
utils.Logger.Info("auditd client exited normally")
64+
} else {
65+
utils.Logger.ErrorF("auditd client exited with code %d, restarting in %v", exitCode, restartDelay)
66+
}
67+
68+
select {
69+
case <-ctx.Done():
70+
return
71+
case <-time.After(restartDelay):
72+
}
73+
74+
// Exponential backoff
75+
restartDelay *= 2
76+
if restartDelay > auditdMaxRestartDelay {
77+
restartDelay = auditdMaxRestartDelay
78+
}
79+
}
80+
}
81+
82+
// runAuditClient creates the audit client and runs the receive loop
83+
func (a *AuditdCollector) runAuditClient(ctx context.Context, host string, queue chan *plugins.Log) int {
84+
a.mu.Lock()
85+
clientCtx, cancel := context.WithCancel(ctx)
86+
a.cancel = cancel
87+
88+
// Create multicast audit client
89+
client, err := newAuditClient()
90+
if err != nil {
91+
a.mu.Unlock()
92+
utils.Logger.ErrorF("auditd: error creating audit client: %v", err)
93+
return -1
94+
}
95+
a.client = client
96+
97+
// Create event stream for reassembled events
98+
stream := newEventStream(queue, host)
99+
100+
// Create reassembler
101+
reassembler, err := libaudit.NewReassembler(reassemblerMaxInFlight, reassemblerTimeout, stream)
102+
if err != nil {
103+
client.Close()
104+
a.mu.Unlock()
105+
utils.Logger.ErrorF("auditd: error creating reassembler: %v", err)
106+
return -1
107+
}
108+
a.reassembler = reassembler
109+
a.mu.Unlock()
110+
111+
utils.Logger.Info("auditd collector started (netlink multicast)")
112+
113+
// Start maintenance goroutine for reassembler
114+
go a.runMaintenance(clientCtx)
115+
116+
// Main receive loop
117+
for {
118+
select {
119+
case <-clientCtx.Done():
120+
a.cleanup()
121+
return 0
122+
default:
123+
}
124+
125+
// Receive with non-blocking to allow checking context
126+
msg, err := client.Receive(false)
127+
if err != nil {
128+
utils.Logger.ErrorF("auditd: error receiving message: %v", err)
129+
a.cleanup()
130+
return -1
131+
}
132+
133+
if msg == nil {
134+
// No message available, brief sleep to avoid busy loop
135+
time.Sleep(10 * time.Millisecond)
136+
continue
137+
}
138+
139+
// Parse message type from raw data
140+
msgType := auparse.AuditMessageType(msg.Type)
141+
142+
// Push to reassembler for event grouping
143+
if err := reassembler.Push(msgType, msg.Data); err != nil {
144+
utils.Logger.ErrorF("auditd: error pushing to reassembler: %v", err)
145+
}
146+
}
147+
}
148+
149+
// runMaintenance periodically calls Maintain() to flush stale events
150+
func (a *AuditdCollector) runMaintenance(ctx context.Context) {
151+
ticker := time.NewTicker(maintainInterval)
152+
defer ticker.Stop()
153+
154+
for {
155+
select {
156+
case <-ctx.Done():
157+
return
158+
case <-ticker.C:
159+
a.mu.Lock()
160+
if a.reassembler != nil {
161+
if err := a.reassembler.Maintain(); err != nil {
162+
utils.Logger.ErrorF("auditd: error in reassembler maintenance: %v", err)
163+
}
164+
}
165+
a.mu.Unlock()
166+
}
167+
}
168+
}
169+
170+
// cleanup closes the client and reassembler
171+
func (a *AuditdCollector) cleanup() {
172+
a.mu.Lock()
173+
defer a.mu.Unlock()
174+
175+
if a.reassembler != nil {
176+
a.reassembler.Close()
177+
a.reassembler = nil
178+
}
179+
if a.client != nil {
180+
a.client.Close()
181+
a.client = nil
182+
}
183+
}
184+
185+
// Stop stops the collector
186+
func (a *AuditdCollector) Stop() {
187+
a.mu.Lock()
188+
defer a.mu.Unlock()
189+
190+
if a.cancel != nil {
191+
a.cancel()
192+
}
193+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
//go:build !linux
2+
// +build !linux
3+
4+
package auditd
5+
6+
import (
7+
"context"
8+
9+
"github.com/threatwinds/go-sdk/plugins"
10+
"github.com/utmstack/UTMStack/agent/utils"
11+
)
12+
13+
// AuditdCollector is a no-op stub for non-Linux platforms
14+
type AuditdCollector struct{}
15+
16+
// New creates a new AuditdCollector (no-op on non-Linux)
17+
func New() *AuditdCollector {
18+
return &AuditdCollector{}
19+
}
20+
21+
// Name returns the collector name
22+
func (a *AuditdCollector) Name() string {
23+
return "auditd"
24+
}
25+
26+
// Start is a no-op on non-Linux platforms
27+
func (a *AuditdCollector) Start(ctx context.Context, queue chan *plugins.Log) {
28+
utils.Logger.Info("auditd collector not supported on this platform, skipping")
29+
}
30+
31+
// Stop is a no-op on non-Linux platforms
32+
func (a *AuditdCollector) Stop() {}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
//go:build linux
2+
// +build linux
3+
4+
package auditd
5+
6+
import (
7+
"os/exec"
8+
"strings"
9+
10+
"github.com/utmstack/UTMStack/agent/utils"
11+
)
12+
13+
// checkAuditCapability checks if the audit system is available and enabled.
14+
// Uses auditctl -s to verify audit status since /proc/sys/kernel/auditing
15+
// doesn't exist on all kernel versions.
16+
func checkAuditCapability() error {
17+
// Check if auditctl exists
18+
auditctlPath, err := exec.LookPath("auditctl")
19+
if err != nil {
20+
utils.Logger.ErrorF("auditd: auditctl not found in PATH: %v", err)
21+
return err
22+
}
23+
24+
// Run auditctl -s to check audit status
25+
cmd := exec.Command(auditctlPath, "-s")
26+
output, err := cmd.Output()
27+
if err != nil {
28+
utils.Logger.ErrorF("auditd: failed to run auditctl -s: %v", err)
29+
return err
30+
}
31+
32+
// Check if enabled=1 in output
33+
if !strings.Contains(string(output), "enabled 1") && !strings.Contains(string(output), "enabled=1") {
34+
utils.Logger.Info("auditd: kernel auditing is disabled (enabled != 1), collector will not start")
35+
return nil
36+
}
37+
38+
utils.Logger.Info("auditd: audit system is enabled and ready")
39+
return nil
40+
}

agent/collector/auditd/client.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
//go:build linux
2+
// +build linux
3+
4+
package auditd
5+
6+
import (
7+
libaudit "github.com/elastic/go-libaudit/v2"
8+
)
9+
10+
// auditReceiver interface wraps the go-libaudit client for testability
11+
type auditReceiver interface {
12+
Receive(nonBlocking bool) (*libaudit.RawAuditMessage, error)
13+
Close() error
14+
}
15+
16+
// auditClientWrapper wraps the go-libaudit AuditClient to implement auditReceiver
17+
type auditClientWrapper struct {
18+
client *libaudit.AuditClient
19+
}
20+
21+
// Receive receives a raw audit message from the netlink socket
22+
func (w *auditClientWrapper) Receive(nonBlocking bool) (*libaudit.RawAuditMessage, error) {
23+
return w.client.Receive(nonBlocking)
24+
}
25+
26+
// Close closes the underlying audit client
27+
func (w *auditClientWrapper) Close() error {
28+
return w.client.Close()
29+
}
30+
31+
// newAuditClient creates a new multicast audit client wrapped in the auditReceiver interface
32+
func newAuditClient() (auditReceiver, error) {
33+
client, err := libaudit.NewMulticastAuditClient(nil)
34+
if err != nil {
35+
return nil, err
36+
}
37+
return &auditClientWrapper{client: client}, nil
38+
}

0 commit comments

Comments
 (0)