Skip to content

Commit cfe5516

Browse files
Refine pipeline crd and respective controller implementation (#464)
Change summary:
- Knowledge dependencies of scheduler steps are no longer handled with the `knowledges:` spec in the pipeline CRD. Instead, scheduler steps may fail during the `Init` step, and the corresponding error is propagated to the pipeline readiness. If `weighers` fail, a non-critical error is propagated and the `AllStepsReady` condition is set to `false`. If `filters` fail, the pipeline condition `Ready` is set to `false`.
- The `impl` spec has been removed and scheduler steps are now identified by the `name` spec.
- The `type` spec has been removed; instead, weighers and filters are now identified by the `filters:` and `weighers:` lists.
- Filters are now executed strictly in order and the result is passed down. Weighers are executed in parallel.
- Added a `multiplier` to weighers which is applied during weights aggregation.
- Besides `filters:` and `weighers:`, also added compatibility with `detectors:`, which are used by the descheduling pipeline controller to detect VMs to deschedule.
- Removed the `mandatory` flag. Weighers and detectors are implicitly optional, and the filters are mandatory.
- Renamed `opts:` to `params:`, keeping the `runtime.RawExtension` to support all JSON primitives as configuration objects for the scheduler steps.
- Pulled out descheduling code from the nova subfolder into the library folder.
- Renamed the pipeline for filters and weighers to `FilterWeigherPipeline`. Renamed the pipeline for detectors to `DetectorPipeline`.
- Improved the overall code structuring and coverage of the scheduling module.

Implements part of the target spec proposed in #442
1 parent eed02e1 commit cfe5516

198 files changed

Lines changed: 8070 additions & 4773 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

api/delegation/cinder/messages.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33

44
package api
55

6-
import "log/slog"
6+
import (
7+
"log/slog"
8+
9+
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
10+
)
711

812
// Host object from the Cinder scheduler pipeline.
913
type ExternalSchedulerHost struct {
@@ -46,6 +50,16 @@ func (r ExternalSchedulerRequest) GetTraceLogArgs() []slog.Attr {
4650
slog.String("project", r.Context.ProjectID),
4751
}
4852
}
53+
func (r ExternalSchedulerRequest) FilterSubjects(includedSubjects map[string]float64) lib.FilterWeigherPipelineRequest {
54+
filteredHosts := make([]ExternalSchedulerHost, 0, len(includedSubjects))
55+
for _, host := range r.Hosts {
56+
if _, exists := includedSubjects[host.VolumeHost]; exists {
57+
filteredHosts = append(filteredHosts, host)
58+
}
59+
}
60+
r.Hosts = filteredHosts
61+
return r
62+
}
4963

5064
// Response generated by cortex for the Cinder scheduler.
5165
// Cortex returns an ordered list of hosts that the volume should be scheduled on.

api/delegation/ironcore/messages.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"log/slog"
88

99
ironcorev1alpha1 "github.com/cobaltcore-dev/cortex/api/delegation/ironcore/v1alpha1"
10+
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
1011
)
1112

1213
type MachinePipelineRequest struct {
@@ -31,3 +32,13 @@ func (r MachinePipelineRequest) GetWeights() map[string]float64 {
3132
func (r MachinePipelineRequest) GetTraceLogArgs() []slog.Attr {
3233
return []slog.Attr{}
3334
}
35+
func (r MachinePipelineRequest) FilterSubjects(includedSubjects map[string]float64) lib.FilterWeigherPipelineRequest {
36+
filteredPools := make([]ironcorev1alpha1.MachinePool, 0, len(includedSubjects))
37+
for _, pool := range r.Pools {
38+
if _, exists := includedSubjects[pool.Name]; exists {
39+
filteredPools = append(filteredPools, pool)
40+
}
41+
}
42+
r.Pools = filteredPools
43+
return r
44+
}

api/delegation/manila/messages.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33

44
package api
55

6-
import "log/slog"
6+
import (
7+
"log/slog"
8+
9+
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
10+
)
711

812
// Host object from the Manila scheduler pipeline.
913
type ExternalSchedulerHost struct {
@@ -46,6 +50,16 @@ func (r ExternalSchedulerRequest) GetTraceLogArgs() []slog.Attr {
4650
slog.String("project", r.Context.ProjectID),
4751
}
4852
}
53+
func (r ExternalSchedulerRequest) FilterSubjects(includedSubjects map[string]float64) lib.FilterWeigherPipelineRequest {
54+
filteredHosts := make([]ExternalSchedulerHost, 0, len(includedSubjects))
55+
for _, host := range r.Hosts {
56+
if _, exists := includedSubjects[host.ShareHost]; exists {
57+
filteredHosts = append(filteredHosts, host)
58+
}
59+
}
60+
r.Hosts = filteredHosts
61+
return r
62+
}
4963

5064
// Response generated by cortex for the Manila scheduler.
5165
// Cortex returns an ordered list of hosts that the share should be scheduled on.

api/delegation/nova/messages.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"errors"
88
"fmt"
99
"log/slog"
10+
11+
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
1012
)
1113

1214
// Host object from the Nova scheduler pipeline.
@@ -69,6 +71,16 @@ func (r ExternalSchedulerRequest) GetTraceLogArgs() []slog.Attr {
6971
slog.String("project", r.Context.ProjectID),
7072
}
7173
}
74+
func (r ExternalSchedulerRequest) FilterSubjects(includedSubjects map[string]float64) lib.FilterWeigherPipelineRequest {
75+
filteredHosts := make([]ExternalSchedulerHost, 0, len(includedSubjects))
76+
for _, host := range r.Hosts {
77+
if _, exists := includedSubjects[host.ComputeHost]; exists {
78+
filteredHosts = append(filteredHosts, host)
79+
}
80+
}
81+
r.Hosts = filteredHosts
82+
return r
83+
}
7284

7385
// Response generated by cortex for the Nova scheduler.
7486
// Cortex returns an ordered list of hosts that the VM should be scheduled on.

api/delegation/pods/messages.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package pods
66
import (
77
"log/slog"
88

9+
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
910
corev1 "k8s.io/api/core/v1"
1011
)
1112

@@ -33,3 +34,13 @@ func (r PodPipelineRequest) GetWeights() map[string]float64 {
3334
func (r PodPipelineRequest) GetTraceLogArgs() []slog.Attr {
3435
return []slog.Attr{}
3536
}
37+
func (r PodPipelineRequest) FilterSubjects(includedSubjects map[string]float64) lib.FilterWeigherPipelineRequest {
38+
filteredNodes := make([]corev1.Node, 0, len(includedSubjects))
39+
for _, node := range r.Nodes {
40+
if _, exists := includedSubjects[node.Name]; exists {
41+
filteredNodes = append(filteredNodes, node)
42+
}
43+
}
44+
r.Nodes = filteredNodes
45+
return r
46+
}

api/v1alpha1/pipeline_types.go

Lines changed: 122 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -4,63 +4,59 @@
44
package v1alpha1
55

66
import (
7-
corev1 "k8s.io/api/core/v1"
87
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
98
runtime "k8s.io/apimachinery/pkg/runtime"
109
)
1110

12-
type DisabledValidationsSpec struct {
13-
// Whether to validate that no subjects are removed or added from the scheduler
14-
// step. This should only be disabled for scheduler steps that remove subjects.
15-
// Thus, if no value is provided, the default is false.
16-
SameSubjectNumberInOut bool `json:"sameSubjectNumberInOut,omitempty"`
17-
// Whether to validate that, after running the step, there are remaining subjects.
18-
// This should only be disabled for scheduler steps that are expected to
19-
// remove all subjects.
20-
SomeSubjectsRemain bool `json:"someSubjectsRemain,omitempty"`
21-
}
11+
type FilterSpec struct {
12+
// The name of the scheduler step in the cortex implementation.
13+
// Must match a step implemented by the pipeline controller.
14+
Name string `json:"name"`
2215

23-
type StepType string
16+
// Additional configuration for the step, given as arbitrary JSON.
17+
// +kubebuilder:validation:Optional
18+
Params runtime.RawExtension `json:"params,omitempty"`
2419

25-
const (
26-
// Step for assigning weights to hosts.
27-
StepTypeWeigher StepType = "weigher"
28-
// Step for filtering hosts.
29-
StepTypeFilter StepType = "filter"
30-
// Step for generating descheduling recommendations.
31-
StepTypeDescheduler StepType = "descheduler"
32-
)
20+
// Additional description of the step which helps understand its purpose
21+
// and decisions made by it.
22+
// +kubebuilder:validation:Optional
23+
Description string `json:"description,omitempty"`
24+
}
3325

3426
type WeigherSpec struct {
35-
// The validations to disable for this step. If none are provided, all
36-
// applied validations are enabled.
27+
// The name of the scheduler step in the cortex implementation.
28+
// Must match a step implemented by the pipeline controller.
29+
Name string `json:"name"`
30+
31+
// Additional configuration for the step, given as arbitrary JSON.
3732
// +kubebuilder:validation:Optional
38-
DisabledValidations DisabledValidationsSpec `json:"disabledValidations,omitempty"`
39-
}
33+
Params runtime.RawExtension `json:"params,omitempty"`
34+
35+
// Additional description of the step which helps understand its purpose
36+
// and decisions made by it.
37+
// +kubebuilder:validation:Optional
38+
Description string `json:"description,omitempty"`
4039

41-
type StepSpec struct {
42-
// The type of the scheduler step.
43-
Type StepType `json:"type"`
44-
// If the type is "weigher", this contains additional configuration for it.
40+
// Optional multiplier to apply to the step's output.
41+
// This can be used to increase or decrease the weight of a step
42+
// relative to other steps in the same pipeline.
4543
// +kubebuilder:validation:Optional
46-
Weigher *WeigherSpec `json:"weigher,omitempty"`
44+
Multiplier *float64 `json:"multiplier,omitempty"`
45+
}
4746

47+
type DetectorSpec struct {
4848
// The name of the scheduler step in the cortex implementation.
49-
Impl string `json:"impl"`
50-
// Additional configuration for the extractor that can be used
51-
// +kubebuilder:validation:Optional
52-
Opts runtime.RawExtension `json:"opts,omitempty"`
53-
// Knowledges this step depends on to be ready.
49+
// Must match a step implemented by the pipeline controller.
50+
Name string `json:"name"`
51+
52+
// Additional configuration for the step, given as arbitrary JSON.
5453
// +kubebuilder:validation:Optional
55-
Knowledges []corev1.ObjectReference `json:"knowledges,omitempty"`
54+
Params runtime.RawExtension `json:"params,omitempty"`
55+
5656
// Additional description of the step which helps understand its purpose
5757
// and decisions made by it.
5858
// +kubebuilder:validation:Optional
5959
Description string `json:"description,omitempty"`
60-
61-
// Whether this step is mandatory for the pipeline to be runnable.
62-
// +kubebuilder:default=true
63-
Mandatory bool `json:"mandatory"`
6460
}
6561

6662
type PipelineType string
@@ -69,41 +65,113 @@ const (
6965
// Pipeline containing filter-weigher steps for initial placement,
7066
// migration, etc. of instances.
7167
PipelineTypeFilterWeigher PipelineType = "filter-weigher"
72-
// Pipeline containing descheduler steps for generating descheduling
68+
// Pipeline containing detector steps, e.g. for generating descheduling
7369
// recommendations.
74-
PipelineTypeDescheduler PipelineType = "descheduler"
70+
PipelineTypeDetector PipelineType = "detector"
7571
)
7672

7773
type PipelineSpec struct {
7874
// SchedulingDomain defines in which scheduling domain this pipeline
7975
// is used (e.g., nova, cinder, manila).
8076
SchedulingDomain SchedulingDomain `json:"schedulingDomain"`
81-
// An optional description of the pipeline.
77+
78+
// An optional description of the pipeline, helping understand its purpose.
8279
// +kubebuilder:validation:Optional
8380
Description string `json:"description,omitempty"`
81+
8482
// If this pipeline should create decision objects.
8583
// When this is false, the pipeline will still process requests.
8684
// +kubebuilder:default=false
8785
CreateDecisions bool `json:"createDecisions,omitempty"`
88-
// The type of the pipeline.
86+
87+
// The type of the pipeline, used to differentiate between
88+
// filter-weigher and detector pipelines within the same
89+
// scheduling domain.
90+
//
91+
// If the type is filter-weigher, the filter and weigher attributes
92+
// must be set. If the type is detector, the detectors attribute
93+
// must be set.
94+
//
95+
// +kubebuilder:validation:Enum=filter-weigher;detector
8996
Type PipelineType `json:"type"`
90-
// The ordered list of steps that make up this pipeline.
91-
Steps []StepSpec `json:"steps,omitempty"`
97+
98+
// Ordered list of filters to apply in a scheduling pipeline.
99+
//
100+
// This attribute is set only if the pipeline type is filter-weigher.
101+
// Filters remove host candidates from an initial set, leaving
102+
// valid candidates. Filters are run before weighers are applied.
103+
// +kubebuilder:validation:Optional
104+
Filters []FilterSpec `json:"filters,omitempty"`
105+
106+
// Ordered list of weighers to apply in a scheduling pipeline.
107+
//
108+
// This attribute is set only if the pipeline type is filter-weigher.
109+
// These weighers are run after filters are applied.
110+
// +kubebuilder:validation:Optional
111+
Weighers []WeigherSpec `json:"weighers,omitempty"`
112+
113+
// Ordered list of detectors to apply in a descheduling pipeline.
114+
//
115+
// This attribute is set only if the pipeline type is detector.
116+
// Detectors find candidates for descheduling (migration off current host).
117+
// Detectors are not combined with filters or weighers, which belong to filter-weigher pipelines.
118+
// +kubebuilder:validation:Optional
119+
Detectors []DetectorSpec `json:"detectors,omitempty"`
120+
}
121+
122+
const (
123+
FilterConditionReady = "Ready"
124+
WeigherConditionReady = "Ready"
125+
DetectorConditionReady = "Ready"
126+
)
127+
128+
type FilterStatus struct {
129+
// The name of the filter.
130+
Name string `json:"name"`
131+
132+
// The current status conditions of the filter.
133+
// +kubebuilder:validation:Optional
134+
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
135+
}
136+
137+
type WeigherStatus struct {
138+
// The name of the weigher.
139+
Name string `json:"name"`
140+
141+
// The current status conditions of the weigher.
142+
// +kubebuilder:validation:Optional
143+
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
144+
}
145+
146+
type DetectorStatus struct {
147+
// The name of the detector.
148+
Name string `json:"name"`
149+
150+
// The current status conditions of the detector.
151+
// +kubebuilder:validation:Optional
152+
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
92153
}
93154

94155
const (
95156
// The pipeline is ready to be used.
96157
PipelineConditionReady = "Ready"
158+
// All steps in the pipeline are ready.
159+
PipelineConditionAllStepsReady = "AllStepsReady"
97160
)
98161

99162
type PipelineStatus struct {
100-
// The total number of steps configured in the pipeline.
101-
TotalSteps int `json:"totalSteps"`
102-
// The number of steps that are ready.
103-
ReadySteps int `json:"readySteps"`
104-
// An overview of the readiness of the steps in the pipeline.
105-
// Format: "ReadySteps / TotalSteps steps ready".
106-
StepsReadyFrac string `json:"stepsReadyFrac,omitempty"`
163+
// List of statuses for each filter in the pipeline.
164+
// +kubebuilder:validation:Optional
165+
Filters []FilterStatus `json:"filters,omitempty"`
166+
167+
// List of statuses for each weigher in the pipeline.
168+
// +kubebuilder:validation:Optional
169+
Weighers []WeigherStatus `json:"weighers,omitempty"`
170+
171+
// List of statuses for each detector in the pipeline.
172+
// +kubebuilder:validation:Optional
173+
Detectors []DetectorStatus `json:"detectors,omitempty"`
174+
107175
// The current status conditions of the pipeline.
108176
// +kubebuilder:validation:Optional
109177
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
@@ -115,8 +183,8 @@ type PipelineStatus struct {
115183
// +kubebuilder:printcolumn:name="Created",type="date",JSONPath=".metadata.creationTimestamp"
116184
// +kubebuilder:printcolumn:name="Domain",type="string",JSONPath=".spec.schedulingDomain"
117185
// +kubebuilder:printcolumn:name="Type",type="string",JSONPath=".spec.type"
118-
// +kubebuilder:printcolumn:name="Steps",type="string",JSONPath=".status.stepsReadyFrac"
119-
// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type=='Ready')].status"
186+
// +kubebuilder:printcolumn:name="All Steps Ready",type="string",JSONPath=".status.conditions[?(@.type=='AllStepsReady')].status"
187+
// +kubebuilder:printcolumn:name="Pipeline Ready",type="string",JSONPath=".status.conditions[?(@.type=='Ready')].status"
120188

121189
// Pipeline is the Schema for the pipelines API
122190
type Pipeline struct {

0 commit comments

Comments
 (0)