Skip to content

Commit 22d8fb6

Browse files
committed
feat(agent): lazy reattach on restart — restore procs + IP allocation + VTEP
1 parent 10fe606 commit 22d8fb6

4 files changed

Lines changed: 159 additions & 6 deletions

File tree

cmd/agent/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,11 @@ func main() {
9797
log.Info("Using FirecrackerDriver")
9898
}
9999

100+
var alloc *network.Allocator
101+
if fcDrv, ok := driver.(*agent.FirecrackerDriver); ok {
102+
alloc = fcDrv.Alloc
103+
}
104+
100105
if err := (&agent.ImpVMReconciler{
101106
Client: mgr.GetClient(),
102107
Scheme: mgr.GetScheme(),
@@ -105,6 +110,7 @@ func main() {
105110
Driver: driver,
106111
Metrics: mc,
107112
Net: prodNet,
113+
Alloc: alloc,
108114
}).SetupWithManager(mgr); err != nil {
109115
log.Error(err, "Unable to set up ImpVMReconciler")
110116
os.Exit(1)

internal/agent/firecracker_driver.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -642,12 +642,32 @@ func (d *FirecrackerDriver) applySnapshotBoot(ctx context.Context, vm *impdevv1a
642642
return nil
643643
}
644644

645-
// IsAlive is a placeholder — real implementation in Task 2.
646-
func (d *FirecrackerDriver) IsAlive(_ int64) bool {
647-
return false
645+
// IsAlive reports whether the process with the given PID is still running.
646+
// Uses kill(pid, 0) — succeeds if the process exists (even if zombie).
647+
func (d *FirecrackerDriver) IsAlive(pid int64) bool {
648+
p, err := os.FindProcess(int(pid))
649+
if err != nil {
650+
return false
651+
}
652+
return p.Signal(syscall.Signal(0)) == nil
648653
}
649654

650-
// Reattach is a placeholder — real implementation in Task 2.
651-
func (d *FirecrackerDriver) Reattach(_ context.Context, _ *impdevv1alpha1.ImpVM) error {
652-
return fmt.Errorf("not implemented")
655+
// Reattach re-registers an already-running Firecracker VM into the driver's
656+
// procs map without launching a new process. Called after an agent restart when
657+
// the Firecracker process survived but the in-memory procs map was lost.
658+
// Returns an error if the VM's API socket is not present on disk (which would
659+
// mean the PID has been reused by a different process).
660+
func (d *FirecrackerDriver) Reattach(_ context.Context, vm *impdevv1alpha1.ImpVM) error {
661+
sock := d.socketPath(vm)
662+
if _, err := os.Stat(sock); err != nil {
663+
return fmt.Errorf("reattach %s: socket %s not found — PID may be reused: %w",
664+
vmKey(vm), sock, err)
665+
}
666+
d.mu.Lock()
667+
d.procs[vmKey(vm)] = &fcProc{
668+
pid: vm.Status.RuntimePID,
669+
socket: sock,
670+
}
671+
d.mu.Unlock()
672+
return nil
653673
}

internal/agent/reconciler.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ type ImpVMReconciler struct {
3131
Metrics *VMMetricsCollector
3232
// Net is optional. When non-nil, used for VXLAN/FDB operations after VTEP sync.
3333
Net network.NetManager
34+
// Alloc is the in-memory IP allocator. When non-nil, Reserve is called during
35+
// lazy reattach to restore IP state after agent restart.
36+
Alloc *network.Allocator
3437
}
3538

3639
// +kubebuilder:rbac:groups=imp.dev,resources=impvms,verbs=get;list;watch;update;patch
@@ -139,6 +142,32 @@ func (r *ImpVMReconciler) handleRunning(ctx context.Context, vm *impdevv1alpha1.
139142
return ctrl.Result{}, nil // watch-driven steady state
140143
}
141144

145+
// Inspect returned Running=false. Before declaring the VM dead, check whether
146+
// the Firecracker process is still alive (procs map may be empty after an
147+
// agent pod restart). If PID is alive, reattach and restore allocator state.
148+
if pid := vm.Status.RuntimePID; pid > 0 && r.Driver.IsAlive(pid) {
149+
if err := r.Driver.Reattach(ctx, vm); err != nil {
150+
log.Error(err, "Reattach failed — treating VM as dead")
151+
} else {
152+
// Restore in-memory IP allocation so Release works correctly later.
153+
if r.Alloc != nil && vm.Spec.NetworkRef != nil && vm.Status.IP != "" {
154+
netKey := vm.Namespace + "/" + vm.Spec.NetworkRef.Name
155+
r.Alloc.Reserve(netKey, vm.Status.IP)
156+
}
157+
// Re-publish VTEP entry and sync FDB in case they were lost.
158+
if vm.Spec.NetworkRef != nil && vm.Status.IP != "" && r.NodeIP != "" {
159+
macAddr := network.MACAddr(vm.Namespace + "/" + vm.Name)
160+
if err := r.registerVTEP(ctx, vm, vm.Status.IP, macAddr); err != nil {
161+
log.Error(err, "registerVTEP after reattach failed")
162+
} else if err := r.syncFDB(ctx, vm); err != nil {
163+
log.Error(err, "syncFDB after reattach failed")
164+
}
165+
}
166+
log.Info("VM reattached after agent restart", "pid", pid)
167+
return ctrl.Result{}, nil
168+
}
169+
}
170+
142171
log.Info("VM process exited", "lifecycle", vm.Spec.Lifecycle)
143172
if vm.Spec.Lifecycle == impdevv1alpha1.VMLifecycleEphemeral {
144173
return r.finishSucceeded(ctx, vm)

internal/agent/reconciler_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,104 @@ var _ = Describe("ImpVM Agent: Reconcile non-existent VM (NotFound)", func() {
401401
})
402402
})
403403

404+
var _ = Describe("ImpVM Agent: Running — lazy reattach on agent restart", func() {
405+
ctx := context.Background()
406+
407+
It("reattaches and stays Running when PID is alive", func() {
408+
driver := NewStubDriver()
409+
driver.IsAliveResult = true
410+
411+
vm := &impdevv1alpha1.ImpVM{
412+
ObjectMeta: metav1.ObjectMeta{
413+
Name: "tc-reattach-alive", Namespace: "default",
414+
Finalizers: []string{"imp/finalizer"},
415+
},
416+
Spec: impdevv1alpha1.ImpVMSpec{NodeName: testNode},
417+
}
418+
Expect(k8sClient.Create(ctx, vm)).To(Succeed())
419+
DeferCleanup(func() { k8sClient.Delete(ctx, vm) }) //nolint:errcheck
420+
421+
base := vm.DeepCopy()
422+
vm.Status.Phase = impdevv1alpha1.VMPhaseRunning
423+
vm.Status.RuntimePID = 99999
424+
Expect(k8sClient.Status().Patch(ctx, vm, client.MergeFrom(base))).To(Succeed())
425+
426+
// Inspect returns Running=false (no entry in states map — simulates restart)
427+
// IsAliveResult=true means the PID check passes
428+
429+
_, err := newReconciler(driver).Reconcile(ctx, reconcile.Request{
430+
NamespacedName: types.NamespacedName{Name: "tc-reattach-alive", Namespace: "default"},
431+
})
432+
Expect(err).NotTo(HaveOccurred())
433+
434+
updated := &impdevv1alpha1.ImpVM{}
435+
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "tc-reattach-alive", Namespace: "default"}, updated)).To(Succeed())
436+
Expect(updated.Status.Phase).To(Equal(impdevv1alpha1.VMPhaseRunning))
437+
Expect(driver.ReattachCalls).To(HaveLen(1))
438+
Expect(driver.ReattachCalls[0]).To(Equal("default/tc-reattach-alive"))
439+
})
440+
441+
It("transitions to Failed when PID is dead", func() {
442+
driver := NewStubDriver()
443+
driver.IsAliveResult = false
444+
445+
vm := &impdevv1alpha1.ImpVM{
446+
ObjectMeta: metav1.ObjectMeta{
447+
Name: "tc-reattach-dead", Namespace: "default",
448+
Finalizers: []string{"imp/finalizer"},
449+
},
450+
Spec: impdevv1alpha1.ImpVMSpec{NodeName: testNode},
451+
}
452+
Expect(k8sClient.Create(ctx, vm)).To(Succeed())
453+
DeferCleanup(func() { k8sClient.Delete(ctx, vm) }) //nolint:errcheck
454+
455+
base := vm.DeepCopy()
456+
vm.Status.Phase = impdevv1alpha1.VMPhaseRunning
457+
vm.Status.RuntimePID = 99999
458+
Expect(k8sClient.Status().Patch(ctx, vm, client.MergeFrom(base))).To(Succeed())
459+
460+
_, err := newReconciler(driver).Reconcile(ctx, reconcile.Request{
461+
NamespacedName: types.NamespacedName{Name: "tc-reattach-dead", Namespace: "default"},
462+
})
463+
Expect(err).NotTo(HaveOccurred())
464+
465+
updated := &impdevv1alpha1.ImpVM{}
466+
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "tc-reattach-dead", Namespace: "default"}, updated)).To(Succeed())
467+
Expect(updated.Status.Phase).To(Equal(impdevv1alpha1.VMPhaseFailed))
468+
Expect(driver.ReattachCalls).To(BeEmpty())
469+
})
470+
471+
It("transitions to Failed when RuntimePID is zero", func() {
472+
driver := NewStubDriver()
473+
driver.IsAliveResult = true // shouldn't matter — PID is 0
474+
475+
vm := &impdevv1alpha1.ImpVM{
476+
ObjectMeta: metav1.ObjectMeta{
477+
Name: "tc-reattach-nopid", Namespace: "default",
478+
Finalizers: []string{"imp/finalizer"},
479+
},
480+
Spec: impdevv1alpha1.ImpVMSpec{NodeName: testNode},
481+
}
482+
Expect(k8sClient.Create(ctx, vm)).To(Succeed())
483+
DeferCleanup(func() { k8sClient.Delete(ctx, vm) }) //nolint:errcheck
484+
485+
base := vm.DeepCopy()
486+
vm.Status.Phase = impdevv1alpha1.VMPhaseRunning
487+
vm.Status.RuntimePID = 0
488+
Expect(k8sClient.Status().Patch(ctx, vm, client.MergeFrom(base))).To(Succeed())
489+
490+
_, err := newReconciler(driver).Reconcile(ctx, reconcile.Request{
491+
NamespacedName: types.NamespacedName{Name: "tc-reattach-nopid", Namespace: "default"},
492+
})
493+
Expect(err).NotTo(HaveOccurred())
494+
495+
updated := &impdevv1alpha1.ImpVM{}
496+
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "tc-reattach-nopid", Namespace: "default"}, updated)).To(Succeed())
497+
Expect(updated.Status.Phase).To(Equal(impdevv1alpha1.VMPhaseFailed))
498+
Expect(driver.ReattachCalls).To(BeEmpty())
499+
})
500+
})
501+
404502
var _ = Describe("ImpVM Agent: handleTerminating driver.Stop error", func() {
405503
ctx := context.Background()
406504

0 commit comments

Comments
 (0)