Skip to content

Commit 4fad4bb

Browse files
committed
refactor: clean up component h-scale orchestration
1 parent cf05d91 commit 4fad4bb

11 files changed

Lines changed: 38 additions & 1323 deletions

controllers/apps/component/transformer_component_pre_terminate.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,12 @@ func (t *componentPreTerminateTransformer) provisioned(transCtx *componentTransf
107107
return false, client.IgnoreNotFound(err)
108108
}
109109

110-
provisioned, err := component.GetReplicasStatusFunc(its, func(s component.ReplicaStatus) bool {
111-
return s.Provisioned
112-
})
113-
if err != nil {
114-
return false, err
110+
for _, status := range its.Status.InstanceStatus {
111+
if status.Provisioned {
112+
return true, nil
113+
}
115114
}
116-
return len(provisioned) > 0, nil
115+
return false, nil
117116
}
118117

119118
func (t *componentPreTerminateTransformer) checkPreTerminateDone(transCtx *componentTransformContext, dag *graph.DAG) bool {

controllers/apps/component/transformer_component_pre_terminate_test.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
3838
appsutil "github.com/apecloud/kubeblocks/controllers/apps/util"
3939
"github.com/apecloud/kubeblocks/pkg/constant"
40-
"github.com/apecloud/kubeblocks/pkg/controller/component"
4140
"github.com/apecloud/kubeblocks/pkg/controller/graph"
4241
"github.com/apecloud/kubeblocks/pkg/controller/model"
4342
kbacli "github.com/apecloud/kubeblocks/pkg/kbagent/client"
@@ -65,10 +64,12 @@ var _ = Describe("pre-terminate transformer test", func() {
6564
}
6665

6766
provisioned := func(its *workloads.InstanceSet) {
68-
replicas := []string{
69-
fmt.Sprintf("%s-0", its.Name),
67+
its.Status.InstanceStatus = []workloads.InstanceStatus{
68+
{
69+
PodName: fmt.Sprintf("%s-0", its.Name),
70+
Provisioned: true,
71+
},
7072
}
71-
Expect(component.StatusReplicasStatus(its, replicas, false, false)).Should(Succeed())
7273
}
7374

7475
BeforeEach(func() {
@@ -199,12 +200,9 @@ var _ = Describe("pre-terminate transformer test", func() {
199200

200201
It("not provisioned", func() {
201202
its := reader.Objects[1].(*workloads.InstanceSet)
202-
Expect(component.UpdateReplicasStatusFunc(its, func(r *component.ReplicasStatus) error {
203-
for i := range r.Status {
204-
r.Status[i].Provisioned = false
205-
}
206-
return nil
207-
})).Should(Succeed())
203+
for i := range its.Status.InstanceStatus {
204+
its.Status.InstanceStatus[i].Provisioned = false
205+
}
208206

209207
transformer := &componentPreTerminateTransformer{}
210208
err := transformer.Transform(transCtx, dag)

controllers/apps/component/transformer_component_status.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -225,20 +225,18 @@ func (t *componentStatusTransformer) hasScaleOutRunning(transCtx *componentTrans
225225
return false, false, nil
226226
}
227227

228-
replicas, err := component.GetReplicasStatusFunc(t.protoITS, func(status component.ReplicaStatus) bool {
229-
return status.DataLoaded != nil && !*status.DataLoaded ||
230-
status.MemberJoined != nil && !*status.MemberJoined
231-
})
232-
if err != nil {
233-
return false, false, err
234-
}
235-
if len(replicas) == 0 {
236-
return false, false, nil
228+
for _, status := range t.runningITS.Status.InstanceStatus {
229+
if status.DataLoaded != nil && !*status.DataLoaded {
230+
return true, false, nil
231+
}
232+
if status.MemberJoined != nil && !*status.MemberJoined {
233+
return true, false, nil
234+
}
237235
}
238236

239237
// TODO: scale-out failed
240238

241-
return true, false, nil
239+
return false, false, nil
242240
}
243241

244242
func (t *componentStatusTransformer) hasVolumeExpansionRunning() bool {

controllers/apps/component/transformer_component_workload.go

Lines changed: 4 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"golang.org/x/exp/maps"
2828
corev1 "k8s.io/api/core/v1"
2929
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30-
"k8s.io/apimachinery/pkg/util/sets"
3130
"k8s.io/utils/ptr"
3231
"sigs.k8s.io/controller-runtime/pkg/client"
3332

@@ -102,10 +101,6 @@ func (t *componentWorkloadTransformer) reconcileWorkload(ctx context.Context, cl
102101

103102
t.buildInstanceSetPlacementAnnotation(comp, protoITS)
104103

105-
if err := t.reconcileReplicasStatus(ctx, cli, synthesizedComp, runningITS, protoITS); err != nil {
106-
return err
107-
}
108-
109104
return nil
110105
}
111106

@@ -121,43 +116,6 @@ func (t *componentWorkloadTransformer) buildInstanceSetPlacementAnnotation(comp
121116
}
122117
}
123118

124-
func (t *componentWorkloadTransformer) reconcileReplicasStatus(ctx context.Context, cli client.Reader,
125-
synthesizedComp *component.SynthesizedComponent, runningITS, protoITS *workloads.InstanceSet) error {
126-
var (
127-
namespace = synthesizedComp.Namespace
128-
clusterName = synthesizedComp.ClusterName
129-
compName = synthesizedComp.Name
130-
)
131-
132-
// HACK: sync replicas status from runningITS to protoITS
133-
component.BuildReplicasStatus(runningITS, protoITS)
134-
135-
replicas, err := func() ([]string, error) {
136-
pods, err := component.ListOwnedPods(ctx, cli, namespace, clusterName, compName)
137-
if err != nil {
138-
return nil, err
139-
}
140-
podNameSet := sets.New[string]()
141-
for _, pod := range pods {
142-
podNameSet.Insert(pod.Name)
143-
}
144-
145-
desiredPodNames, err := component.GetDesiredPodNamesByITS(runningITS, protoITS)
146-
if err != nil {
147-
return nil, err
148-
}
149-
desiredPodNameSet := sets.New(desiredPodNames...)
150-
151-
return desiredPodNameSet.Intersection(podNameSet).UnsortedList(), nil
152-
}()
153-
if err != nil {
154-
return err
155-
}
156-
157-
hasMemberJoinDefined, hasDataActionDefined := hasMemberJoinNDataActionDefined(synthesizedComp.LifecycleActions.ComponentLifecycleActions)
158-
return component.StatusReplicasStatus(protoITS, replicas, hasMemberJoinDefined, hasDataActionDefined)
159-
}
160-
161119
func (t *componentWorkloadTransformer) handleUpdate(transCtx *componentTransformContext, cli model.GraphClient, dag *graph.DAG,
162120
synthesizedComp *component.SynthesizedComponent, comp *appsv1.Component, runningITS, protoITS *workloads.InstanceSet) error {
163121
start, stop := t.handleWorkloadStartNStop(transCtx, synthesizedComp, runningITS, &protoITS)
@@ -206,13 +164,13 @@ func isCompStopped(synthesizedComp *component.SynthesizedComponent) bool {
206164
return ptr.Deref(synthesizedComp.Stop, false)
207165
}
208166

209-
func (t *componentWorkloadTransformer) handleWorkloadUpdate(transCtx *componentTransformContext, dag *graph.DAG,
210-
synthesizeComp *component.SynthesizedComponent, comp *appsv1.Component, obj, its *workloads.InstanceSet) error {
211-
cwo, err := newComponentWorkloadOps(transCtx, t.Client, synthesizeComp, comp, obj, its, dag)
167+
func (t *componentWorkloadTransformer) handleWorkloadUpdate(_ *componentTransformContext, _ *graph.DAG,
168+
synthesizeComp *component.SynthesizedComponent, _ *appsv1.Component, obj, its *workloads.InstanceSet) error {
169+
cwo, err := newComponentWorkloadOps(synthesizeComp, obj, its)
212170
if err != nil {
213171
return err
214172
}
215-
if err := cwo.horizontalScale(); err != nil {
173+
if err := cwo.validateHorizontalScale(); err != nil {
216174
return err
217175
}
218176
return nil

0 commit comments

Comments
 (0)