Skip to content

Commit 5553f2a

Browse files
committed
feat(controller): capacity-aware scheduling using node allocatable CPU/memory
1 parent 8d0a866 commit 5553f2a

2 files changed

Lines changed: 275 additions & 13 deletions

File tree

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
Copyright 2026.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"context"
21+
22+
. "github.com/onsi/ginkgo/v2"
23+
. "github.com/onsi/gomega"
24+
corev1 "k8s.io/api/core/v1"
25+
"k8s.io/apimachinery/pkg/api/resource"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/types"
28+
"k8s.io/client-go/tools/record"
29+
"sigs.k8s.io/controller-runtime/pkg/client"
30+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
31+
32+
impdevv1alpha1 "github.com/syscode-labs/imp/api/v1alpha1"
33+
)
34+
35+
// makeNode creates a node with imp/enabled=true and the given allocatable resources.
36+
func makeNode(ctx context.Context, name string, cpu, memory string) *corev1.Node {
37+
node := &corev1.Node{
38+
ObjectMeta: metav1.ObjectMeta{
39+
Name: name,
40+
Labels: map[string]string{labelImpEnabled: "true"},
41+
},
42+
}
43+
Expect(k8sClient.Create(ctx, node)).To(Succeed())
44+
DeferCleanup(func() { k8sClient.Delete(ctx, node) }) //nolint:errcheck
45+
46+
// Set allocatable resources on node status.
47+
patch := client.MergeFrom(node.DeepCopy())
48+
node.Status.Allocatable = corev1.ResourceList{
49+
corev1.ResourceCPU: resource.MustParse(cpu),
50+
corev1.ResourceMemory: resource.MustParse(memory),
51+
}
52+
Expect(k8sClient.Status().Patch(ctx, node, patch)).To(Succeed())
53+
return node
54+
}
55+
56+
// makeClass creates an ImpVMClass with the given vcpu/mem.
57+
func makeClass(ctx context.Context, name string, vcpu int32, memMiB int32) *impdevv1alpha1.ImpVMClass {
58+
class := &impdevv1alpha1.ImpVMClass{
59+
ObjectMeta: metav1.ObjectMeta{Name: name},
60+
Spec: impdevv1alpha1.ImpVMClassSpec{
61+
VCPU: vcpu,
62+
MemoryMiB: memMiB,
63+
DiskGiB: 10,
64+
},
65+
}
66+
Expect(k8sClient.Create(ctx, class)).To(Succeed())
67+
DeferCleanup(func() { k8sClient.Delete(ctx, class) }) //nolint:errcheck
68+
return class
69+
}
70+
71+
var _ = Describe("ImpVM Capacity Scheduler", func() {
72+
ctx := context.Background()
73+
74+
reconcileVM := func(name string) error {
75+
r := &ImpVMReconciler{
76+
Client: k8sClient,
77+
Scheme: k8sClient.Scheme(),
78+
Recorder: record.NewFakeRecorder(32),
79+
}
80+
// First reconcile: adds finalizer.
81+
_, err := r.Reconcile(ctx, reconcile.Request{
82+
NamespacedName: types.NamespacedName{Name: name, Namespace: "default"},
83+
})
84+
if err != nil {
85+
return err
86+
}
87+
// Second reconcile: schedules.
88+
_, err = r.Reconcile(ctx, reconcile.Request{
89+
NamespacedName: types.NamespacedName{Name: name, Namespace: "default"},
90+
})
91+
return err
92+
}
93+
94+
It("schedules VM when node has sufficient allocatable resources", func() {
95+
// Node: 4 CPUs, 8GiB memory; Class: 1 vcpu, 512MiB; fraction 0.9
96+
// effectiveMax = min(floor(4000*0.9/1000), floor(8GiB*0.9/512MiB)) = min(3, 14) = 3
97+
makeNode(ctx, "cap-node-ok", "4", "8Gi")
98+
makeClass(ctx, "cap-small", 1, 512)
99+
100+
vm := &impdevv1alpha1.ImpVM{
101+
ObjectMeta: metav1.ObjectMeta{Name: "cap-vm-ok", Namespace: "default"},
102+
Spec: impdevv1alpha1.ImpVMSpec{
103+
ClassRef: &impdevv1alpha1.ClusterObjectRef{Name: "cap-small"},
104+
},
105+
}
106+
Expect(k8sClient.Create(ctx, vm)).To(Succeed())
107+
DeferCleanup(func() { k8sClient.Delete(ctx, vm) }) //nolint:errcheck
108+
109+
Expect(reconcileVM("cap-vm-ok")).To(Succeed())
110+
111+
updated := &impdevv1alpha1.ImpVM{}
112+
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "cap-vm-ok", Namespace: "default"}, updated)).To(Succeed())
113+
Expect(updated.Spec.NodeName).To(Equal("cap-node-ok"))
114+
Expect(updated.Status.Phase).To(Equal(impdevv1alpha1.VMPhaseScheduled))
115+
})
116+
117+
It("refuses to schedule when VM class exceeds node allocatable CPU", func() {
118+
// Node: 1 CPU, 64GiB; Class: 4 vcpu → 0 fit; should be Unschedulable
119+
makeNode(ctx, "cap-node-small-cpu", "1", "64Gi")
120+
makeClass(ctx, "cap-big-cpu", 4, 256)
121+
122+
vm := &impdevv1alpha1.ImpVM{
123+
ObjectMeta: metav1.ObjectMeta{Name: "cap-vm-no-cpu", Namespace: "default"},
124+
Spec: impdevv1alpha1.ImpVMSpec{
125+
ClassRef: &impdevv1alpha1.ClusterObjectRef{Name: "cap-big-cpu"},
126+
},
127+
}
128+
Expect(k8sClient.Create(ctx, vm)).To(Succeed())
129+
DeferCleanup(func() { k8sClient.Delete(ctx, vm) }) //nolint:errcheck
130+
131+
Expect(reconcileVM("cap-vm-no-cpu")).To(Succeed())
132+
133+
updated := &impdevv1alpha1.ImpVM{}
134+
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "cap-vm-no-cpu", Namespace: "default"}, updated)).To(Succeed())
135+
Expect(updated.Spec.NodeName).To(BeEmpty())
136+
Expect(updated.Status.Phase).To(Equal(impdevv1alpha1.VMPhasePending))
137+
})
138+
139+
It("refuses to schedule when VM class exceeds node allocatable memory", func() {
140+
// Node: 16 CPUs, 256MiB; Class: 1 vcpu, 512MiB → 0 fit by memory
141+
makeNode(ctx, "cap-node-small-mem", "16", "256Mi")
142+
makeClass(ctx, "cap-big-mem", 1, 512)
143+
144+
vm := &impdevv1alpha1.ImpVM{
145+
ObjectMeta: metav1.ObjectMeta{Name: "cap-vm-no-mem", Namespace: "default"},
146+
Spec: impdevv1alpha1.ImpVMSpec{
147+
ClassRef: &impdevv1alpha1.ClusterObjectRef{Name: "cap-big-mem"},
148+
},
149+
}
150+
Expect(k8sClient.Create(ctx, vm)).To(Succeed())
151+
DeferCleanup(func() { k8sClient.Delete(ctx, vm) }) //nolint:errcheck
152+
153+
Expect(reconcileVM("cap-vm-no-mem")).To(Succeed())
154+
155+
updated := &impdevv1alpha1.ImpVM{}
156+
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "cap-vm-no-mem", Namespace: "default"}, updated)).To(Succeed())
157+
Expect(updated.Spec.NodeName).To(BeEmpty())
158+
Expect(updated.Status.Phase).To(Equal(impdevv1alpha1.VMPhasePending))
159+
})
160+
161+
It("respects per-node capacityFraction from ClusterImpNodeProfile", func() {
162+
// Node: 4 CPUs, 8GiB; fraction 0.1 → floor(4000*0.1/1000)=0 → unschedulable
163+
makeNode(ctx, "cap-node-low-frac", "4", "8Gi")
164+
makeClass(ctx, "cap-tiny", 1, 256)
165+
166+
profile := &impdevv1alpha1.ClusterImpNodeProfile{
167+
ObjectMeta: metav1.ObjectMeta{Name: "cap-node-low-frac"},
168+
Spec: impdevv1alpha1.ClusterImpNodeProfileSpec{CapacityFraction: "0.1"},
169+
}
170+
Expect(k8sClient.Create(ctx, profile)).To(Succeed())
171+
DeferCleanup(func() { k8sClient.Delete(ctx, profile) }) //nolint:errcheck
172+
173+
vm := &impdevv1alpha1.ImpVM{
174+
ObjectMeta: metav1.ObjectMeta{Name: "cap-vm-low-frac", Namespace: "default"},
175+
Spec: impdevv1alpha1.ImpVMSpec{
176+
ClassRef: &impdevv1alpha1.ClusterObjectRef{Name: "cap-tiny"},
177+
},
178+
}
179+
Expect(k8sClient.Create(ctx, vm)).To(Succeed())
180+
DeferCleanup(func() { k8sClient.Delete(ctx, vm) }) //nolint:errcheck
181+
182+
Expect(reconcileVM("cap-vm-low-frac")).To(Succeed())
183+
184+
updated := &impdevv1alpha1.ImpVM{}
185+
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "cap-vm-low-frac", Namespace: "default"}, updated)).To(Succeed())
186+
Expect(updated.Spec.NodeName).To(BeEmpty())
187+
Expect(updated.Status.Phase).To(Equal(impdevv1alpha1.VMPhasePending))
188+
})
189+
190+
It("falls back to 0.9 when no ClusterImpConfig exists", func() {
191+
// No ClusterImpConfig created; node 4 CPUs / 8GiB; class 1vcpu/512MiB
192+
// effectiveMax with 0.9 = min(3, 14) = 3 → should schedule
193+
makeNode(ctx, "cap-node-no-cfg", "4", "8Gi")
194+
makeClass(ctx, "cap-def-frac", 1, 512)
195+
196+
vm := &impdevv1alpha1.ImpVM{
197+
ObjectMeta: metav1.ObjectMeta{Name: "cap-vm-no-cfg", Namespace: "default"},
198+
Spec: impdevv1alpha1.ImpVMSpec{
199+
ClassRef: &impdevv1alpha1.ClusterObjectRef{Name: "cap-def-frac"},
200+
},
201+
}
202+
Expect(k8sClient.Create(ctx, vm)).To(Succeed())
203+
DeferCleanup(func() { k8sClient.Delete(ctx, vm) }) //nolint:errcheck
204+
205+
Expect(reconcileVM("cap-vm-no-cfg")).To(Succeed())
206+
207+
updated := &impdevv1alpha1.ImpVM{}
208+
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "cap-vm-no-cfg", Namespace: "default"}, updated)).To(Succeed())
209+
Expect(updated.Spec.NodeName).To(Equal("cap-node-no-cfg"))
210+
})
211+
})

internal/controller/impvm_scheduler.go

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
Copyright 2026.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
117
package controller
218

319
import (
@@ -7,14 +23,18 @@ import (
723
corev1 "k8s.io/api/core/v1"
824
apierrors "k8s.io/apimachinery/pkg/api/errors"
925
"sigs.k8s.io/controller-runtime/pkg/client"
26+
logf "sigs.k8s.io/controller-runtime/pkg/log"
1027

1128
impdevv1alpha1 "github.com/syscode-labs/imp/api/v1alpha1"
1229
)
1330

1431
const labelImpEnabled = "imp/enabled"
1532

16-
// schedule selects a node for vm. Returns "" if no suitable node exists.
33+
// schedule selects a node for vm using a capacity-aware least-loaded strategy.
34+
// Returns "" and no error when no suitable node is available.
1735
func (r *ImpVMReconciler) schedule(ctx context.Context, vm *impdevv1alpha1.ImpVM) (string, error) {
36+
log := logf.FromContext(ctx)
37+
1838
// 1. List nodes with imp/enabled=true
1939
nodeList := &corev1.NodeList{}
2040
if err := r.List(ctx, nodeList, client.MatchingLabels{labelImpEnabled: "true"}); err != nil {
@@ -34,35 +54,66 @@ func (r *ImpVMReconciler) schedule(ctx context.Context, vm *impdevv1alpha1.ImpVM
3454
}
3555
runningPerNode := countRunningVMs(allVMs.Items)
3656

37-
// 4. Apply capacity cap from ClusterImpNodeProfile (if present)
57+
// 4. Resolve VM compute class (best-effort: skip capacity check if unresolvable)
58+
var vmVCPU, vmMemMiB int32
59+
if classSpec, err := resolveClassSpec(ctx, r.Client, vm); err != nil {
60+
log.V(1).Info("could not resolve class spec for capacity check; skipping compute limit",
61+
"vm", vm.Name, "err", err)
62+
} else {
63+
vmVCPU = classSpec.VCPU
64+
vmMemMiB = classSpec.MemoryMiB
65+
}
66+
67+
// 5. Fetch global default fraction from ClusterImpConfig (best-effort)
68+
globalFraction := 0.9
69+
cfg := &impdevv1alpha1.ClusterImpConfig{}
70+
if err := r.Get(ctx, client.ObjectKey{Name: "cluster"}, cfg); err == nil {
71+
globalFraction = parseFraction(cfg.Spec.Capacity.DefaultFraction)
72+
}
73+
74+
// 6. Apply capacity caps
3875
type candidate struct {
3976
name string
4077
running int
4178
}
4279
var candidates []candidate
4380
for _, node := range eligible {
81+
running := runningPerNode[node.Name]
82+
83+
// Fetch per-node profile (may be absent)
4484
profile := &impdevv1alpha1.ClusterImpNodeProfile{}
4585
err := r.Get(ctx, client.ObjectKey{Name: node.Name}, profile)
46-
if err != nil {
47-
if !apierrors.IsNotFound(err) {
48-
// Transient API error — propagate so controller-runtime retries.
49-
return "", err
50-
}
51-
// No profile → no hard cap
52-
candidates = append(candidates, candidate{name: node.Name, running: runningPerNode[node.Name]})
86+
if err != nil && !apierrors.IsNotFound(err) {
87+
return "", err
88+
}
89+
90+
// Hard count cap from profile.
91+
if err == nil && profile.Spec.MaxImpVMs > 0 && int32(running) >= profile.Spec.MaxImpVMs { //nolint:gosec
5392
continue
5493
}
55-
if profile.Spec.MaxImpVMs > 0 && int32(runningPerNode[node.Name]) >= profile.Spec.MaxImpVMs { //nolint:gosec
56-
continue // at capacity
94+
95+
// Compute-based cap (only when class was resolved and node has allocatable).
96+
if vmVCPU > 0 {
97+
fraction := globalFraction
98+
if err == nil && profile.Spec.CapacityFraction != "" {
99+
fraction = parseFraction(profile.Spec.CapacityFraction)
100+
}
101+
allocCPU := node.Status.Allocatable.Cpu().MilliValue()
102+
allocMem := node.Status.Allocatable.Memory().Value()
103+
maxVMs := effectiveMaxVMs(allocCPU, allocMem, vmVCPU, vmMemMiB, fraction)
104+
if int32(running) >= maxVMs { //nolint:gosec
105+
continue
106+
}
57107
}
58-
candidates = append(candidates, candidate{name: node.Name, running: runningPerNode[node.Name]})
108+
109+
candidates = append(candidates, candidate{name: node.Name, running: running})
59110
}
60111

61112
if len(candidates) == 0 {
62113
return "", nil
63114
}
64115

65-
// 5. Least-loaded first; alphabetical tie-break
116+
// 7. Least-loaded first; alphabetical tie-break
66117
sort.Slice(candidates, func(i, j int) bool {
67118
if candidates[i].running != candidates[j].running {
68119
return candidates[i].running < candidates[j].running

0 commit comments

Comments
 (0)