Skip to content

Commit 2f5f11a

Browse files
committed
Extract aggregate logic to ApplyAggregates
Almost the same logic was used in the aggregates controller and the onboarding controller, with the variation that the onboarding controller also created the zone aggregate, and the aggregates controller left aggregates modified outside of the controller untouched. Let's drop that functionality and expect that all aggregates are somehow created externally (i.e. by another controller); then we can combine that logic in one function that can also be called by the offboarding controller. Cortex wants to rely on the aggregates in the status, so keeping extra aggregates is unsupported either way.
1 parent 511e3cc commit 2f5f11a

8 files changed

Lines changed: 427 additions & 194 deletions

internal/controller/aggregates_controller.go

Lines changed: 2 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ type AggregatesController struct {
5252
// +kubebuilder:rbac:groups=kvm.cloud.sap,resources=hypervisors/status,verbs=get;list;watch;create;update;patch;delete
5353

5454
func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
55-
log := logger.FromContext(ctx)
5655
hv := &kvmv1.Hypervisor{}
5756
if err := ac.Get(ctx, req.NamespacedName, hv); err != nil {
5857
return ctrl.Result{}, k8sclient.IgnoreNotFound(err)
@@ -69,44 +68,9 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request)
6968
return ctrl.Result{}, nil
7069
}
7170

72-
aggs, err := openstack.GetAggregatesByName(ctx, ac.computeClient)
71+
err := openstack.ApplyAggregates(ctx, ac.computeClient, hv.Name, hv.Spec.Aggregates)
7372
if err != nil {
74-
err = fmt.Errorf("failed listing aggregates: %w", err)
75-
if err2 := ac.setErrorCondition(ctx, hv, err.Error()); err2 != nil {
76-
return ctrl.Result{}, errors.Join(err, err2)
77-
}
78-
return ctrl.Result{}, err
79-
}
80-
81-
toAdd := Difference(hv.Status.Aggregates, hv.Spec.Aggregates)
82-
toRemove := Difference(hv.Spec.Aggregates, hv.Status.Aggregates)
83-
84-
// We need to add first the host to the aggregates, because if we first drop
85-
// an aggregate with a filter criterion and then add a new one, we leave the host
86-
// open for period of time. Still, this may fail due to a conflict of aggregates
87-
// with different availability zones, so we collect all the errors and return them
88-
// so it hopefully will converge eventually.
89-
var errs []error
90-
if len(toAdd) > 0 {
91-
log.Info("Adding", "aggregates", toAdd)
92-
for item := range slices.Values(toAdd) {
93-
if err = openstack.AddToAggregate(ctx, ac.computeClient, aggs, hv.Name, item, ""); err != nil {
94-
errs = append(errs, err)
95-
}
96-
}
97-
}
98-
99-
if len(toRemove) > 0 {
100-
log.Info("Removing", "aggregates", toRemove)
101-
for item := range slices.Values(toRemove) {
102-
if err = openstack.RemoveFromAggregate(ctx, ac.computeClient, aggs, hv.Name, item); err != nil {
103-
errs = append(errs, err)
104-
}
105-
}
106-
}
107-
108-
if errs != nil {
109-
err = fmt.Errorf("encountered errors during aggregate update: %w", errors.Join(errs...))
73+
err = fmt.Errorf("failed to apply aggregates: %w", err)
11074
if err2 := ac.setErrorCondition(ctx, hv, err.Error()); err2 != nil {
11175
return ctrl.Result{}, errors.Join(err, err2)
11276
}

internal/controller/aggregates_controller_test.go

Lines changed: 3 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -302,15 +302,15 @@ var _ = Describe("AggregatesController", func() {
302302
Expect(cond.Message).To(ContainSubstring(expectedMessage))
303303
}
304304

305-
Context("when GetAggregates fails", func() {
305+
Context("when ApplyAggregates fails", func() {
306306
BeforeEach(func(ctx SpecContext) {
307307
By("Setting a missing aggregate")
308308
hypervisor := &kvmv1.Hypervisor{}
309309
Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed())
310310
hypervisor.Spec.Aggregates = []string{"test-aggregate1"}
311311
Expect(k8sClient.Update(ctx, hypervisor)).To(Succeed())
312312

313-
By("Mocking GetAggregates to fail")
313+
By("Mocking GET /os-aggregates to fail (first API call in ApplyAggregates)")
314314
fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) {
315315
w.Header().Add("Content-Type", "application/json")
316316
w.WriteHeader(http.StatusInternalServerError)
@@ -322,79 +322,7 @@ var _ = Describe("AggregatesController", func() {
322322
It("should set error condition", func(ctx SpecContext) {
323323
_, err := aggregatesController.Reconcile(ctx, reconcileRequest)
324324
Expect(err).To(HaveOccurred())
325-
sharedErrorConditionChecks(ctx, "failed listing aggregates")
326-
})
327-
})
328-
329-
Context("when AddToAggregate fails", func() {
330-
BeforeEach(func(ctx SpecContext) {
331-
By("Setting a missing aggregate")
332-
hypervisor := &kvmv1.Hypervisor{}
333-
Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed())
334-
hypervisor.Spec.Aggregates = []string{"test-aggregate1"}
335-
Expect(k8sClient.Update(ctx, hypervisor)).To(Succeed())
336-
337-
By("Mocking GetAggregates")
338-
fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) {
339-
w.Header().Add("Content-Type", "application/json")
340-
w.WriteHeader(http.StatusOK)
341-
_, err := fmt.Fprint(w, AggregateListBodyEmpty)
342-
Expect(err).NotTo(HaveOccurred())
343-
})
344-
345-
By("Mocking CreateAggregate")
346-
fakeServer.Mux.HandleFunc("POST /os-aggregates", func(w http.ResponseWriter, r *http.Request) {
347-
w.Header().Add("Content-Type", "application/json")
348-
w.WriteHeader(http.StatusOK)
349-
_, err := fmt.Fprint(w, AggregatesPostBody)
350-
Expect(err).NotTo(HaveOccurred())
351-
})
352-
353-
By("Mocking AddHost to fail")
354-
fakeServer.Mux.HandleFunc("POST /os-aggregates/42/action", func(w http.ResponseWriter, r *http.Request) {
355-
w.Header().Add("Content-Type", "application/json")
356-
w.WriteHeader(http.StatusConflict)
357-
_, err := fmt.Fprint(w, `{"conflictingRequest": {"message": "Cannot add host to aggregate", "code": 409}}`)
358-
Expect(err).NotTo(HaveOccurred())
359-
})
360-
})
361-
362-
It("should set error condition", func(ctx SpecContext) {
363-
_, err := aggregatesController.Reconcile(ctx, reconcileRequest)
364-
Expect(err).To(HaveOccurred())
365-
sharedErrorConditionChecks(ctx, "encountered errors during aggregate update")
366-
})
367-
})
368-
369-
Context("when RemoveFromAggregate fails", func() {
370-
BeforeEach(func(ctx SpecContext) {
371-
By("Setting existing aggregate in status")
372-
hypervisor := &kvmv1.Hypervisor{}
373-
Expect(k8sClient.Get(ctx, hypervisorName, hypervisor)).To(Succeed())
374-
hypervisor.Status.Aggregates = []string{"test-aggregate2"}
375-
Expect(k8sClient.Status().Update(ctx, hypervisor)).To(Succeed())
376-
377-
By("Mocking GetAggregates")
378-
fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) {
379-
w.Header().Add("Content-Type", "application/json")
380-
w.WriteHeader(http.StatusOK)
381-
_, err := fmt.Fprint(w, AggregateListBodyFull)
382-
Expect(err).NotTo(HaveOccurred())
383-
})
384-
385-
By("Mocking RemoveHost to fail")
386-
fakeServer.Mux.HandleFunc("POST /os-aggregates/100001/action", func(w http.ResponseWriter, r *http.Request) {
387-
w.Header().Add("Content-Type", "application/json")
388-
w.WriteHeader(http.StatusConflict)
389-
_, err := fmt.Fprint(w, `{"conflictingRequest": {"message": "Cannot remove host from aggregate", "code": 409}}`)
390-
Expect(err).NotTo(HaveOccurred())
391-
})
392-
})
393-
394-
It("should set error condition", func(ctx SpecContext) {
395-
_, err := aggregatesController.Reconcile(ctx, reconcileRequest)
396-
Expect(err).To(HaveOccurred())
397-
sharedErrorConditionChecks(ctx, "encountered errors during aggregate update")
325+
sharedErrorConditionChecks(ctx, "failed to get aggregates")
398326
})
399327
})
400328

internal/controller/decomission_controller.go

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,8 @@ import (
2222
"errors"
2323
"fmt"
2424
"net/http"
25-
"slices"
2625

2726
"github.com/gophercloud/gophercloud/v2"
28-
"github.com/gophercloud/gophercloud/v2/openstack/compute/v2/aggregates"
2927
"github.com/gophercloud/gophercloud/v2/openstack/compute/v2/services"
3028
"github.com/gophercloud/gophercloud/v2/openstack/placement/v1/resourceproviders"
3129
"k8s.io/apimachinery/pkg/api/meta"
@@ -115,22 +113,11 @@ func (r *NodeDecommissionReconciler) Reconcile(ctx context.Context, req ctrl.Req
115113
return r.setDecommissioningCondition(ctx, hv, msg)
116114
}
117115

118-
// Before removing the service, first take the node out of the aggregates,
119-
// so when the node comes back, it doesn't up with the old associations
120-
aggs, err := openstack.GetAggregatesByName(ctx, r.computeClient)
121-
if err != nil {
122-
return r.setDecommissioningCondition(ctx, hv, fmt.Sprintf("cannot list aggregates due to %v", err))
123-
}
124-
116+
// Before removing the service, first take the node out of all aggregates,
117+
// so when the node comes back, it doesn't end up with the old associations
125118
host := hv.Name
126-
for name, aggregate := range aggs {
127-
if slices.Contains(aggregate.Hosts, host) {
128-
opts := aggregates.RemoveHostOpts{Host: host}
129-
if err = aggregates.RemoveHost(ctx, r.computeClient, aggregate.ID, opts).Err; err != nil {
130-
msg := fmt.Sprintf("failed to remove host %v from aggregate %v due to %v", name, host, err)
131-
return r.setDecommissioningCondition(ctx, hv, msg)
132-
}
133-
}
119+
if err := openstack.ApplyAggregates(ctx, r.computeClient, host, []string{}); err != nil {
120+
return r.setDecommissioningCondition(ctx, hv, fmt.Sprintf("failed to remove host from aggregates: %v", err))
134121
}
135122

136123
// Deleting and evicted, so better delete the service

internal/controller/decomission_controller_test.go

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ var _ = Describe("Decommission Controller", func() {
404404
})
405405
})
406406

407-
Context("When removing host from aggregate fails", func() {
407+
Context("When ApplyAggregates fails", func() {
408408
BeforeEach(func() {
409409
fakeServer.Mux.HandleFunc("GET /os-hypervisors/detail", func(w http.ResponseWriter, r *http.Request) {
410410
w.Header().Add("Content-Type", "application/json")
@@ -425,37 +425,11 @@ var _ = Describe("Decommission Controller", func() {
425425
}`)
426426
})
427427

428+
// Mock only the first API call in ApplyAggregates (GET /os-aggregates) to fail
428429
fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) {
429-
w.Header().Add("Content-Type", "application/json")
430-
w.WriteHeader(http.StatusOK)
431-
fmt.Fprint(w, AggregateListWithHv)
432-
})
433-
434-
fakeServer.Mux.HandleFunc("POST /os-aggregates/100001/action", func(w http.ResponseWriter, r *http.Request) {
435430
w.WriteHeader(http.StatusInternalServerError)
436431
fmt.Fprint(w, `{"error": "Internal Server Error"}`)
437432
})
438-
439-
// Add handlers for subsequent steps even though we expect failure earlier
440-
fakeServer.Mux.HandleFunc("DELETE /os-services/service-1234", func(w http.ResponseWriter, r *http.Request) {
441-
w.WriteHeader(http.StatusNoContent)
442-
})
443-
444-
fakeServer.Mux.HandleFunc("GET /resource_providers/c48f6247-abe4-4a24-824e-ea39e108874f", func(w http.ResponseWriter, r *http.Request) {
445-
w.Header().Add("Content-Type", "application/json")
446-
w.WriteHeader(http.StatusOK)
447-
fmt.Fprint(w, `{"uuid": "rp-uuid", "name": "hv-test"}`)
448-
})
449-
450-
fakeServer.Mux.HandleFunc("GET /resource_providers/rp-uuid/allocations", func(w http.ResponseWriter, r *http.Request) {
451-
w.Header().Add("Content-Type", "application/json")
452-
w.WriteHeader(http.StatusOK)
453-
fmt.Fprint(w, `{"allocations": {}}`)
454-
})
455-
456-
fakeServer.Mux.HandleFunc("DELETE /resource_providers/rp-uuid", func(w http.ResponseWriter, r *http.Request) {
457-
w.WriteHeader(http.StatusAccepted)
458-
})
459433
})
460434

461435
It("should set decommissioning condition with error", func(ctx SpecContext) {

internal/controller/onboarding_controller.go

Lines changed: 11 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"errors"
2323
"fmt"
2424
"net/http"
25-
"slices"
2625
"strings"
2726
"time"
2827

@@ -197,34 +196,10 @@ func (r *OnboardingController) initialOnboarding(ctx context.Context, hv *kvmv1.
197196
return fmt.Errorf("cannot find availability-zone label %v on node", corev1.LabelTopologyZone)
198197
}
199198

200-
aggs, err := openstack.GetAggregatesByName(ctx, r.computeClient)
201-
if err != nil {
202-
return fmt.Errorf("cannot list aggregates %w", err)
203-
}
204-
205-
if err = openstack.AddToAggregate(ctx, r.computeClient, aggs, host, zone, zone); err != nil {
206-
return fmt.Errorf("failed to agg to availability-zone aggregate %w", err)
207-
}
208-
209-
err = openstack.AddToAggregate(ctx, r.computeClient, aggs, host, testAggregateName, "")
210-
if err != nil {
211-
return fmt.Errorf("failed to agg to test aggregate %w", err)
212-
}
213-
214-
var errs []error
215-
for aggregateName, aggregate := range aggs {
216-
if aggregateName == testAggregateName || aggregateName == zone {
217-
continue
218-
}
219-
if slices.Contains(aggregate.Hosts, host) {
220-
if err := openstack.RemoveFromAggregate(ctx, r.computeClient, aggs, host, aggregateName); err != nil {
221-
errs = append(errs, err)
222-
}
223-
}
224-
}
225-
226-
if len(errs) > 0 {
227-
return fmt.Errorf("failed to remove host %v from aggregates due to %w", host, errors.Join(errs...))
199+
// Apply the desired aggregate state (zone and test aggregate only)
200+
desiredAggregates := []string{zone, testAggregateName}
201+
if err := openstack.ApplyAggregates(ctx, r.computeClient, host, desiredAggregates); err != nil {
202+
return fmt.Errorf("failed to apply aggregates: %w", err)
228203
}
229204

230205
// The service may be forced down previously due to an HA event,
@@ -345,14 +320,15 @@ func (r *OnboardingController) completeOnboarding(ctx context.Context, host stri
345320
return ctrl.Result{}, err
346321
}
347322

348-
aggs, err := openstack.GetAggregatesByName(ctx, r.computeClient)
349-
if err != nil {
350-
return ctrl.Result{}, fmt.Errorf("failed to get aggregates %w", err)
323+
zone, found := hv.Labels[corev1.LabelTopologyZone]
324+
if !found || zone == "" {
325+
return ctrl.Result{}, fmt.Errorf("cannot find availability-zone label %v on node", corev1.LabelTopologyZone)
351326
}
352327

353-
err = openstack.RemoveFromAggregate(ctx, r.computeClient, aggs, host, testAggregateName)
354-
if err != nil {
355-
return ctrl.Result{}, fmt.Errorf("failed to remove from test aggregate %w", err)
328+
// Remove host from test aggregate by only keeping the zone aggregate
329+
desiredAggregates := []string{zone}
330+
if err := openstack.ApplyAggregates(ctx, r.computeClient, host, desiredAggregates); err != nil {
331+
return ctrl.Result{}, fmt.Errorf("failed to apply aggregates: %w", err)
356332
}
357333
log.Info("removed from test-aggregate", "name", testAggregateName)
358334

internal/controller/onboarding_controller_test.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,8 @@ var _ = Describe("Onboarding Controller", func() {
455455
})
456456
Expect(k8sClient.Status().Update(ctx, hv)).To(Succeed())
457457

458+
// Mock for ApplyAggregates during completeOnboarding
459+
// Returns aggregates with host in both test-az and tenant_filter_tests
458460
fakeServer.Mux.HandleFunc("GET /os-aggregates", func(w http.ResponseWriter, r *http.Request) {
459461
w.Header().Add("Content-Type", "application/json")
460462
w.WriteHeader(http.StatusOK)
@@ -490,10 +492,20 @@ var _ = Describe("Onboarding Controller", func() {
490492
Expect(err).NotTo(HaveOccurred())
491493
})
492494

495+
// Mock for removing host from test aggregate (ApplyAggregates will call this)
493496
fakeServer.Mux.HandleFunc("POST /os-aggregates/99/action", func(w http.ResponseWriter, r *http.Request) {
494497
w.Header().Add("Content-Type", "application/json")
495498
w.WriteHeader(http.StatusOK)
496-
_, err := fmt.Fprint(w, addedHostToTestBody)
499+
// Return aggregate without the host after removal
500+
_, err := fmt.Fprint(w, `{
501+
"aggregate": {
502+
"name": "tenant_filter_tests",
503+
"availability_zone": "",
504+
"deleted": false,
505+
"hosts": [],
506+
"id": 99
507+
}
508+
}`)
497509
Expect(err).NotTo(HaveOccurred())
498510
})
499511

0 commit comments

Comments
 (0)