Skip to content

Commit 0e6f725

Browse files
authored
Merge pull request #1657 from muthusk/fix/reconciliation-storm
fix: activate NextReconcileTime guard to prevent reconciliation storms
2 parents 3d28647 + 9de58d2 commit 0e6f725

4 files changed

Lines changed: 118 additions & 14 deletions

File tree

controllers/clustersummary_controller.go

Lines changed: 103 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"k8s.io/apimachinery/pkg/types"
3939
"k8s.io/client-go/rest"
4040
"k8s.io/client-go/tools/events"
41+
"k8s.io/client-go/util/workqueue"
4142
clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
4243
"sigs.k8s.io/cluster-api/util/annotations"
4344
ctrl "sigs.k8s.io/controller-runtime"
@@ -95,6 +96,9 @@ const (
9596

9697
const (
9798
clusterPausedMessage = "Cluster is paused"
99+
100+
rateLimiterBaseDelay = 1 * time.Second
101+
rateLimiterMaxDelay = 5 * time.Minute
98102
)
99103

100104
// ClusterSummaryReconciler reconciles a ClusterSummary object
@@ -118,7 +122,8 @@ type ClusterSummaryReconciler struct {
118122

119123
eventRecorder events.EventRecorder
120124

121-
DeletedInstances map[types.NamespacedName]time.Time
125+
DeletedInstances map[types.NamespacedName]time.Time
126+
NextReconcileTimes map[types.NamespacedName]time.Time // in-memory cooldown, survives status-patch conflicts
122127
}
123128

124129
// If the drift-detection component is deployed in the management cluster, the addon-controller will deploy ResourceSummaries within the same cluster,
@@ -178,7 +183,7 @@ func (r *ClusterSummaryReconciler) Reconcile(ctx context.Context, req ctrl.Reque
178183

179184
if r.skipReconciliation(clusterSummaryScope, req) {
180185
logger.V(logs.LogInfo).Info("ignore update")
181-
return reconcile.Result{Requeue: true, RequeueAfter: normalRequeueAfter}, nil
186+
return reconcile.Result{Requeue: true, RequeueAfter: r.remainingCooldown(clusterSummaryScope, req)}, nil
182187
}
183188

184189
var isMatch bool
@@ -205,9 +210,16 @@ func (r *ClusterSummaryReconciler) Reconcile(ctx context.Context, req ctrl.Reque
205210
}
206211

207212
// Always close the scope when exiting this function so we can persist any ClusterSummary
208-
// changes.
213+
// changes. Conflict errors are swallowed because the watch event from whatever caused the
214+
// conflict will re-enqueue this resource, and the next reconciliation will recompute status.
215+
// Propagating the conflict would cause controller-runtime to immediately requeue, bypassing
216+
// the intended NextReconcileTime backoff.
209217
defer func() {
210218
if err = clusterSummaryScope.Close(ctx); err != nil {
219+
if apierrors.IsConflict(err) {
220+
logger.V(logs.LogDebug).Info("conflict patching ClusterSummary status, will reconcile on next event")
221+
return
222+
}
211223
reterr = err
212224
}
213225
}()
@@ -420,35 +432,50 @@ func (r *ClusterSummaryReconciler) reconcileNormal(ctx context.Context,
420432
clusterSummaryScope.ClusterSummary.Status.ReconciliationSuspended = false
421433
clusterSummaryScope.ClusterSummary.Status.SuspensionReason = nil
422434

423-
err = r.startWatcherForTemplateResourceRefs(ctx, clusterSummaryScope.ClusterSummary)
435+
if result := r.prepareForDeployment(ctx, clusterSummaryScope, logger); result.RequeueAfter > 0 {
436+
return result, nil
437+
}
438+
439+
return r.proceedDeployingClusterSummary(ctx, clusterSummaryScope, logger)
440+
}
441+
442+
func (r *ClusterSummaryReconciler) prepareForDeployment(ctx context.Context,
443+
clusterSummaryScope *scope.ClusterSummaryScope, logger logr.Logger) reconcile.Result {
444+
445+
err := r.startWatcherForTemplateResourceRefs(ctx, clusterSummaryScope.ClusterSummary)
424446
if err != nil {
425447
logger.V(logs.LogInfo).Error(err, "failed to start watcher on resources referenced in TemplateResourceRefs.")
426-
return reconcile.Result{Requeue: true, RequeueAfter: deleteRequeueAfter}, nil
448+
r.setNextReconcileTime(clusterSummaryScope, deleteRequeueAfter)
449+
return reconcile.Result{RequeueAfter: deleteRequeueAfter}
427450
}
428451

429452
allDeployed, msg, err := r.areDependenciesDeployed(ctx, clusterSummaryScope, logger)
430453
if err != nil {
431-
return reconcile.Result{Requeue: true, RequeueAfter: normalRequeueAfter}, nil
454+
r.setNextReconcileTime(clusterSummaryScope, normalRequeueAfter)
455+
return reconcile.Result{RequeueAfter: normalRequeueAfter}
432456
}
433457
clusterSummaryScope.SetDependenciesMessage(&msg)
434458
if !allDeployed {
435-
return reconcile.Result{Requeue: true, RequeueAfter: normalRequeueAfter}, nil
459+
r.setNextReconcileTime(clusterSummaryScope, normalRequeueAfter)
460+
return reconcile.Result{RequeueAfter: normalRequeueAfter}
436461
}
437462

438463
err = r.updateChartMap(ctx, clusterSummaryScope, logger)
439464
if err != nil {
440-
return reconcile.Result{Requeue: true, RequeueAfter: normalRequeueAfter}, nil
465+
r.setNextReconcileTime(clusterSummaryScope, normalRequeueAfter)
466+
return reconcile.Result{RequeueAfter: normalRequeueAfter}
441467
}
442468

443469
if !clusterSummaryScope.IsContinuousWithDriftDetection() {
444470
err = r.removeResourceSummary(ctx, clusterSummaryScope, logger)
445471
if err != nil {
446472
logger.V(logs.LogInfo).Error(err, "failed to remove ResourceSummary.")
447-
return reconcile.Result{Requeue: true, RequeueAfter: normalRequeueAfter}, nil
473+
r.setNextReconcileTime(clusterSummaryScope, normalRequeueAfter)
474+
return reconcile.Result{RequeueAfter: normalRequeueAfter}
448475
}
449476
}
450477

451-
return r.proceedDeployingClusterSummary(ctx, clusterSummaryScope, logger)
478+
return reconcile.Result{}
452479
}
453480

454481
func (r *ClusterSummaryReconciler) proceedDeployingClusterSummary(ctx context.Context,
@@ -460,6 +487,7 @@ func (r *ClusterSummaryReconciler) proceedDeployingClusterSummary(ctx context.Co
460487
ok := errors.As(err, &conflictErr)
461488
if ok {
462489
logger.V(logs.LogInfo).Error(err, "failed to deploy because of conflict")
490+
r.setNextReconcileTime(clusterSummaryScope, r.ConflictRetryTime)
463491
return reconcile.Result{Requeue: true, RequeueAfter: r.ConflictRetryTime}, nil
464492
}
465493

@@ -471,6 +499,7 @@ func (r *ClusterSummaryReconciler) proceedDeployingClusterSummary(ctx context.Co
471499
"checkName", healthCheckError.CheckName,
472500
"reason", healthCheckError.InternalErr.Error(),
473501
"requeueAfter", r.HealthErrorRetryTime.String())
502+
r.setNextReconcileTime(clusterSummaryScope, r.HealthErrorRetryTime)
474503
return reconcile.Result{Requeue: true, RequeueAfter: r.HealthErrorRetryTime}, nil
475504
}
476505

@@ -513,6 +542,7 @@ func (r *ClusterSummaryReconciler) proceedDeployingClusterSummary(ctx context.Co
513542
}
514543

515544
logger.V(logs.LogInfo).Error(err, "failed to deploy")
545+
r.setNextReconcileTime(clusterSummaryScope, requeueAfter)
516546
return reconcile.Result{Requeue: true, RequeueAfter: requeueAfter}, nil
517547
}
518548

@@ -521,12 +551,59 @@ func (r *ClusterSummaryReconciler) proceedDeployingClusterSummary(ctx context.Co
521551
if clusterSummaryScope.IsDryRunSync() {
522552
r.resetFeatureStatusToProvisioning(clusterSummaryScope)
523553
// we need to keep retrying in DryRun ClusterSummaries
554+
r.setNextReconcileTime(clusterSummaryScope, dryRunRequeueAfter)
524555
return reconcile.Result{Requeue: true, RequeueAfter: dryRunRequeueAfter}, nil
525556
}
526557

527558
return reconcile.Result{}, nil
528559
}
529560

561+
// setNextReconcileTime sets NextReconcileTime on the ClusterSummary status
562+
// so that skipReconciliation() can honor the intended backoff period
563+
// even when a watch event re-enqueues the item before RequeueAfter fires.
564+
// It also records the cooldown in the reconciler's in-memory map so that the
565+
// guard works even if the status patch fails (e.g. due to a conflict).
566+
func (r *ClusterSummaryReconciler) setNextReconcileTime(
567+
clusterSummaryScope *scope.ClusterSummaryScope, d time.Duration) {
568+
569+
nextTime := time.Now().Add(d)
570+
clusterSummaryScope.ClusterSummary.Status.NextReconcileTime =
571+
&metav1.Time{Time: nextTime}
572+
573+
// Mirror in the in-memory map so skipReconciliation works even if scope.Close()
574+
// encounters a conflict and the status field is never persisted.
575+
key := types.NamespacedName{
576+
Namespace: clusterSummaryScope.ClusterSummary.Namespace,
577+
Name: clusterSummaryScope.ClusterSummary.Name,
578+
}
579+
r.PolicyMux.Lock()
580+
r.NextReconcileTimes[key] = nextTime
581+
r.PolicyMux.Unlock()
582+
}
583+
584+
// remainingCooldown returns the time remaining before the next reconciliation
585+
// should proceed, checking both the persisted status field and the in-memory map.
586+
func (r *ClusterSummaryReconciler) remainingCooldown(
587+
clusterSummaryScope *scope.ClusterSummaryScope, req ctrl.Request) time.Duration {
588+
589+
requeueAfter := normalRequeueAfter
590+
if nrt := clusterSummaryScope.ClusterSummary.Status.NextReconcileTime; nrt != nil {
591+
if remaining := time.Until(nrt.Time); remaining > 0 {
592+
requeueAfter = remaining
593+
}
594+
}
595+
596+
r.PolicyMux.Lock()
597+
if v, ok := r.NextReconcileTimes[req.NamespacedName]; ok {
598+
if remaining := time.Until(v); remaining > requeueAfter {
599+
requeueAfter = remaining
600+
}
601+
}
602+
r.PolicyMux.Unlock()
603+
604+
return requeueAfter
605+
}
606+
530607
// SetupWithManager sets up the controller with the Manager.
531608
func (r *ClusterSummaryReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
532609
c, err := ctrl.NewControllerManagedBy(mgr).
@@ -536,6 +613,10 @@ func (r *ClusterSummaryReconciler) SetupWithManager(ctx context.Context, mgr ctr
536613
).
537614
WithOptions(controller.Options{
538615
MaxConcurrentReconciles: r.ConcurrentReconciles,
616+
RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](
617+
rateLimiterBaseDelay,
618+
rateLimiterMaxDelay,
619+
),
539620
}).
540621
Watches(&libsveltosv1beta1.SveltosCluster{},
541622
handler.EnqueueRequestsFromMapFunc(r.requeueClusterSummaryForSveltosCluster),
@@ -578,6 +659,7 @@ func (r *ClusterSummaryReconciler) SetupWithManager(ctx context.Context, mgr ctr
578659
initializeManager(ctrl.Log.WithName("watchers"), mgr.GetConfig(), mgr.GetClient())
579660

580661
r.DeletedInstances = make(map[types.NamespacedName]time.Time)
662+
r.NextReconcileTimes = make(map[types.NamespacedName]time.Time)
581663
r.eventRecorder = mgr.GetEventRecorder("event-recorder")
582664
r.ctrl = c
583665

@@ -1642,10 +1724,19 @@ func (r *ClusterSummaryReconciler) skipReconciliation(clusterSummaryScope *scope
16421724
}
16431725
}
16441726

1645-
// Checking if reconciliation should happen
1646-
if cs.Status.NextReconcileTime != nil && time.Now().Before(cs.Status.NextReconcileTime.Time) {
1727+
// Checking if reconciliation should happen — check both the persisted status field
1728+
// and the in-memory map (which survives status-patch conflicts).
1729+
now := time.Now()
1730+
if cs.Status.NextReconcileTime != nil && now.Before(cs.Status.NextReconcileTime.Time) {
16471731
return true
16481732
}
1733+
if v, ok := r.NextReconcileTimes[req.NamespacedName]; ok {
1734+
if now.Before(v) {
1735+
return true
1736+
}
1737+
// Cooldown expired — remove from map
1738+
delete(r.NextReconcileTimes, req.NamespacedName)
1739+
}
16491740

16501741
cs.Status.NextReconcileTime = nil
16511742

controllers/clustersummary_deployer.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,16 @@ func (r *ClusterSummaryReconciler) proceedDeployingFeature(ctx context.Context,
159159
return r.proceedDeployingFeatureInPullMode(ctx, clusterSummaryScope, f, isConfigSame, currentHash, logger)
160160
}
161161

162+
// Skip status update if already provisioned with the same hash — avoids
163+
// unnecessary status patches that would trigger watch events and re-enqueue.
164+
if existingFS := getFeatureSummaryForFeatureID(clusterSummary, f.id); existingFS != nil &&
165+
existingFS.Status == libsveltosv1beta1.FeatureStatusProvisioned &&
166+
reflect.DeepEqual(existingFS.Hash, currentHash) {
167+
168+
logger.V(logs.LogDebug).Info("feature already provisioned with same hash, skipping status update")
169+
return nil
170+
}
171+
162172
r.updateFeatureStatus(clusterSummaryScope, f.id, deployerStatus, currentHash, deployerError, logger)
163173
message := fmt.Sprintf("Feature: %s deployed to cluster %s %s/%s", f.id,
164174
clusterSummary.Spec.ClusterType, clusterSummary.Spec.ClusterNamespace, clusterSummary.Spec.ClusterName)

controllers/controllers_suite_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,9 @@ func getClusterSummaryReconciler(c client.Client, dep deployer.DeployerInterface
176176
Deployer: dep,
177177
ClusterMap: make(map[corev1.ObjectReference]*libsveltosset.Set),
178178
ReferenceMap: make(map[corev1.ObjectReference]*libsveltosset.Set),
179-
DeletedInstances: make(map[types.NamespacedName]time.Time),
180-
PolicyMux: sync.Mutex{},
179+
DeletedInstances: make(map[types.NamespacedName]time.Time),
180+
NextReconcileTimes: make(map[types.NamespacedName]time.Time),
181+
PolicyMux: sync.Mutex{},
181182
}
182183
}
183184

controllers/export_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ var (
8888
AddStageStatus = addStageStatus
8989
UpdateStageStatus = updateStageStatus
9090
GetMainDeploymentClusterProfileLabels = getMainDeploymentClusterProfileLabels
91+
92+
SetNextReconcileTime = (*ClusterSummaryReconciler).setNextReconcileTime
9193
)
9294

9395
var (

0 commit comments

Comments
 (0)