Skip to content

Commit 54d8cfd

Browse files
committed
feat(controller): taint/toleration scheduling filter with class merge
Add tolerationMatchesTaint, toleratesTaint, nodeToleratedBy, filterByTolerations, and resolveTolerations helpers. Refactor schedule() to resolve ImpVMClassSpec once early (step 4) for both toleration merging and capacity; apply the taint filter as step 5 before capacity checks. System node.kubernetes.io/* taints are skipped since filterSchedulable already enforces the underlying node conditions.
1 parent 1d48b0d commit 54d8cfd

2 files changed

Lines changed: 213 additions & 16 deletions

File tree

internal/controller/impvm_coverage_test.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,128 @@ var _ = Describe("filterSchedulable", func() {
722722
})
723723
})
724724

725+
// ─── taint/toleration helpers ────────────────────────────────────────────────
726+
727+
// Coverage for tolerationMatchesTaint: Equal vs Exists operator semantics,
// the Exists empty-key wildcard, exact effect matching, and the
// tolerate-all-effects behavior of an empty toleration effect.
var _ = Describe("tolerationMatchesTaint", func() {
	It("matches Equal operator with correct key+value+effect", func() {
		tol := corev1.Toleration{Key: "env", Operator: corev1.TolerationOpEqual, Value: "prod", Effect: corev1.TaintEffectNoSchedule}
		tai := corev1.Taint{Key: "env", Value: "prod", Effect: corev1.TaintEffectNoSchedule}
		Expect(tolerationMatchesTaint(tol, tai)).To(BeTrue())
	})

	It("does not match Equal operator with wrong value", func() {
		// Same key and effect, but the value differs — Equal must reject.
		tol := corev1.Toleration{Key: "env", Operator: corev1.TolerationOpEqual, Value: "staging", Effect: corev1.TaintEffectNoSchedule}
		tai := corev1.Taint{Key: "env", Value: "prod", Effect: corev1.TaintEffectNoSchedule}
		Expect(tolerationMatchesTaint(tol, tai)).To(BeFalse())
	})

	It("matches Exists operator regardless of value", func() {
		// Exists ignores the taint value entirely; only key and effect matter.
		tol := corev1.Toleration{Key: "env", Operator: corev1.TolerationOpExists, Effect: corev1.TaintEffectNoSchedule}
		tai := corev1.Taint{Key: "env", Value: "anything", Effect: corev1.TaintEffectNoSchedule}
		Expect(tolerationMatchesTaint(tol, tai)).To(BeTrue())
	})

	It("matches Exists with empty key (wildcard)", func() {
		// An Exists toleration with no key (and no effect) tolerates any taint.
		tol := corev1.Toleration{Operator: corev1.TolerationOpExists}
		tai := corev1.Taint{Key: "any-key", Value: "any-value", Effect: corev1.TaintEffectNoExecute}
		Expect(tolerationMatchesTaint(tol, tai)).To(BeTrue())
	})

	It("does not match when effect differs", func() {
		tol := corev1.Toleration{Key: "env", Operator: corev1.TolerationOpEqual, Value: "prod", Effect: corev1.TaintEffectNoExecute}
		tai := corev1.Taint{Key: "env", Value: "prod", Effect: corev1.TaintEffectNoSchedule}
		Expect(tolerationMatchesTaint(tol, tai)).To(BeFalse())
	})

	It("matches when toleration effect is empty (tolerates all effects)", func() {
		tol := corev1.Toleration{Key: "env", Operator: corev1.TolerationOpEqual, Value: "prod"}
		tai := corev1.Taint{Key: "env", Value: "prod", Effect: corev1.TaintEffectNoSchedule}
		Expect(tolerationMatchesTaint(tol, tai)).To(BeTrue())
	})
})
764+
765+
// Coverage for nodeToleratedBy: taint-free nodes, tolerated and untolerated
// NoSchedule/NoExecute taints, the always-allowed PreferNoSchedule effect,
// and the node.kubernetes.io/* system-taint exemption.
var _ = Describe("nodeToleratedBy", func() {
	It("returns true when node has no taints", func() {
		n := corev1.Node{}
		Expect(nodeToleratedBy(n, nil)).To(BeTrue())
	})

	It("returns true when NoSchedule taint is tolerated", func() {
		n := corev1.Node{Spec: corev1.NodeSpec{Taints: []corev1.Taint{
			{Key: "key", Value: "val", Effect: corev1.TaintEffectNoSchedule},
		}}}
		tols := []corev1.Toleration{{Key: "key", Operator: corev1.TolerationOpEqual, Value: "val", Effect: corev1.TaintEffectNoSchedule}}
		Expect(nodeToleratedBy(n, tols)).To(BeTrue())
	})

	It("returns false when NoSchedule taint is not tolerated", func() {
		n := corev1.Node{Spec: corev1.NodeSpec{Taints: []corev1.Taint{
			{Key: "key", Value: "val", Effect: corev1.TaintEffectNoSchedule},
		}}}
		Expect(nodeToleratedBy(n, nil)).To(BeFalse())
	})

	It("returns true when only PreferNoSchedule taint is present (always allowed)", func() {
		// PreferNoSchedule is a soft preference, never a hard filter.
		n := corev1.Node{Spec: corev1.NodeSpec{Taints: []corev1.Taint{
			{Key: "key", Value: "val", Effect: corev1.TaintEffectPreferNoSchedule},
		}}}
		Expect(nodeToleratedBy(n, nil)).To(BeTrue())
	})

	It("returns false when NoExecute taint is not tolerated", func() {
		n := corev1.Node{Spec: corev1.NodeSpec{Taints: []corev1.Taint{
			{Key: "dedicated", Value: "gpu", Effect: corev1.TaintEffectNoExecute},
		}}}
		Expect(nodeToleratedBy(n, nil)).To(BeFalse())
	})

	It("returns true when only system node.kubernetes.io/* taints are present (handled by filterSchedulable)", func() {
		// System lifecycle taints are skipped; the corresponding node
		// conditions are enforced by filterSchedulable instead.
		n := corev1.Node{Spec: corev1.NodeSpec{Taints: []corev1.Taint{
			{Key: "node.kubernetes.io/not-ready", Effect: corev1.TaintEffectNoSchedule},
		}}}
		Expect(nodeToleratedBy(n, nil)).To(BeTrue())
	})
})
807+
808+
// Coverage for filterByTolerations: nodes with tolerated taints and
// taint-free nodes pass through; nodes with untolerated taints are dropped.
var _ = Describe("filterByTolerations", func() {
	It("returns only nodes whose taints are fully tolerated", func() {
		tols := []corev1.Toleration{{Key: "zone", Operator: corev1.TolerationOpEqual, Value: "us-east", Effect: corev1.TaintEffectNoSchedule}}
		nodes := []corev1.Node{
			// "ok": taint matches the toleration exactly.
			{ObjectMeta: metav1.ObjectMeta{Name: "ok"}, Spec: corev1.NodeSpec{Taints: []corev1.Taint{
				{Key: "zone", Value: "us-east", Effect: corev1.TaintEffectNoSchedule},
			}}},
			// "blocked": same key but a different value — not tolerated.
			{ObjectMeta: metav1.ObjectMeta{Name: "blocked"}, Spec: corev1.NodeSpec{Taints: []corev1.Taint{
				{Key: "zone", Value: "eu-west", Effect: corev1.TaintEffectNoSchedule},
			}}},
			// "clean": no taints at all — always passes.
			{ObjectMeta: metav1.ObjectMeta{Name: "clean"}},
		}
		result := filterByTolerations(nodes, tols)
		Expect(result).To(HaveLen(2))
		names := []string{result[0].Name, result[1].Name}
		Expect(names).To(ConsistOf("ok", "clean"))
	})
})
826+
827+
// Coverage for resolveTolerations: VM-only tolerations when no class spec is
// available, and the additive merge of class-level plus VM-level tolerations.
var _ = Describe("resolveTolerations", func() {
	It("returns VM tolerations when classSpec is nil", func() {
		vm := &impdevv1alpha1.ImpVM{}
		vm.Spec.Tolerations = []corev1.Toleration{
			{Key: "k", Operator: corev1.TolerationOpEqual, Value: "v"},
		}
		Expect(resolveTolerations(vm, nil)).To(HaveLen(1))
	})

	It("merges class and VM tolerations additively", func() {
		vm := &impdevv1alpha1.ImpVM{}
		vm.Spec.Tolerations = []corev1.Toleration{{Key: "vm-key"}}
		classSpec := &impdevv1alpha1.ImpVMClassSpec{
			Tolerations: []corev1.Toleration{{Key: "class-key"}},
		}
		result := resolveTolerations(vm, classSpec)
		// Union, not override: both entries survive.
		Expect(result).To(HaveLen(2))
	})
})
846+
725847
// ─── syncStatus: node healthy path ───────────────────────────────────────────
726848

727849
var _ = Describe("ImpVM syncStatus: node healthy", func() {

internal/controller/impvm_scheduler.go

Lines changed: 91 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package controller
1919
import (
2020
"context"
2121
"sort"
22+
"strings"
2223

2324
corev1 "k8s.io/api/core/v1"
2425
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -74,30 +75,43 @@ func (r *ImpVMReconciler) schedule(ctx context.Context, vm *impdevv1alpha1.ImpVM
7475
return "", nil
7576
}
7677

77-
// 2b. Filter out unready / unschedulable nodes
78+
// 3. Filter out unready / unschedulable nodes
7879
eligible = filterSchedulable(eligible)
7980
if len(eligible) == 0 {
8081
return "", nil
8182
}
8283

83-
// 3. Count running VMs per node
84+
// 4. Resolve VM class spec once (best-effort: tolerations + capacity)
85+
var classSpec *impdevv1alpha1.ImpVMClassSpec
86+
if cs, err := resolveClassSpec(ctx, r.Client, vm); err != nil {
87+
log.V(1).Info("could not resolve class spec; using VM-only tolerations and skipping capacity check",
88+
"vm", vm.Name, "err", err)
89+
} else {
90+
classSpec = cs
91+
}
92+
93+
// 5. Filter by taint tolerations
94+
tolerations := resolveTolerations(vm, classSpec)
95+
eligible = filterByTolerations(eligible, tolerations)
96+
if len(eligible) == 0 {
97+
return "", nil
98+
}
99+
100+
// 6. Count running VMs per node
84101
allVMs := &impdevv1alpha1.ImpVMList{}
85102
if err := r.List(ctx, allVMs); err != nil {
86103
return "", err
87104
}
88105
runningPerNode := countRunningVMs(allVMs.Items)
89106

90-
// 4. Resolve VM compute class (best-effort: skip capacity check if unresolvable)
107+
// 7. Extract compute requirements
91108
var vmVCPU, vmMemMiB int32
92-
if classSpec, err := resolveClassSpec(ctx, r.Client, vm); err != nil {
93-
log.V(1).Info("could not resolve class spec for capacity check; skipping compute limit",
94-
"vm", vm.Name, "err", err)
95-
} else {
109+
if classSpec != nil {
96110
vmVCPU = classSpec.VCPU
97111
vmMemMiB = classSpec.MemoryMiB
98112
}
99113

100-
// 5a. Explicit-capacity scheduling: use Schedule() for nodes that have VCPUCapacity set on profile.
114+
// 8a. Explicit-capacity scheduling
101115
usedResources := sumUsedResources(ctx, r.Client, allVMs.Items)
102116
var explicitNodes []NodeInfo
103117
for _, node := range eligible {
@@ -119,18 +133,17 @@ func (r *ImpVMReconciler) schedule(ctx context.Context, vm *impdevv1alpha1.ImpVM
119133
if err == nil {
120134
return chosen, nil
121135
}
122-
// ErrUnschedulable from explicit-capacity nodes — fall through to fraction-based
123-
// logic for any nodes without explicit profiles.
136+
// ErrUnschedulable from explicit-capacity nodes — fall through to fraction-based.
124137
}
125138

126-
// 5. Fetch global default fraction from ClusterImpConfig (best-effort)
139+
// 8. Fetch global default fraction from ClusterImpConfig (best-effort)
127140
globalFraction := 0.9
128141
cfg := &impdevv1alpha1.ClusterImpConfig{}
129142
if err := r.Get(ctx, client.ObjectKey{Name: "cluster"}, cfg); err == nil {
130143
globalFraction = parseFraction(cfg.Spec.Capacity.DefaultFraction)
131144
}
132145

133-
// 6. Apply capacity caps
146+
// 9. Apply capacity caps
134147
type candidate struct {
135148
name string
136149
running int
@@ -139,19 +152,16 @@ func (r *ImpVMReconciler) schedule(ctx context.Context, vm *impdevv1alpha1.ImpVM
139152
for _, node := range eligible {
140153
running := runningPerNode[node.Name]
141154

142-
// Fetch per-node profile (may be absent)
143155
profile := &impdevv1alpha1.ClusterImpNodeProfile{}
144156
err := r.Get(ctx, client.ObjectKey{Name: node.Name}, profile)
145157
if err != nil && !apierrors.IsNotFound(err) {
146158
return "", err
147159
}
148160

149-
// Hard count cap from profile.
150161
if err == nil && profile.Spec.MaxImpVMs > 0 && int32(running) >= profile.Spec.MaxImpVMs { //nolint:gosec
151162
continue
152163
}
153164

154-
// Compute-based cap (only when class was resolved and node has allocatable).
155165
if vmVCPU > 0 {
156166
fraction := globalFraction
157167
if err == nil && profile.Spec.CapacityFraction != "" {
@@ -172,7 +182,7 @@ func (r *ImpVMReconciler) schedule(ctx context.Context, vm *impdevv1alpha1.ImpVM
172182
return "", nil
173183
}
174184

175-
// 7. Least-loaded first; alphabetical tie-break
185+
// 10. Least-loaded first; alphabetical tie-break
176186
sort.Slice(candidates, func(i, j int) bool {
177187
if candidates[i].running != candidates[j].running {
178188
return candidates[i].running < candidates[j].running
@@ -182,6 +192,71 @@ func (r *ImpVMReconciler) schedule(ctx context.Context, vm *impdevv1alpha1.ImpVM
182192
return candidates[0].name, nil
183193
}
184194

195+
// tolerationMatchesTaint checks whether a single toleration covers a taint.
196+
func tolerationMatchesTaint(t corev1.Toleration, taint corev1.Taint) bool {
197+
// Effect must match, unless toleration effect is empty (matches all effects).
198+
if t.Effect != "" && t.Effect != taint.Effect {
199+
return false
200+
}
201+
if t.Operator == corev1.TolerationOpExists {
202+
// Empty key = wildcard; matches any taint key.
203+
return t.Key == "" || t.Key == taint.Key
204+
}
205+
// TolerationOpEqual (default)
206+
return t.Key == taint.Key && t.Value == taint.Value
207+
}
208+
209+
// toleratesTaint returns true if any toleration in the list covers the taint.
210+
func toleratesTaint(taint corev1.Taint, tolerations []corev1.Toleration) bool {
211+
for _, t := range tolerations {
212+
if tolerationMatchesTaint(t, taint) {
213+
return true
214+
}
215+
}
216+
return false
217+
}
218+
219+
// nodeToleratedBy returns true when all NoSchedule and NoExecute taints on
220+
// the node are covered by tolerations. PreferNoSchedule is always allowed.
221+
// System node-lifecycle taints (node.kubernetes.io/*) are skipped because
222+
// their corresponding conditions are already enforced by filterSchedulable.
223+
func nodeToleratedBy(node corev1.Node, tolerations []corev1.Toleration) bool {
224+
for _, taint := range node.Spec.Taints {
225+
if taint.Effect == corev1.TaintEffectPreferNoSchedule {
226+
continue
227+
}
228+
// Skip well-known system taints managed by the node lifecycle controller;
229+
// filterSchedulable already gates on the underlying node conditions.
230+
if strings.HasPrefix(taint.Key, "node.kubernetes.io/") {
231+
continue
232+
}
233+
if !toleratesTaint(taint, tolerations) {
234+
return false
235+
}
236+
}
237+
return true
238+
}
239+
240+
func filterByTolerations(nodes []corev1.Node, tolerations []corev1.Toleration) []corev1.Node {
241+
var result []corev1.Node
242+
for _, n := range nodes {
243+
if nodeToleratedBy(n, tolerations) {
244+
result = append(result, n)
245+
}
246+
}
247+
return result
248+
}
249+
250+
// resolveTolerations returns the additive union of class-level and VM-level tolerations.
251+
func resolveTolerations(vm *impdevv1alpha1.ImpVM, classSpec *impdevv1alpha1.ImpVMClassSpec) []corev1.Toleration {
252+
var result []corev1.Toleration
253+
if classSpec != nil {
254+
result = append(result, classSpec.Tolerations...)
255+
}
256+
result = append(result, vm.Spec.Tolerations...)
257+
return result
258+
}
259+
185260
func filterByNodeSelector(nodes []corev1.Node, selector map[string]string) []corev1.Node {
186261
if len(selector) == 0 {
187262
return nodes

0 commit comments

Comments
 (0)