Skip to content

Commit 372021f

Browse files
committed
feat(agent): finish vxlan sync path, add OTEL traces, and cilium IPAM delegation
1 parent 0453122 commit 372021f

12 files changed

Lines changed: 522 additions & 31 deletions

cmd/agent/main.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ func main() {
7171
os.Exit(1)
7272
}
7373
defer func() { _ = shutdownTelemetry(context.Background()) }()
74+
_, shutdownTraces, err := telemetry.SetupTracerProvider(context.Background())
75+
if err != nil {
76+
log.Error(err, "unable to set up traces")
77+
os.Exit(1)
78+
}
79+
defer func() { _ = shutdownTraces(context.Background()) }()
7480
mc := agent.NewVMMetricsCollector(mp.Meter("imp.agent"), agentReg)
7581

7682
// IMP_STUB_DRIVER=true: StubDriver (CI, test clusters, no KVM needed).
@@ -113,6 +119,17 @@ func main() {
113119
os.Exit(1)
114120
}
115121

122+
if err := (&agent.ImpNetworkReconciler{
123+
Client: mgr.GetClient(),
124+
Scheme: mgr.GetScheme(),
125+
NodeName: nodeName,
126+
NodeIP: nodeIP,
127+
Net: prodNet,
128+
}).SetupWithManager(mgr); err != nil {
129+
log.Error(err, "unable to create controller", "controller", "ImpNetwork(agent)")
130+
os.Exit(1)
131+
}
132+
116133
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
117134
log.Error(err, "Unable to set up health check")
118135
os.Exit(1)

cmd/operator/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,12 @@ func main() {
133133
os.Exit(1)
134134
}
135135
defer func() { _ = shutdownTelemetry(context.Background()) }()
136+
_, shutdownTraces, err := telemetry.SetupTracerProvider(context.Background())
137+
if err != nil {
138+
setupLog.Error(err, "unable to set up traces")
139+
os.Exit(1)
140+
}
141+
defer func() { _ = shutdownTraces(context.Background()) }()
136142
controller.InitMetrics(mp.Meter("imp.controller"))
137143

138144
cfg := ctrl.GetConfigOrDie()

go.mod

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,18 @@ require (
1111
github.com/onsi/ginkgo/v2 v2.27.2
1212
github.com/onsi/gomega v1.38.2
1313
github.com/prometheus/client_golang v1.23.2
14-
github.com/prometheus/client_model v0.6.2
1514
github.com/robfig/cron/v3 v3.0.1
1615
github.com/stretchr/testify v1.11.1
1716
github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5
1817
github.com/xanzy/go-gitlab v0.115.0
18+
go.opentelemetry.io/otel v1.42.0
1919
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.42.0
2020
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.42.0
21+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.42.0
22+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.42.0
2123
go.opentelemetry.io/otel/exporters/prometheus v0.64.0
24+
go.opentelemetry.io/otel/metric v1.42.0
25+
go.opentelemetry.io/otel/sdk v1.42.0
2226
go.opentelemetry.io/otel/sdk/metric v1.42.0
2327
golang.org/x/oauth2 v0.35.0
2428
google.golang.org/grpc v1.79.2
@@ -88,6 +92,7 @@ require (
8892
github.com/opentracing/opentracing-go v1.2.0 // indirect
8993
github.com/pkg/errors v0.9.1 // indirect
9094
github.com/pmezard/go-difflib v1.0.0 // indirect
95+
github.com/prometheus/client_model v0.6.2 // indirect
9196
github.com/prometheus/common v0.67.5 // indirect
9297
github.com/prometheus/otlptranslator v1.0.0 // indirect
9398
github.com/prometheus/procfs v0.19.2 // indirect
@@ -98,9 +103,7 @@ require (
98103
github.com/x448/float16 v0.8.4 // indirect
99104
go.mongodb.org/mongo-driver v1.8.3 // indirect
100105
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
101-
go.opentelemetry.io/otel v1.42.0 // indirect
102-
go.opentelemetry.io/otel/metric v1.42.0 // indirect
103-
go.opentelemetry.io/otel/sdk v1.42.0 // indirect
106+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.42.0 // indirect
104107
go.opentelemetry.io/otel/trace v1.42.0 // indirect
105108
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
106109
go.uber.org/multierr v1.11.0 // indirect

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -849,6 +849,12 @@ go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.42.0 h1:MdK
849849
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.42.0/go.mod h1:RolT8tWtfHcjajEH5wFIZ4Dgh5jpPdFXYV9pTAk/qjc=
850850
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.42.0 h1:H7O6RlGOMTizyl3R08Kn5pdM06bnH8oscSj7o11tmLA=
851851
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.42.0/go.mod h1:mBFWu/WOVDkWWsR7Tx7h6EpQB8wsv7P0Yrh0Pb7othc=
852+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.42.0 h1:THuZiwpQZuHPul65w4WcwEnkX2QIuMT+UFoOrygtoJw=
853+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.42.0/go.mod h1:J2pvYM5NGHofZ2/Ru6zw/TNWnEQp5crgyDeSrYpXkAw=
854+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.42.0 h1:zWWrB1U6nqhS/k6zYB74CjRpuiitRtLLi68VcgmOEto=
855+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.42.0/go.mod h1:2qXPNBX1OVRC0IwOnfo1ljoid+RD0QK3443EaqVlsOU=
856+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.42.0 h1:uLXP+3mghfMf7XmV4PkGfFhFKuNWoCvvx5wP/wOXo0o=
857+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.42.0/go.mod h1:v0Tj04armyT59mnURNUJf7RCKcKzq+lgJs6QSjHjaTc=
852858
go.opentelemetry.io/otel/exporters/prometheus v0.64.0 h1:g0LRDXMX/G1SEZtK8zl8Chm4K6GBwRkjPKE36LxiTYs=
853859
go.opentelemetry.io/otel/exporters/prometheus v0.64.0/go.mod h1:UrgcjnarfdlBDP3GjDIJWe6HTprwSazNjwsI+Ru6hro=
854860
go.opentelemetry.io/otel/metric v1.42.0 h1:2jXG+3oZLNXEPfNmnpxKDeZsFI5o4J+nz6xUlaFdF/4=

internal/agent/firecracker_driver.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -430,9 +430,14 @@ func (d *FirecrackerDriver) setupNetwork(ctx context.Context, vm *impdevv1alpha1
430430
tapName := network.TAPName(vKey)
431431
macAddr := network.MACAddr(vKey)
432432

433-
_, cidr, err := gonet.ParseCIDR(impNet.Spec.Subnet)
433+
allocSubnet, err := resolveAllocationSubnet(ctx, d.Client, &impNet)
434434
if err != nil {
435-
return nil, fmt.Errorf("parse subnet %q: %w", impNet.Spec.Subnet, err)
435+
return nil, fmt.Errorf("resolve allocation subnet: %w", err)
436+
}
437+
438+
_, cidr, err := gonet.ParseCIDR(allocSubnet)
439+
if err != nil {
440+
return nil, fmt.Errorf("parse subnet %q: %w", allocSubnet, err)
436441
}
437442
prefixLen, _ := cidr.Mask.Size()
438443

@@ -445,7 +450,7 @@ func (d *FirecrackerDriver) setupNetwork(ctx context.Context, vm *impdevv1alpha1
445450
}
446451

447452
// Allocate VM IP.
448-
ip, err := d.Alloc.Allocate(netKey, impNet.Spec.Subnet, gateway)
453+
ip, err := d.Alloc.Allocate(netKey, allocSubnet, gateway)
449454
if err != nil {
450455
return nil, fmt.Errorf("allocate IP: %w", err)
451456
}
@@ -464,7 +469,7 @@ func (d *FirecrackerDriver) setupNetwork(ctx context.Context, vm *impdevv1alpha1
464469

465470
// Install NAT if requested (best-effort — don't block VM start on NAT failure).
466471
if impNet.Spec.NAT.Enabled {
467-
if natErr := d.Net.EnsureNAT(ctx, impNet.Spec.Subnet, impNet.Spec.NAT.EgressInterface); natErr != nil {
472+
if natErr := d.Net.EnsureNAT(ctx, allocSubnet, impNet.Spec.NAT.EgressInterface); natErr != nil {
468473
logf.FromContext(ctx).Error(natErr, "EnsureNAT failed — VM will start without NAT")
469474
}
470475
}
@@ -477,7 +482,7 @@ func (d *FirecrackerDriver) setupNetwork(ctx context.Context, vm *impdevv1alpha1
477482
PrefixLen: prefixLen,
478483
Gateway: gateway,
479484
DNS: impNet.Spec.DNS,
480-
Subnet: impNet.Spec.Subnet,
485+
Subnet: allocSubnet,
481486
NetworkKey: netKey,
482487
NATEnabled: impNet.Spec.NAT.Enabled,
483488
EgressInterface: impNet.Spec.NAT.EgressInterface,
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
//go:build linux
2+
3+
package agent
4+
5+
import (
6+
"context"
7+
8+
"k8s.io/apimachinery/pkg/runtime"
9+
ctrl "sigs.k8s.io/controller-runtime"
10+
"sigs.k8s.io/controller-runtime/pkg/client"
11+
logf "sigs.k8s.io/controller-runtime/pkg/log"
12+
13+
impdevv1alpha1 "github.com/syscode-labs/imp/api/v1alpha1"
14+
"github.com/syscode-labs/imp/internal/agent/network"
15+
)
16+
17+
// ImpNetworkReconciler watches ImpNetwork objects on behalf of the node agent.
18+
// When VTEPTable changes, it syncs the local VXLAN FDB for running local VMs.
19+
type ImpNetworkReconciler struct {
20+
client.Client
21+
Scheme *runtime.Scheme
22+
NodeName string
23+
NodeIP string
24+
Net network.NetManager
25+
}
26+
27+
// +kubebuilder:rbac:groups=imp.dev,resources=impnetworks,verbs=get;list;watch
28+
// +kubebuilder:rbac:groups=imp.dev,resources=impnetworks/status,verbs=get;update;patch
29+
30+
func (r *ImpNetworkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
31+
var impNet impdevv1alpha1.ImpNetwork
32+
if err := r.Get(ctx, req.NamespacedName, &impNet); err != nil {
33+
return ctrl.Result{}, client.IgnoreNotFound(err)
34+
}
35+
36+
// Only sync FDB if this node has at least one running VM on this network.
37+
var vmList impdevv1alpha1.ImpVMList
38+
if err := r.List(ctx, &vmList, client.InNamespace(req.Namespace)); err != nil {
39+
return ctrl.Result{}, err
40+
}
41+
hasLocalVM := false
42+
for i := range vmList.Items {
43+
vm := &vmList.Items[i]
44+
if vm.Spec.NodeName == r.NodeName &&
45+
vm.Spec.NetworkRef != nil &&
46+
vm.Spec.NetworkRef.Name == impNet.Name &&
47+
vm.Status.Phase == impdevv1alpha1.VMPhaseRunning {
48+
hasLocalVM = true
49+
break
50+
}
51+
}
52+
if !hasLocalVM {
53+
return ctrl.Result{}, nil
54+
}
55+
56+
if err := syncNetworkFDB(ctx, &impNet, r.NodeIP, r.Net); err != nil {
57+
logf.FromContext(ctx).WithValues("node", r.NodeName).Error(err, "syncNetworkFDB failed")
58+
return ctrl.Result{}, err
59+
}
60+
return ctrl.Result{}, nil
61+
}
62+
63+
func (r *ImpNetworkReconciler) SetupWithManager(mgr ctrl.Manager) error {
64+
return ctrl.NewControllerManagedBy(mgr).
65+
For(&impdevv1alpha1.ImpNetwork{}).
66+
Named("agent-impnetwork").
67+
Complete(r)
68+
}
69+
70+
// syncNetworkFDB ensures VXLAN exists for the network and reconciles FDB with
71+
// remote (non-local) VTEP entries.
72+
func syncNetworkFDB(ctx context.Context, netObj *impdevv1alpha1.ImpNetwork, nodeIP string, mgr network.NetManager) error {
73+
if mgr == nil || nodeIP == "" {
74+
return nil
75+
}
76+
77+
netKey := netObj.Namespace + "/" + netObj.Name
78+
bridgeName := network.BridgeName(netKey)
79+
vni, ifaceName := network.VXLANParams(string(netObj.UID))
80+
81+
if err := mgr.EnsureVXLAN(ctx, vni, ifaceName, nodeIP, bridgeName); err != nil {
82+
return err
83+
}
84+
85+
var remoteEntries []network.FDBEntry
86+
for _, e := range netObj.Status.VTEPTable {
87+
if e.NodeIP == nodeIP {
88+
continue
89+
}
90+
remoteEntries = append(remoteEntries, network.FDBEntry{
91+
MAC: e.VMMAC,
92+
DstIP: e.NodeIP,
93+
})
94+
}
95+
return mgr.SyncFDB(ctx, ifaceName, remoteEntries)
96+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
//go:build linux
2+
3+
package agent_test
4+
5+
import (
6+
"context"
7+
"testing"
8+
9+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
"k8s.io/apimachinery/pkg/runtime"
11+
"k8s.io/apimachinery/pkg/types"
12+
ctrl "sigs.k8s.io/controller-runtime"
13+
"sigs.k8s.io/controller-runtime/pkg/client/fake"
14+
15+
impdevv1alpha1 "github.com/syscode-labs/imp/api/v1alpha1"
16+
"github.com/syscode-labs/imp/internal/agent"
17+
"github.com/syscode-labs/imp/internal/agent/network"
18+
)
19+
20+
func newImpNetworkScheme() *runtime.Scheme {
21+
s := runtime.NewScheme()
22+
_ = impdevv1alpha1.AddToScheme(s)
23+
return s
24+
}
25+
26+
func TestImpNetworkReconciler_noLocalVMs(t *testing.T) {
27+
impNet := &impdevv1alpha1.ImpNetwork{
28+
ObjectMeta: metav1.ObjectMeta{
29+
Name: "net1",
30+
Namespace: "default",
31+
UID: types.UID("uid-net1"),
32+
},
33+
Spec: impdevv1alpha1.ImpNetworkSpec{Subnet: "10.0.0.0/24"},
34+
}
35+
impNet.Status.VTEPTable = []impdevv1alpha1.VTEPEntry{
36+
{NodeIP: "192.168.1.2", VMIP: "10.0.0.5", VMMAC: "02:aa:bb:cc:dd:ee"},
37+
}
38+
39+
fakeClient := fake.NewClientBuilder().
40+
WithScheme(newImpNetworkScheme()).
41+
WithObjects(impNet).
42+
WithStatusSubresource(impNet).
43+
Build()
44+
45+
stub := &network.StubNetManager{}
46+
r := &agent.ImpNetworkReconciler{
47+
Client: fakeClient,
48+
NodeName: "node-a",
49+
NodeIP: "192.168.1.1",
50+
Net: stub,
51+
}
52+
53+
_, err := r.Reconcile(context.Background(), ctrl.Request{
54+
NamespacedName: types.NamespacedName{Name: "net1", Namespace: "default"},
55+
})
56+
if err != nil {
57+
t.Fatalf("unexpected error: %v", err)
58+
}
59+
if len(stub.EnsureVXLANCalls) != 0 {
60+
t.Errorf("expected no EnsureVXLAN calls (no local VMs), got %d", len(stub.EnsureVXLANCalls))
61+
}
62+
}
63+
64+
func TestImpNetworkReconciler_withLocalVM(t *testing.T) {
65+
impNet := &impdevv1alpha1.ImpNetwork{
66+
ObjectMeta: metav1.ObjectMeta{
67+
Name: "net1",
68+
Namespace: "default",
69+
UID: types.UID("uid-net1"),
70+
},
71+
Spec: impdevv1alpha1.ImpNetworkSpec{Subnet: "10.0.0.0/24"},
72+
}
73+
impNet.Status.VTEPTable = []impdevv1alpha1.VTEPEntry{
74+
{NodeIP: "192.168.1.1", VMIP: "10.0.0.2", VMMAC: "02:aa:00:00:00:01"},
75+
{NodeIP: "192.168.1.2", VMIP: "10.0.0.5", VMMAC: "02:bb:00:00:00:02"},
76+
}
77+
78+
localVM := &impdevv1alpha1.ImpVM{
79+
ObjectMeta: metav1.ObjectMeta{Name: "vm-local", Namespace: "default"},
80+
Spec: impdevv1alpha1.ImpVMSpec{
81+
NodeName: "node-a",
82+
NetworkRef: &impdevv1alpha1.NetworkRef{Name: "net1"},
83+
},
84+
}
85+
localVM.Status.Phase = impdevv1alpha1.VMPhaseRunning
86+
87+
fakeClient := fake.NewClientBuilder().
88+
WithScheme(newImpNetworkScheme()).
89+
WithObjects(impNet, localVM).
90+
WithStatusSubresource(impNet, localVM).
91+
Build()
92+
93+
stub := &network.StubNetManager{}
94+
r := &agent.ImpNetworkReconciler{
95+
Client: fakeClient,
96+
NodeName: "node-a",
97+
NodeIP: "192.168.1.1",
98+
Net: stub,
99+
}
100+
101+
_, err := r.Reconcile(context.Background(), ctrl.Request{
102+
NamespacedName: types.NamespacedName{Name: "net1", Namespace: "default"},
103+
})
104+
if err != nil {
105+
t.Fatalf("unexpected error: %v", err)
106+
}
107+
if len(stub.EnsureVXLANCalls) != 1 {
108+
t.Fatalf("expected 1 EnsureVXLAN call, got %d", len(stub.EnsureVXLANCalls))
109+
}
110+
if len(stub.SyncFDBCalls) != 1 {
111+
t.Fatalf("expected 1 SyncFDB call, got %d", len(stub.SyncFDBCalls))
112+
}
113+
}

0 commit comments

Comments
 (0)