Commit 2212918

Implement fallback strategy for when there are no metrics

Add a new spec field, `fallbackStrategy`, that controls what amount of resources should be assigned to an instance when it is impossible to run estimations due to missing metrics. Only two modes are implemented at the moment, min and max, with the default being min (to preserve the existing behaviour). The fallback strategy assigns the min/max value based on one of two inputs:

* if more than half of the instances are running (and there are more than 10 in total), find the min/max among those instances and use it as the fallback value for new estimations;
* otherwise, use the min/max from the container policy set in the spec:

```yaml
containerPolicies:
  - containerName: name
    minAllowed:
      cpu: 3
      memory: 12Gi
    maxAllowed:
      cpu: 8
      memory: 12Gi
```
1 parent 7711ca4 commit 2212918
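A minimal sketch of that selection rule as a standalone function (a hypothetical helper for illustration only; the real check lives in `controllers/operator.go` below):

```go
package main

import "fmt"

// useRunningInstances reports whether the fallback value should be derived
// from live instances rather than from the container policy: it requires
// more than 10 partition assignments and more than half of them running.
func useRunningInstances(assignments, running int) bool {
	return assignments > 10 && running > assignments/2
}

func main() {
	fmt.Println(useRunningInstances(16, 9)) // true: derive from running instances
	fmt.Println(useRunningInstances(16, 8)) // false: fall back to the policy
	fmt.Println(useRunningInstances(8, 8))  // false: too few instances overall
}
```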

11 files changed: 314 additions & 20 deletions

api/v1/consumer_types.go

Lines changed: 10 additions & 0 deletions
```diff
@@ -22,6 +22,11 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
+// FallbackStrategy specifies how resources should be assigned
+// in case of missing Prometheus metrics.
+// Currently, two strategies are implemented: min and max
+type FallbackStrategy string
+
 type AutoscalerType string
 
 // ContainerScalingMode controls whether autoscaler is enabled for a specific
@@ -37,6 +42,9 @@ const (
 	ContainerScalingModeAuto ContainerScalingMode = "Auto"
 	// ContainerScalingModeOff means autoscaling is disabled for a container.
 	ContainerScalingModeOff ContainerScalingMode = "Off"
+
+	FallbackStrategyMin = "min"
+	FallbackStrategyMax = "max"
 )
 
 // ConsumerSpec defines the desired state of Consumer
@@ -110,6 +118,8 @@ type PrometheusAutoscalerSpec struct {
 	Offset      OffsetQuerySpec      `json:"offset"`
 	Production  ProductionQuerySpec  `json:"production"`
 	Consumption ConsumptionQuerySpec `json:"consumption"`
+	// +optional
+	FallbackStrategy *FallbackStrategy `json:"fallbackStrategy,omitempty"`
 
 	RatePerCore *int64            `json:"ratePerCore"`
 	RamPerCore  resource.Quantity `json:"ramPerCore"`
```
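Since the new field is an optional pointer, callers set it by taking the address of a typed value. A small sketch using the types above, assuming the konsumerator module is importable (the strategy constants are untyped strings, so no conversion is needed):

```go
package main

import (
	"fmt"

	konsumeratorv1 "github.com/lwolf/konsumerator/api/v1"
)

func main() {
	// FallbackStrategyMax is an untyped constant, so it assigns
	// directly to the typed variable.
	var strategy konsumeratorv1.FallbackStrategy = konsumeratorv1.FallbackStrategyMax

	spec := konsumeratorv1.PrometheusAutoscalerSpec{
		FallbackStrategy: &strategy, // nil means "min" (the default)
	}
	fmt.Println(*spec.FallbackStrategy) // max
}
```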

api/v1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Generated file; diff not rendered by default.

config/crd/bases/konsumerator.lwolf.org_consumers.yaml

Lines changed: 5 additions & 0 deletions
```diff
@@ -98,6 +98,11 @@ spec:
                 x-kubernetes-int-or-string: true
               criticalLag:
                 type: string
+              fallbackStrategy:
+                description: 'FallbackStrategy specifies how resources should
+                  be assigned in case of missing Prometheus metrics. Currently,
+                  two strategies are implemented: min and max'
+                type: string
               minSyncPeriod:
                 type: string
               offset:
```

controllers/consumer_test.go

Lines changed: 0 additions & 4 deletions
```diff
@@ -13,10 +13,6 @@ import (
 	konsumeratorv1 "github.com/lwolf/konsumerator/api/v1"
 )
 
-func TestEstimateResources(t *testing.T) {
-
-}
-
 func TestNewConsumerOperator(t *testing.T) {
 	testCases := []struct {
 		name string
```

controllers/operator.go

Lines changed: 29 additions & 1 deletion
```diff
@@ -84,6 +84,33 @@ func (o *operator) init(consumer *konsumeratorv1.Consumer, managedInstances apps
 	o.toEstimateInstances = make([]*appsv1.Deployment, 0)
 	o.toCreateInstances = make([]*appsv1.Deployment, 0)
 
+	fallbackValue := make(map[string]*corev1.ResourceRequirements)
+	var strategy konsumeratorv1.FallbackStrategy
+	if o.consumer.Spec.Autoscaler.Prometheus.FallbackStrategy != nil {
+		strategy = *o.consumer.Spec.Autoscaler.Prometheus.FallbackStrategy
+	} else {
+		strategy = konsumeratorv1.FallbackStrategyMin
+	}
+	// calculate the min/max request.cpu value to use as a "fallback" value when metrics are missing;
+	// use it only when the number of instances is >10 and >50% of instances are up and running, otherwise fall back to min/max from the policy
+	if len(o.assignments) > 10 && len(managedInstances.Items) > (len(o.assignments)/2) {
+		fallbackValue = calculateFallbackFromRunningInstances(managedInstances.Items, strategy)
+	} else {
+		/*
+			In case metrics are missing, use the fallback strategy to assign
+			min/max resources from the container policy:
+			containerPolicies:
+			- containerName: name
+			  minAllowed:
+			    cpu: 3
+			    memory: 12Gi
+			  maxAllowed:
+			    cpu: 8
+			    memory: 12Gi
+		*/
+		fallbackValue = calculateFallbackFromPolicy(consumer.Spec.ResourcePolicy, strategy)
+	}
+
 	o.limiter = limiters.NewInstanceLimiter(consumer.Spec.ResourcePolicy, o.log)
 	o.globalLimiter = limiters.NewGlobalLimiter(consumer.Spec.ResourcePolicy, o.usedResources, o.log)
 
@@ -97,7 +124,8 @@ func (o *operator) init(consumer *konsumeratorv1.Consumer, managedInstances apps
 	if o.consumer.Spec.Autoscaler == nil || o.consumer.Spec.Autoscaler.Prometheus == nil {
 		return fmt.Errorf("Spec.Autoscaler.Prometheus can't be empty")
 	}
-	o.predictor = predictors.NewNaivePredictor(o.mp, o.consumer.Spec.Autoscaler.Prometheus)
+
+	o.predictor = predictors.NewNaivePredictor(o.mp, o.consumer.Spec.Autoscaler.Prometheus, fallbackValue, o.log)
 
 	o.syncInstanceStates(managedInstances)
 
```
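The predictor's constructor now receives the precomputed fallback map and a logger. A hypothetical sketch of the shape of the idea, how a predictor could fall back when a metrics query yields nothing (this is not the actual `NaivePredictor` code):

```go
package sketch

import corev1 "k8s.io/api/core/v1"

// estimateOrFallback returns the metrics-based estimate when one is
// available, and the precomputed per-container fallback otherwise.
func estimateOrFallback(
	name string,
	estimate *corev1.ResourceRequirements, // nil when metrics are missing
	fallback map[string]*corev1.ResourceRequirements,
) *corev1.ResourceRequirements {
	if estimate != nil {
		return estimate
	}
	return fallback[name] // may be nil if neither policy nor peers provided one
}
```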

controllers/utils.go

Lines changed: 53 additions & 0 deletions
```diff
@@ -198,3 +198,56 @@ func sumAllRequestedResourcesInPod(containerSpecs []corev1.Container) *corev1.Re
 	}
 	return &result
 }
+
+func calculateFallbackFromPolicy(policy *konsumeratorv1.ResourcePolicy, strategy konsumeratorv1.FallbackStrategy) map[string]*corev1.ResourceRequirements {
+	if policy == nil {
+		return nil
+	}
+	res := make(map[string]*corev1.ResourceRequirements)
+	switch strategy {
+	case konsumeratorv1.FallbackStrategyMax:
+		for i := range policy.ContainerPolicies {
+			cp := policy.ContainerPolicies[i]
+			res[cp.ContainerName] = &corev1.ResourceRequirements{
+				Requests: cp.MaxAllowed,
+				Limits:   cp.MaxAllowed,
+			}
+		}
+	case konsumeratorv1.FallbackStrategyMin:
+		fallthrough
+	default:
+		for i := range policy.ContainerPolicies {
+			cp := policy.ContainerPolicies[i]
+			res[cp.ContainerName] = &corev1.ResourceRequirements{
+				Requests: cp.MinAllowed,
+				Limits:   cp.MinAllowed,
+			}
+		}
+	}
+	return res
+}
+
+func calculateFallbackFromRunningInstances(instances []appsv1.Deployment, strategy konsumeratorv1.FallbackStrategy) map[string]*corev1.ResourceRequirements {
+	res := make(map[string]*corev1.ResourceRequirements)
+	for i := range instances {
+		deploy := instances[i]
+		for c := range deploy.Spec.Template.Spec.Containers {
+			container := deploy.Spec.Template.Spec.Containers[c]
+			if _, ok := res[container.Name]; !ok {
+				res[container.Name] = &container.Resources
+				continue
+			}
+			switch strategy {
+			case konsumeratorv1.FallbackStrategyMin:
+				if container.Resources.Requests.Cpu().MilliValue() < res[container.Name].Requests.Cpu().MilliValue() {
+					res[container.Name] = &container.Resources
+				}
+			case konsumeratorv1.FallbackStrategyMax:
+				if container.Resources.Requests.Cpu().MilliValue() > res[container.Name].Requests.Cpu().MilliValue() {
+					res[container.Name] = &container.Resources
+				}
+			}
+		}
+	}
+	return res
+}
```
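A design choice worth noting: `calculateFallbackFromRunningInstances` compares containers on `requests.cpu` alone, so the winning container's whole `ResourceRequirements` (memory included) becomes the fallback. And a minimal usage sketch of the policy-based variant, assuming it runs inside the `controllers` package with its imports (the values are illustrative):

```go
policy := &konsumeratorv1.ResourcePolicy{
	ContainerPolicies: []konsumeratorv1.ContainerResourcePolicy{{
		ContainerName: "consumer",
		MinAllowed: corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse("3"),
			corev1.ResourceMemory: resource.MustParse("12Gi"),
		},
		MaxAllowed: corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse("8"),
			corev1.ResourceMemory: resource.MustParse("12Gi"),
		},
	}},
}
// With the "min" strategy the fallback pins both requests and limits
// to minAllowed: cpu=3, memory=12Gi.
fallback := calculateFallbackFromPolicy(policy, konsumeratorv1.FallbackStrategyMin)
fmt.Println(fallback["consumer"].Requests.Cpu()) // 3
```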

controllers/utils_test.go

Lines changed: 146 additions & 3 deletions
```diff
@@ -1,6 +1,8 @@
 package controllers
 
 import (
+	"github.com/lwolf/konsumerator/pkg/helpers/tests"
+	appsv1 "k8s.io/api/apps/v1"
 	"testing"
 	"time"
 
@@ -138,8 +140,6 @@ func TestUpdateStatusAnnotations(t *testing.T) {
 
 }
 
-func TestResourceRequirementsDiff(t *testing.T) {}
-func TestResourceRequirementsSum(t *testing.T) {}
 func TestResourceListSum(t *testing.T) {
 	testCases := map[string]struct {
 		a corev1.ResourceList
@@ -189,4 +189,147 @@ func isResourceListEqual(a, b corev1.ResourceList) bool {
 	return true
 }
 
-func TestResourceListDiff(t *testing.T) {}
+func TestCalculateFallbackFromPolicy(t *testing.T) {
+	testCases := map[string]struct {
+		strategy      konsumeratorv1.FallbackStrategy
+		containerName []string
+		policy        *konsumeratorv1.ResourcePolicy
+		expFallback   map[string]*corev1.ResourceRequirements
+	}{
+		"should return min allowed by containerName in the map": {
+			strategy:      konsumeratorv1.FallbackStrategyMin,
+			containerName: []string{"test"},
+			policy: &konsumeratorv1.ResourcePolicy{ContainerPolicies: []konsumeratorv1.ContainerResourcePolicy{
+				tests.NewContainerResourcePolicy("test", "100m", "100M", "2", "150M"),
+			}},
+			expFallback: map[string]*corev1.ResourceRequirements{"test": tests.NewResourceRequirements("100m", "100M", "100m", "100M")},
+		},
+		"should return max allowed by containerName in the map": {
+			strategy:      konsumeratorv1.FallbackStrategyMax,
+			containerName: []string{"test"},
+			policy: &konsumeratorv1.ResourcePolicy{ContainerPolicies: []konsumeratorv1.ContainerResourcePolicy{
+				tests.NewContainerResourcePolicy("test", "100m", "100M", "2", "150M"),
+			}},
+			expFallback: map[string]*corev1.ResourceRequirements{"test": tests.NewResourceRequirements("2", "150M", "2", "150M")},
+		},
+		"multi container setup should return correct values": {
+			strategy:      konsumeratorv1.FallbackStrategyMin,
+			containerName: []string{"test", "test2"},
+			policy: &konsumeratorv1.ResourcePolicy{ContainerPolicies: []konsumeratorv1.ContainerResourcePolicy{
+				tests.NewContainerResourcePolicy("test", "100m", "100M", "2", "150M"),
+				tests.NewContainerResourcePolicy("test2", "1", "1G", "5", "2G"),
+			}},
+			expFallback: map[string]*corev1.ResourceRequirements{
+				"test":  tests.NewResourceRequirements("100m", "100M", "100m", "100M"),
+				"test2": tests.NewResourceRequirements("1", "1G", "1", "1G"),
+			},
+		},
+		"should return nil if no such policy exists": {
+			strategy:      konsumeratorv1.FallbackStrategyMax,
+			containerName: []string{"test"},
+			policy:        nil,
+			expFallback:   nil,
+		},
+	}
+	for testName, tc := range testCases {
+		t.Run(testName, func(t *testing.T) {
+			fallback := calculateFallbackFromPolicy(tc.policy, tc.strategy)
+			if tc.expFallback == nil && fallback != nil {
+				t.Fatalf("Fallback is expected to be nil, got %v", fallback)
+			}
+			if tc.expFallback != nil {
+				for _, containerName := range tc.containerName {
+					if helpers.CmpResourceList(fallback[containerName].Requests, tc.expFallback[containerName].Requests) != 0 {
+						t.Errorf("Fallback results mismatch. want %v, got %v", tc.expFallback[containerName].Requests, fallback[containerName].Requests)
+					}
+					if helpers.CmpResourceList(fallback[containerName].Limits, tc.expFallback[containerName].Limits) != 0 {
+						t.Errorf("Fallback results mismatch. want %v, got %v", tc.expFallback[containerName].Limits, fallback[containerName].Limits)
+					}
+				}
+			}
+		})
+	}
+}
+
+func TestCalculateFallbackFromRunningInstances(t *testing.T) {
+	instances := []appsv1.Deployment{
+		{
+			Spec: appsv1.DeploymentSpec{
+				Template: corev1.PodTemplateSpec{
+					Spec: corev1.PodSpec{
+						Containers: []corev1.Container{
+							{Name: "busybox", Resources: *tests.NewResourceRequirements("100m", "100M", "100m", "100M")},
+							{Name: "test", Resources: *tests.NewResourceRequirements("600m", "600M", "600m", "600M")},
+						}},
+				},
+			},
+		},
+		{
+			Spec: appsv1.DeploymentSpec{
+				Template: corev1.PodTemplateSpec{
+					Spec: corev1.PodSpec{
+						Containers: []corev1.Container{
+							{Name: "busybox", Resources: *tests.NewResourceRequirements("150m", "150M", "1", "100G")},
+							{Name: "test", Resources: *tests.NewResourceRequirements("50m", "50M", "2", "200G")},
+						}},
+				},
+			},
+		},
+		{
+			Spec: appsv1.DeploymentSpec{
+				Template: corev1.PodTemplateSpec{
+					Spec: corev1.PodSpec{
+						Containers: []corev1.Container{
+							{Name: "busybox", Resources: *tests.NewResourceRequirements("200m", "200M", "300m", "300M")},
+							{Name: "test", Resources: *tests.NewResourceRequirements("100m", "100M", "300m", "300M")},
+						}},
+				},
+			},
+		},
+	}
+
+	testCases := map[string]struct {
+		strategy      konsumeratorv1.FallbackStrategy
+		containerName []string
+		instances     []appsv1.Deployment
+		expFallback   map[string]*corev1.ResourceRequirements
+	}{
+		"min": {
+			strategy:      konsumeratorv1.FallbackStrategyMin,
+			containerName: []string{"busybox", "test"},
+			instances:     instances,
+			expFallback: map[string]*corev1.ResourceRequirements{
+				"busybox": tests.NewResourceRequirements("100m", "100M", "100m", "100M"),
+				"test":    tests.NewResourceRequirements("50m", "50M", "2", "200G"),
+			},
+		},
+		"max": {
+			strategy:      konsumeratorv1.FallbackStrategyMax,
+			containerName: []string{"busybox", "test"},
+			instances:     instances,
+			expFallback: map[string]*corev1.ResourceRequirements{
+				"busybox": tests.NewResourceRequirements("200m", "200M", "300m", "300M"),
+				"test":    tests.NewResourceRequirements("600m", "600M", "600m", "600M"),
+			},
+		},
+	}
+	for testName, tc := range testCases {
+		t.Run(testName, func(t *testing.T) {
+			fallback := calculateFallbackFromRunningInstances(tc.instances, tc.strategy)
+			if tc.expFallback == nil && fallback != nil {
+				t.Fatalf("Fallback is expected to be nil, got %v", fallback)
+			}
+			if tc.expFallback != nil {
+				for _, containerName := range tc.containerName {
+					if helpers.CmpResourceList(fallback[containerName].Requests, tc.expFallback[containerName].Requests) != 0 {
+						t.Errorf("Fallback results mismatch. want %v, got %v", tc.expFallback[containerName].Requests, fallback[containerName].Requests)
+					}
+					if helpers.CmpResourceList(fallback[containerName].Limits, tc.expFallback[containerName].Limits) != 0 {
+						t.Errorf("Fallback results mismatch. want %v, got %v", tc.expFallback[containerName].Limits, fallback[containerName].Limits)
+					}
+				}
+			}
+		})
+	}
+}
```

hack/ci/consumer-test-configmap.yaml

Lines changed: 1 addition & 0 deletions
```diff
@@ -34,6 +34,7 @@ data:
     # available, consumer will be scaled up to recover
     # during this time.
    recoveryTime: "30m"
+    fallbackStrategy: "max"
    # prometheus addresses to query
    address:
    - "http://prometheus-server.kube-system:9090"
```
