Skip to content

Commit 8f2ed3f

Browse files
committed
feat(tracing): add agent reattach, vtep_register, fdb_sync, and snapshot spans
1 parent 6df7cde commit 8f2ed3f

2 files changed

Lines changed: 79 additions & 19 deletions

File tree

internal/agent/impvmsnapshot_reconciler.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,26 @@ import (
99
"path/filepath"
1010
"time"
1111

12+
"go.opentelemetry.io/otel"
13+
"go.opentelemetry.io/otel/attribute"
14+
"go.opentelemetry.io/otel/trace"
1215
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1316
ctrl "sigs.k8s.io/controller-runtime"
1417
"sigs.k8s.io/controller-runtime/pkg/client"
1518
logf "sigs.k8s.io/controller-runtime/pkg/log"
1619

1720
impdevv1alpha1 "github.com/syscode-labs/imp/api/v1alpha1"
1821
"github.com/syscode-labs/imp/internal/agent/snapshot"
22+
"github.com/syscode-labs/imp/internal/tracing"
1923
)
2024

2125
const snapshotTempDirPrefix = "imp-snapshot-"
2226

23-
func (r *ImpVMSnapshotReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
27+
func (r *ImpVMSnapshotReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
2428
log := logf.FromContext(ctx)
2529

2630
snap := &impdevv1alpha1.ImpVMSnapshot{}
27-
if err := r.Get(ctx, req.NamespacedName, snap); err != nil {
31+
if err = r.Get(ctx, req.NamespacedName, snap); err != nil {
2832
return ctrl.Result{}, client.IgnoreNotFound(err)
2933
}
3034

@@ -40,7 +44,7 @@ func (r *ImpVMSnapshotReconciler) Reconcile(ctx context.Context, req ctrl.Reques
4044

4145
// Resolve source VM and verify it is on this node.
4246
vm := &impdevv1alpha1.ImpVM{}
43-
if err := r.Get(ctx, client.ObjectKey{
47+
if err = r.Get(ctx, client.ObjectKey{
4448
Namespace: snap.Spec.SourceVMNamespace,
4549
Name: snap.Spec.SourceVMName,
4650
}, vm); err != nil {
@@ -58,16 +62,27 @@ func (r *ImpVMSnapshotReconciler) Reconcile(ctx context.Context, req ctrl.Reques
5862
return ctrl.Result{}, nil
5963
}
6064

65+
ctx, span := otel.Tracer("imp.agent").Start(ctx, "agent.impvm.snapshot",
66+
trace.WithAttributes(
67+
attribute.String("vm.name", snap.Spec.SourceVMName),
68+
attribute.String("vm.namespace", snap.Spec.SourceVMNamespace),
69+
),
70+
)
71+
defer func() {
72+
tracing.RecordError(span, err)
73+
span.End()
74+
}()
75+
6176
// Mark Running.
6277
if snap.Status.Phase != "Running" {
6378
base := snap.DeepCopy()
6479
snap.Status.Phase = "Running"
65-
if err := r.Status().Patch(ctx, snap, client.MergeFrom(base)); err != nil {
80+
if err = r.Status().Patch(ctx, snap, client.MergeFrom(base)); err != nil {
6681
return ctrl.Result{}, err
6782
}
6883
}
6984

70-
result, termErr := r.executeSnapshot(ctx, snap, vm)
85+
execResult, termErr := r.executeSnapshot(ctx, snap, vm)
7186

7287
// Always set TerminatedAt — even on failure — after all cleanup is complete.
7388
now := metav1.Now()
@@ -80,9 +95,9 @@ func (r *ImpVMSnapshotReconciler) Reconcile(ctx context.Context, req ctrl.Reques
8095
snap.Status.Phase = "Succeeded"
8196
snap.Status.CompletedAt = &now
8297
if snap.Spec.Storage.Type == "oci-registry" {
83-
snap.Status.Digest = result.digest
98+
snap.Status.Digest = execResult.digest
8499
} else {
85-
snap.Status.SnapshotPath = result.path
100+
snap.Status.SnapshotPath = execResult.path
86101
}
87102
}
88103
return ctrl.Result{}, r.Status().Patch(ctx, snap, client.MergeFrom(base))

internal/agent/reconciler.go

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
"errors"
88
"net/http"
9+
"strconv"
910
"time"
1011

1112
"go.opentelemetry.io/otel"
@@ -169,12 +170,23 @@ func (r *ImpVMReconciler) handleScheduled(ctx context.Context, vm *impdevv1alpha
169170
// Register VTEP entry so the operator and other nodes know where this VM lives.
170171
if vm.Spec.NetworkRef != nil && state.IP != "" && r.NodeIP != "" {
171172
macAddr := network.MACAddr(vm.Namespace + "/" + vm.Name)
172-
if err := r.registerVTEP(ctx, vm, state.IP, macAddr); err != nil {
173-
log.Error(err, "registerVTEP failed — FDB sync may be incomplete")
174-
} else {
175-
// Sync local FDB now that this node has a VTEP entry.
176-
if err := r.syncFDB(ctx, vm); err != nil {
177-
log.Error(err, "syncFDB after registerVTEP failed")
173+
{
174+
vCtx, vSpan := otel.Tracer("imp.agent").Start(ctx, "agent.impvm.vtep_register",
175+
trace.WithAttributes(
176+
attribute.String("vm.name", vm.Name),
177+
attribute.String("vm.ip", state.IP),
178+
),
179+
)
180+
vtepErr := r.registerVTEP(vCtx, vm, state.IP, macAddr)
181+
tracing.RecordError(vSpan, vtepErr)
182+
vSpan.End()
183+
if vtepErr != nil {
184+
log.Error(vtepErr, "registerVTEP failed — FDB sync may be incomplete")
185+
} else {
186+
// Sync local FDB now that this node has a VTEP entry.
187+
if err := r.syncFDB(ctx, vm); err != nil {
188+
log.Error(err, "syncFDB after registerVTEP failed")
189+
}
178190
}
179191
}
180192
}
@@ -200,8 +212,18 @@ func (r *ImpVMReconciler) handleRunning(ctx context.Context, vm *impdevv1alpha1.
200212
// the Firecracker process is still alive (procs map may be empty after an
201213
// agent pod restart). If PID is alive, reattach and restore allocator state.
202214
if pid := vm.Status.RuntimePID; pid > 0 && r.Driver.IsAlive(pid) {
203-
if err := r.Driver.Reattach(ctx, vm); err != nil {
204-
log.Error(err, "Reattach failed — treating VM as dead")
215+
rCtx, rSpan := otel.Tracer("imp.agent").Start(ctx, "agent.impvm.reattach",
216+
trace.WithAttributes(
217+
attribute.String("vm.name", vm.Name),
218+
attribute.String("vm.namespace", vm.Namespace),
219+
attribute.String("vm.pid", strconv.FormatInt(pid, 10)),
220+
),
221+
)
222+
reattachErr := r.Driver.Reattach(rCtx, vm)
223+
if reattachErr != nil {
224+
tracing.RecordError(rSpan, reattachErr)
225+
rSpan.End()
226+
log.Error(reattachErr, "Reattach failed — treating VM as dead")
205227
} else {
206228
// Restore in-memory IP allocation so Release works correctly later.
207229
if r.Alloc != nil && vm.Spec.NetworkRef != nil && vm.Status.IP != "" {
@@ -211,12 +233,35 @@ func (r *ImpVMReconciler) handleRunning(ctx context.Context, vm *impdevv1alpha1.
211233
// Re-publish VTEP entry and sync FDB in case they were lost.
212234
if vm.Spec.NetworkRef != nil && vm.Status.IP != "" && r.NodeIP != "" {
213235
macAddr := network.MACAddr(vm.Namespace + "/" + vm.Name)
214-
if err := r.registerVTEP(ctx, vm, vm.Status.IP, macAddr); err != nil {
215-
log.Error(err, "registerVTEP after reattach failed")
216-
} else if err := r.syncFDB(ctx, vm); err != nil {
217-
log.Error(err, "syncFDB after reattach failed")
236+
{
237+
vCtx, vSpan := otel.Tracer("imp.agent").Start(rCtx, "agent.impvm.vtep_register",
238+
trace.WithAttributes(
239+
attribute.String("vm.name", vm.Name),
240+
attribute.String("vm.ip", vm.Status.IP),
241+
),
242+
)
243+
vtepErr := r.registerVTEP(vCtx, vm, vm.Status.IP, macAddr)
244+
tracing.RecordError(vSpan, vtepErr)
245+
vSpan.End()
246+
if vtepErr != nil {
247+
log.Error(vtepErr, "registerVTEP after reattach failed")
248+
} else {
249+
fCtx, fSpan := otel.Tracer("imp.agent").Start(rCtx, "agent.impvm.fdb_sync",
250+
trace.WithAttributes(
251+
attribute.String("vm.name", vm.Name),
252+
attribute.String("net.name", vm.Spec.NetworkRef.Name),
253+
),
254+
)
255+
fdbErr := r.syncFDB(fCtx, vm)
256+
tracing.RecordError(fSpan, fdbErr)
257+
fSpan.End()
258+
if fdbErr != nil {
259+
log.Error(fdbErr, "syncFDB after reattach failed")
260+
}
261+
}
218262
}
219263
}
264+
rSpan.End()
220265
log.Info("VM reattached after agent restart", "pid", pid)
221266
return ctrl.Result{}, nil
222267
}

0 commit comments

Comments
 (0)