Skip to content

Commit 26abf45

Browse files
committed
fix(agent): VTEPTable +listType=map + optimistic retry in registerVTEP
Change VTEPTable list type annotation from +listType=atomic to +listType=map (key: vmIP) so concurrent status patches from multiple agent nodes are merged by the API server rather than last-write-wins. Wrap registerVTEP in retry.RetryOnConflict with optimistic lock (MergeFromWithOptimisticLock) so a conflict on the status patch causes a re-fetch and retry instead of a silent data loss. Regenerate CRDs; vtepTable now carries x-kubernetes-list-type: map and x-kubernetes-list-map-keys: [vmIP] in the schema. Add concurrent-write unit test that exercises two goroutines calling registerVTEP simultaneously and asserts both entries survive.
1 parent 2a4cb2a commit 26abf45

4 files changed

Lines changed: 90 additions & 32 deletions

File tree

api/v1alpha1/impnetwork_types.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ type ImpNetworkStatus struct {
126126
// VTEPTable contains VTEP entries for cross-node VXLAN FDB population.
127127
// Each entry maps a VM's IP and MAC to the node IP hosting it.
128128
// +optional
129-
// +listType=atomic
129+
// +listType=map
130+
// +listMapKey=vmIP
130131
VTEPTable []VTEPEntry `json:"vtepTable,omitempty"`
131132
}
132133

config/crd/bases/imp.dev_impnetworks.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,9 @@ spec:
256256
- vmMAC
257257
type: object
258258
type: array
259-
x-kubernetes-list-type: atomic
259+
x-kubernetes-list-map-keys:
260+
- vmIP
261+
x-kubernetes-list-type: map
260262
type: object
261263
type: object
262264
served: true

internal/agent/reconciler.go

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
apierrors "k8s.io/apimachinery/pkg/api/errors"
1212
"k8s.io/apimachinery/pkg/runtime"
13+
"k8s.io/client-go/util/retry"
1314
ctrl "sigs.k8s.io/controller-runtime"
1415
"sigs.k8s.io/controller-runtime/pkg/client"
1516
logf "sigs.k8s.io/controller-runtime/pkg/log"
@@ -278,46 +279,50 @@ func (r *ImpVMReconciler) SetupWithManager(mgr ctrl.Manager) error {
278279
}
279280

280281
// registerVTEP adds or updates the VTEPEntry for vm in ImpNetwork.status.vtepTable.
282+
// It uses optimistic-lock retries to handle concurrent patches from multiple agents.
281283
func (r *ImpVMReconciler) registerVTEP(ctx context.Context, vm *impdevv1alpha1.ImpVM, vmIP, vmMAC string) error {
282-
var impNet impdevv1alpha1.ImpNetwork
283-
if err := r.Get(ctx, client.ObjectKey{
284-
Namespace: vm.Namespace,
285-
Name: vm.Spec.NetworkRef.Name,
286-
}, &impNet); err != nil {
287-
return err
288-
}
284+
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
285+
var impNet impdevv1alpha1.ImpNetwork
286+
if err := r.Get(ctx, client.ObjectKey{
287+
Namespace: vm.Namespace,
288+
Name: vm.Spec.NetworkRef.Name,
289+
}, &impNet); err != nil {
290+
return err
291+
}
289292

290-
// Check if an up-to-date entry already exists.
291-
for _, e := range impNet.Status.VTEPTable {
292-
if e.VMIP == vmIP && e.VMMAC == vmMAC && e.NodeIP == r.NodeIP {
293-
return nil // already registered
293+
// Check if an up-to-date entry already exists.
294+
for _, e := range impNet.Status.VTEPTable {
295+
if e.VMIP == vmIP && e.VMMAC == vmMAC && e.NodeIP == r.NodeIP {
296+
return nil // already registered
297+
}
294298
}
295-
}
296299

297-
base := impNet.DeepCopy()
300+
base := impNet.DeepCopy()
298301

299-
// Replace or append entry for this VM IP.
300-
found := false
301-
for i, e := range impNet.Status.VTEPTable {
302-
if e.VMIP == vmIP {
303-
impNet.Status.VTEPTable[i] = impdevv1alpha1.VTEPEntry{
302+
// Replace or append entry for this VM IP.
303+
found := false
304+
for i, e := range impNet.Status.VTEPTable {
305+
if e.VMIP == vmIP {
306+
impNet.Status.VTEPTable[i] = impdevv1alpha1.VTEPEntry{
307+
NodeIP: r.NodeIP,
308+
VMIP: vmIP,
309+
VMMAC: vmMAC,
310+
}
311+
found = true
312+
break
313+
}
314+
}
315+
if !found {
316+
impNet.Status.VTEPTable = append(impNet.Status.VTEPTable, impdevv1alpha1.VTEPEntry{
304317
NodeIP: r.NodeIP,
305318
VMIP: vmIP,
306319
VMMAC: vmMAC,
307-
}
308-
found = true
309-
break
320+
})
310321
}
311-
}
312-
if !found {
313-
impNet.Status.VTEPTable = append(impNet.Status.VTEPTable, impdevv1alpha1.VTEPEntry{
314-
NodeIP: r.NodeIP,
315-
VMIP: vmIP,
316-
VMMAC: vmMAC,
317-
})
318-
}
319322

320-
return r.Status().Patch(ctx, &impNet, client.MergeFrom(base))
323+
return r.Status().Patch(ctx, &impNet,
324+
client.MergeFromWithOptions(base, client.MergeFromWithOptimisticLock{}))
325+
})
321326
}
322327

323328
// deregisterVTEP removes the VTEPEntry for vm.Status.IP from ImpNetwork.status.vtepTable.

internal/agent/reconciler_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package agent
55
import (
66
"context"
77
"errors"
8+
"sync"
89
"time"
910

1011
. "github.com/onsi/ginkgo/v2"
@@ -499,6 +500,55 @@ var _ = Describe("ImpVM Agent: Running — lazy reattach on agent restart", func
499500
})
500501
})
501502

503+
var _ = Describe("ImpVM Agent: registerVTEP — concurrent writes", func() {
504+
ctx := context.Background()
505+
506+
It("preserves both entries when two goroutines register simultaneously", func() {
507+
impNet := &impdevv1alpha1.ImpNetwork{
508+
ObjectMeta: metav1.ObjectMeta{Name: "test-net-concurrent", Namespace: "default"},
509+
Spec: impdevv1alpha1.ImpNetworkSpec{Subnet: "10.200.0.0/24"},
510+
}
511+
Expect(k8sClient.Create(ctx, impNet)).To(Succeed())
512+
DeferCleanup(func() { k8sClient.Delete(ctx, impNet) }) //nolint:errcheck
513+
514+
r1 := &ImpVMReconciler{Client: k8sClient, NodeIP: "10.0.0.1"}
515+
r2 := &ImpVMReconciler{Client: k8sClient, NodeIP: "10.0.0.2"}
516+
517+
vm1 := &impdevv1alpha1.ImpVM{
518+
ObjectMeta: metav1.ObjectMeta{Name: "vm1-conc", Namespace: "default"},
519+
Spec: impdevv1alpha1.ImpVMSpec{NetworkRef: &impdevv1alpha1.LocalObjectRef{Name: "test-net-concurrent"}},
520+
}
521+
vm2 := &impdevv1alpha1.ImpVM{
522+
ObjectMeta: metav1.ObjectMeta{Name: "vm2-conc", Namespace: "default"},
523+
Spec: impdevv1alpha1.ImpVMSpec{NetworkRef: &impdevv1alpha1.LocalObjectRef{Name: "test-net-concurrent"}},
524+
}
525+
526+
var wg sync.WaitGroup
527+
var err1, err2 error
528+
wg.Add(2)
529+
go func() {
530+
defer wg.Done()
531+
err1 = r1.registerVTEP(ctx, vm1, "10.200.0.2", "02:00:00:00:00:01")
532+
}()
533+
go func() {
534+
defer wg.Done()
535+
err2 = r2.registerVTEP(ctx, vm2, "10.200.0.3", "02:00:00:00:00:02")
536+
}()
537+
wg.Wait()
538+
539+
Expect(err1).NotTo(HaveOccurred())
540+
Expect(err2).NotTo(HaveOccurred())
541+
542+
updated := &impdevv1alpha1.ImpNetwork{}
543+
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "test-net-concurrent", Namespace: "default"}, updated)).To(Succeed())
544+
vmIPs := make([]string, 0, len(updated.Status.VTEPTable))
545+
for _, e := range updated.Status.VTEPTable {
546+
vmIPs = append(vmIPs, e.VMIP)
547+
}
548+
Expect(vmIPs).To(ConsistOf("10.200.0.2", "10.200.0.3"))
549+
})
550+
})
551+
502552
var _ = Describe("ImpVM Agent: handleTerminating driver.Stop error", func() {
503553
ctx := context.Background()
504554

0 commit comments

Comments
 (0)