Skip to content

Commit 744c7f3

Browse files
committed
feat(controller): ImpVMMigration skeleton — CPU-compatible node selection, node drain watcher
1 parent 71d8f1e commit 744c7f3

3 files changed

Lines changed: 224 additions & 0 deletions

File tree

cmd/operator/main.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,14 @@ func main() {
170170
os.Exit(1)
171171
}
172172

173+
if err = (&controller.ImpVMMigrationReconciler{
174+
Client: mgr.GetClient(),
175+
Scheme: mgr.GetScheme(),
176+
}).SetupWithManager(mgr); err != nil {
177+
setupLog.Error(err, "unable to create controller", "controller", "ImpVMMigration")
178+
os.Exit(1)
179+
}
180+
173181
if err = builder.WebhookManagedBy(mgr, &impv1alpha1.ImpVM{}).
174182
WithDefaulter(&webhookv1alpha1.ImpVMWebhook{}).
175183
WithValidator(&webhookv1alpha1.ImpVMWebhook{}).
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package controller
2+
3+
import (
4+
"context"
5+
6+
corev1 "k8s.io/api/core/v1"
7+
apierrors "k8s.io/apimachinery/pkg/api/errors"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"k8s.io/apimachinery/pkg/runtime"
10+
ctrl "sigs.k8s.io/controller-runtime"
11+
"sigs.k8s.io/controller-runtime/pkg/client"
12+
"sigs.k8s.io/controller-runtime/pkg/handler"
13+
logf "sigs.k8s.io/controller-runtime/pkg/log"
14+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
15+
16+
impv1alpha1 "github.com/syscode-labs/imp/api/v1alpha1"
17+
)
18+
19+
// ImpVMMigrationReconciler reconciles ImpVMMigration objects.
20+
type ImpVMMigrationReconciler struct {
21+
client.Client
22+
Scheme *runtime.Scheme
23+
}
24+
25+
// +kubebuilder:rbac:groups=imp.dev,resources=impvmmigrations,verbs=get;list;watch;create;update;patch;delete
26+
// +kubebuilder:rbac:groups=imp.dev,resources=impvmmigrations/status,verbs=get;update;patch
27+
// +kubebuilder:rbac:groups=imp.dev,resources=impvmmigrations/finalizers,verbs=update
28+
// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch
29+
30+
func (r *ImpVMMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
31+
log := logf.FromContext(ctx)
32+
33+
mig := &impv1alpha1.ImpVMMigration{}
34+
if err := r.Get(ctx, req.NamespacedName, mig); err != nil {
35+
return ctrl.Result{}, client.IgnoreNotFound(err)
36+
}
37+
38+
if mig.Status.Phase != "" {
39+
return ctrl.Result{}, nil // already initialised
40+
}
41+
42+
base := mig.DeepCopy()
43+
mig.Status.Phase = "Pending"
44+
45+
// Validate source VM exists
46+
vm := &impv1alpha1.ImpVM{}
47+
err := r.Get(ctx, client.ObjectKey{
48+
Namespace: mig.Spec.SourceVMNamespace, Name: mig.Spec.SourceVMName,
49+
}, vm)
50+
if apierrors.IsNotFound(err) {
51+
mig.Status.Phase = "Failed"
52+
mig.Status.Message = "source VM not found"
53+
} else if err != nil {
54+
return ctrl.Result{}, err
55+
} else if mig.Spec.TargetNode != "" {
56+
mig.Status.SelectedNode = mig.Spec.TargetNode
57+
} else {
58+
// CPU-compatible node selection
59+
selectedNode, selErr := r.selectMigrationTarget(ctx, vm)
60+
if selErr != nil {
61+
return ctrl.Result{}, selErr
62+
}
63+
if selectedNode == "" {
64+
mig.Status.Phase = "Failed"
65+
mig.Status.Message = "no CPU-compatible node available (NoCPUCompatibleNode)"
66+
} else {
67+
mig.Status.SelectedNode = selectedNode
68+
}
69+
}
70+
71+
if err := r.Status().Patch(ctx, mig, client.MergeFrom(base)); err != nil {
72+
return ctrl.Result{}, err
73+
}
74+
log.Info("ImpVMMigration initialised", "name", mig.Name, "phase", mig.Status.Phase,
75+
"targetNode", mig.Status.SelectedNode)
76+
77+
// TODO: Phase 2 impl: pause VM → snapshot → restore on target → delete source.
78+
79+
return ctrl.Result{}, nil
80+
}
81+
82+
// selectMigrationTarget finds a node with a CPU model compatible with the source VM's node.
83+
// Returns "" when no compatible node is found.
84+
func (r *ImpVMMigrationReconciler) selectMigrationTarget(ctx context.Context, vm *impv1alpha1.ImpVM) (string, error) {
85+
log := logf.FromContext(ctx)
86+
if vm.Spec.NodeName == "" {
87+
return "", nil
88+
}
89+
90+
// Get source node CPU model
91+
sourceProfile := &impv1alpha1.ClusterImpNodeProfile{}
92+
if err := r.Get(ctx, client.ObjectKey{Name: vm.Spec.NodeName}, sourceProfile); err != nil {
93+
log.V(1).Info("no ClusterImpNodeProfile for source node", "node", vm.Spec.NodeName)
94+
return "", nil
95+
}
96+
sourceCPU := sourceProfile.Spec.CPUModel
97+
98+
// Find all profiles, pick first with matching CPU model on a different node
99+
profiles := &impv1alpha1.ClusterImpNodeProfileList{}
100+
if err := r.List(ctx, profiles); err != nil {
101+
return "", err
102+
}
103+
for i := range profiles.Items {
104+
p := &profiles.Items[i]
105+
if p.Name == vm.Spec.NodeName {
106+
continue // skip source
107+
}
108+
if sourceCPU == "" || p.Spec.CPUModel == sourceCPU {
109+
log.V(1).Info("selected migration target", "node", p.Name, "cpuModel", p.Spec.CPUModel)
110+
return p.Name, nil
111+
}
112+
}
113+
return "", nil
114+
}
115+
116+
// nodeDrainMapper creates ImpVMMigration resources for all VMs on a node
117+
// when it is tainted as unschedulable (node drain).
118+
func (r *ImpVMMigrationReconciler) nodeDrainMapper(ctx context.Context, obj client.Object) []reconcile.Request {
119+
log := logf.FromContext(ctx)
120+
k8sNode := &corev1.Node{}
121+
if err := r.Get(ctx, client.ObjectKey{Name: obj.GetName()}, k8sNode); err != nil {
122+
return nil
123+
}
124+
// Only act on nodes with the unschedulable taint
125+
draining := false
126+
for _, t := range k8sNode.Spec.Taints {
127+
if t.Key == "node.kubernetes.io/unschedulable" {
128+
draining = true
129+
break
130+
}
131+
}
132+
if !draining {
133+
return nil
134+
}
135+
136+
vmList := &impv1alpha1.ImpVMList{}
137+
if err := r.List(ctx, vmList); err != nil {
138+
log.Error(err, "failed to list ImpVMs for drain migration")
139+
return nil
140+
}
141+
for i := range vmList.Items {
142+
vm := &vmList.Items[i]
143+
if vm.Spec.NodeName != obj.GetName() {
144+
continue
145+
}
146+
mig := &impv1alpha1.ImpVMMigration{
147+
ObjectMeta: metav1.ObjectMeta{
148+
Name: "drain-" + vm.Name,
149+
Namespace: vm.Namespace,
150+
},
151+
Spec: impv1alpha1.ImpVMMigrationSpec{
152+
SourceVMName: vm.Name,
153+
SourceVMNamespace: vm.Namespace,
154+
},
155+
}
156+
if err := r.Create(ctx, mig); err != nil && !apierrors.IsAlreadyExists(err) {
157+
log.Error(err, "failed to create drain migration", "vm", vm.Name)
158+
}
159+
}
160+
return nil // migrations trigger their own reconcile via Create events
161+
}
162+
163+
func (r *ImpVMMigrationReconciler) SetupWithManager(mgr ctrl.Manager) error {
164+
return ctrl.NewControllerManagedBy(mgr).
165+
For(&impv1alpha1.ImpVMMigration{}).
166+
Watches(&corev1.Node{},
167+
handler.EnqueueRequestsFromMapFunc(r.nodeDrainMapper)).
168+
Complete(r)
169+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package controller
2+
3+
import (
4+
"context"
5+
6+
. "github.com/onsi/ginkgo/v2"
7+
. "github.com/onsi/gomega"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"k8s.io/apimachinery/pkg/types"
10+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
11+
12+
impv1alpha1 "github.com/syscode-labs/imp/api/v1alpha1"
13+
)
14+
15+
func newMigrationReconciler() *ImpVMMigrationReconciler {
16+
return &ImpVMMigrationReconciler{
17+
Client: k8sClient,
18+
Scheme: k8sClient.Scheme(),
19+
}
20+
}
21+
22+
var _ = Describe("ImpVMMigration controller", func() {
23+
ctx := context.Background()
24+
25+
It("sets phase to Failed when source VM is missing", func() {
26+
mig := &impv1alpha1.ImpVMMigration{
27+
ObjectMeta: metav1.ObjectMeta{Name: "mig-ctrl-test", Namespace: "default"},
28+
Spec: impv1alpha1.ImpVMMigrationSpec{
29+
SourceVMName: "vm-that-does-not-exist",
30+
SourceVMNamespace: "default",
31+
},
32+
}
33+
Expect(k8sClient.Create(ctx, mig)).To(Succeed())
34+
DeferCleanup(func() { k8sClient.Delete(ctx, mig) }) //nolint:errcheck
35+
36+
r := newMigrationReconciler()
37+
_, err := r.Reconcile(ctx, reconcile.Request{
38+
NamespacedName: types.NamespacedName{Name: "mig-ctrl-test", Namespace: "default"},
39+
})
40+
Expect(err).NotTo(HaveOccurred())
41+
42+
updated := &impv1alpha1.ImpVMMigration{}
43+
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "mig-ctrl-test", Namespace: "default"}, updated)).To(Succeed())
44+
Expect(updated.Status.Phase).To(Equal("Failed"))
45+
Expect(updated.Status.Message).To(ContainSubstring("source VM not found"))
46+
})
47+
})

0 commit comments

Comments
 (0)