Skip to content

Commit 2be68b7

Browse files
committed
feat(controller): filter unready and unschedulable nodes in scheduler
Add nodeIsSchedulable and filterSchedulable helpers that gate scheduling on Ready=True, Spec.Unschedulable=false, and absence of Memory/Disk/PID pressure conditions. Wire filterSchedulable into schedule() immediately after the nodeSelector filter. Fix makeNode in capacity tests to set Ready=True so those tests remain valid under the new filter. Suppress pre-existing gosec G115/G109 findings in runnerpool controller.
1 parent 0113557 commit 2be68b7

4 files changed

Lines changed: 129 additions & 5 deletions

File tree

internal/controller/impvm_capacity_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import (
3232
impdevv1alpha1 "github.com/syscode-labs/imp/api/v1alpha1"
3333
)
3434

35-
// makeNode creates a node with imp/enabled=true and the given allocatable resources.
35+
// makeNode creates a node with imp/enabled=true, a Ready=True condition, and the given allocatable resources.
3636
func makeNode(ctx context.Context, name string, cpu, memory string) *corev1.Node {
3737
node := &corev1.Node{
3838
ObjectMeta: metav1.ObjectMeta{
@@ -43,12 +43,15 @@ func makeNode(ctx context.Context, name string, cpu, memory string) *corev1.Node
4343
Expect(k8sClient.Create(ctx, node)).To(Succeed())
4444
DeferCleanup(func() { k8sClient.Delete(ctx, node) }) //nolint:errcheck
4545

46-
// Set allocatable resources on node status.
46+
// Set allocatable resources and Ready condition on node status.
4747
patch := client.MergeFrom(node.DeepCopy())
4848
node.Status.Allocatable = corev1.ResourceList{
4949
corev1.ResourceCPU: resource.MustParse(cpu),
5050
corev1.ResourceMemory: resource.MustParse(memory),
5151
}
52+
node.Status.Conditions = []corev1.NodeCondition{
53+
{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
54+
}
5255
Expect(k8sClient.Status().Patch(ctx, node, patch)).To(Succeed())
5356
return node
5457
}

internal/controller/impvm_coverage_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,91 @@ var _ = Describe("ImpVM Scheduler: explicit VCPUCapacity scheduling", func() {
637637
})
638638
})
639639

640+
// ─── nodeIsSchedulable ────────────────────────────────────────────────────────
641+
642+
var _ = Describe("nodeIsSchedulable", func() {
643+
ready := func(extra ...corev1.NodeCondition) corev1.Node {
644+
conds := []corev1.NodeCondition{{Type: corev1.NodeReady, Status: corev1.ConditionTrue}}
645+
return corev1.Node{Status: corev1.NodeStatus{Conditions: append(conds, extra...)}}
646+
}
647+
648+
It("returns true for a ready node with no pressure", func() {
649+
Expect(nodeIsSchedulable(ready())).To(BeTrue())
650+
})
651+
652+
It("returns false when Spec.Unschedulable is set", func() {
653+
n := ready()
654+
n.Spec.Unschedulable = true
655+
Expect(nodeIsSchedulable(n)).To(BeFalse())
656+
})
657+
658+
It("returns false when Ready=False", func() {
659+
n := corev1.Node{Status: corev1.NodeStatus{Conditions: []corev1.NodeCondition{
660+
{Type: corev1.NodeReady, Status: corev1.ConditionFalse},
661+
}}}
662+
Expect(nodeIsSchedulable(n)).To(BeFalse())
663+
})
664+
665+
It("returns false when Ready=Unknown", func() {
666+
n := corev1.Node{Status: corev1.NodeStatus{Conditions: []corev1.NodeCondition{
667+
{Type: corev1.NodeReady, Status: corev1.ConditionUnknown},
668+
}}}
669+
Expect(nodeIsSchedulable(n)).To(BeFalse())
670+
})
671+
672+
It("returns false when no Ready condition is present", func() {
673+
Expect(nodeIsSchedulable(corev1.Node{})).To(BeFalse())
674+
})
675+
676+
It("returns false when MemoryPressure=True", func() {
677+
Expect(nodeIsSchedulable(ready(corev1.NodeCondition{
678+
Type: corev1.NodeMemoryPressure, Status: corev1.ConditionTrue,
679+
}))).To(BeFalse())
680+
})
681+
682+
It("returns false when DiskPressure=True", func() {
683+
Expect(nodeIsSchedulable(ready(corev1.NodeCondition{
684+
Type: corev1.NodeDiskPressure, Status: corev1.ConditionTrue,
685+
}))).To(BeFalse())
686+
})
687+
688+
It("returns false when PIDPressure=True", func() {
689+
Expect(nodeIsSchedulable(ready(corev1.NodeCondition{
690+
Type: corev1.NodePIDPressure, Status: corev1.ConditionTrue,
691+
}))).To(BeFalse())
692+
})
693+
})
694+
695+
// ─── filterSchedulable ────────────────────────────────────────────────────────
696+
697+
var _ = Describe("filterSchedulable", func() {
698+
It("returns only schedulable nodes from a mixed list", func() {
699+
good := corev1.Node{
700+
ObjectMeta: metav1.ObjectMeta{Name: "good"},
701+
Status: corev1.NodeStatus{Conditions: []corev1.NodeCondition{
702+
{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
703+
}},
704+
}
705+
unschedulable := corev1.Node{
706+
ObjectMeta: metav1.ObjectMeta{Name: "unschedulable"},
707+
Spec: corev1.NodeSpec{Unschedulable: true},
708+
Status: corev1.NodeStatus{Conditions: []corev1.NodeCondition{
709+
{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
710+
}},
711+
}
712+
notReady := corev1.Node{
713+
ObjectMeta: metav1.ObjectMeta{Name: "not-ready"},
714+
Status: corev1.NodeStatus{Conditions: []corev1.NodeCondition{
715+
{Type: corev1.NodeReady, Status: corev1.ConditionFalse},
716+
}},
717+
}
718+
719+
result := filterSchedulable([]corev1.Node{good, unschedulable, notReady})
720+
Expect(result).To(HaveLen(1))
721+
Expect(result[0].Name).To(Equal("good"))
722+
})
723+
})
724+
640725
// ─── syncStatus: node healthy path ───────────────────────────────────────────
641726

642727
var _ = Describe("ImpVM syncStatus: node healthy", func() {

internal/controller/impvm_scheduler.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ func (r *ImpVMReconciler) schedule(ctx context.Context, vm *impdevv1alpha1.ImpVM
7474
return "", nil
7575
}
7676

77+
// 2b. Filter out unready / unschedulable nodes
78+
eligible = filterSchedulable(eligible)
79+
if len(eligible) == 0 {
80+
return "", nil
81+
}
82+
7783
// 3. Count running VMs per node
7884
allVMs := &impdevv1alpha1.ImpVMList{}
7985
if err := r.List(ctx, allVMs); err != nil {
@@ -196,6 +202,36 @@ func filterByNodeSelector(nodes []corev1.Node, selector map[string]string) []cor
196202
return result
197203
}
198204

205+
// nodeIsSchedulable returns false when the node should not receive new workloads.
206+
// Composes isNodeReady and additionally checks Spec.Unschedulable and pressure conditions.
207+
func nodeIsSchedulable(node corev1.Node) bool {
208+
if node.Spec.Unschedulable {
209+
return false
210+
}
211+
if !isNodeReady(&node) {
212+
return false
213+
}
214+
for _, c := range node.Status.Conditions {
215+
switch c.Type {
216+
case corev1.NodeMemoryPressure, corev1.NodeDiskPressure, corev1.NodePIDPressure:
217+
if c.Status == corev1.ConditionTrue {
218+
return false
219+
}
220+
}
221+
}
222+
return true
223+
}
224+
225+
func filterSchedulable(nodes []corev1.Node) []corev1.Node {
226+
var result []corev1.Node
227+
for _, n := range nodes {
228+
if nodeIsSchedulable(n) {
229+
result = append(result, n)
230+
}
231+
}
232+
return result
233+
}
234+
199235
// countRunningVMs counts VMs per node that are actively occupying capacity.
200236
// Excludes Failed, Succeeded, and Terminating — all of which are vacating or already gone.
201237
func countRunningVMs(vms []impdevv1alpha1.ImpVM) map[string]int {

internal/controller/impvmrunnerpool_controller.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,8 @@ func (r *ImpVMRunnerPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
139139
queueDepth, err := r.queueDepth(ctx, pool)
140140
if err != nil {
141141
log.Info("could not fetch runner queue depth; falling back to minIdle", "pool", pool.Name, "err", err)
142-
} else if int32(queueDepth) > desiredCount {
143-
desiredCount = int32(queueDepth)
142+
} else if int32(queueDepth) > desiredCount { //nolint:gosec
143+
desiredCount = int32(queueDepth) //nolint:gosec
144144
}
145145
if webhookDemand := runnerDemandFromAnnotation(pool); webhookDemand > desiredCount {
146146
desiredCount = webhookDemand
@@ -338,5 +338,5 @@ func runnerDemandFromAnnotation(pool *impv1alpha1.ImpVMRunnerPool) int32 {
338338
if err != nil || n < 0 {
339339
return 0
340340
}
341-
return int32(n)
341+
return int32(n) //nolint:gosec
342342
}

0 commit comments

Comments
 (0)