88 "time"
99
1010 k8ssnapshots "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned"
11+ "golang.org/x/time/rate"
1112 corev1 "k8s.io/api/core/v1"
1213 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1314 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -64,9 +65,12 @@ const (
6465 OperationStatusSuccess string = "Success"
6566 OperationStatusFailed string = "Failed"
6667
67- controllerName = "crd"
68- controllerAgentName = "trident-crd-controller"
69- crdControllerQueueName = "trident-crd-workqueue"
68+ controllerName = "crd"
69+ controllerAgentName = "trident-crd-controller"
70+ crdControllerQueueName = "trident-crd-workqueue"
71+ tagriWorkqueueName = "trident-tagri-workqueue"
72+ tagriWorkqueueQPS = 8.0
73+ tagriWorkqueueBucketSize = 100
7074
7175 transactionSyncPeriod = 60 * time .Second
7276)
@@ -191,6 +195,18 @@ type TridentCrdController struct {
191195 // simultaneously in two different workers.
192196 workqueue workqueue.RateLimitingInterface
193197
198+ // tagriWorkqueue is a dedicated rate-limited queue for TridentAutogrowRequestInternal (TAGRI) CRs.
199+ // It uses a higher QPS/bucket than the main workqueue so TAGRI processing is isolated and can
200+ // drain faster without blocking or being blocked by other CR types. A dedicated worker processes it.
201+	// Uses deprecated RateLimitingInterface: KeyItem carries a context.Context, so although it
202+	// compiles as comparable, typed-queue equality on it could panic at runtime; hence TypedRateLimitingInterface[KeyItem] is avoided.
203+ tagriWorkqueue workqueue.RateLimitingInterface
204+
205+ // tagriRateLimiter is the same rate limiter used by tagriWorkqueue. Kept so we can call When(item)
206+ // to decide whether to use AddRateLimited or AddAfter(maxDuration) when ReconcileDeferredWithMaxDuration
207+ // is used (e.g. Failed phase), ensuring tagriTimeout is never exceeded.
208+ tagriRateLimiter workqueue.RateLimiter
209+
194210 // recorder is an event recorder for recording Event resources to the Kubernetes API.
195211 recorder record.EventRecorder
196212 actionMirrorUpdatesSynced func () bool
@@ -269,6 +285,13 @@ func newTridentCrdControllerImpl(
269285 eventBroadcaster .StartRecordingToSink (& typedcorev1.EventSinkImpl {Interface : kubeClientset .CoreV1 ().Events ("" )})
270286 recorder := eventBroadcaster .NewRecorder (scheme .Scheme , corev1.EventSource {Component : controllerAgentName })
271287
288+	// Use deprecated workqueue rate limiter API: comparing KeyItem (which carries a context.Context) can panic at runtime, so TAGRI avoids the typed queue/limiter.
289+ // 5ms and 1000s match client-go default exponential backoff;
290+	// QPS uses tagriWorkqueueQPS (8), below the default queue's 10 QPS, so the default queue keeps preference for API server capacity over TAGRI.
291+ tagriRateLimiter := workqueue .NewMaxOfRateLimiter (
292+ workqueue .NewItemExponentialFailureRateLimiter (5 * time .Millisecond , 1000 * time .Second ),
293+ & workqueue.BucketRateLimiter {Limiter : rate .NewLimiter (rate .Limit (tagriWorkqueueQPS ), tagriWorkqueueBucketSize )},
294+ )
272295 controller := & TridentCrdController {
273296 orchestrator : orchestrator ,
274297 kubeClientset : kubeClientset ,
@@ -317,6 +340,8 @@ func newTridentCrdControllerImpl(
317340 autogrowRequestInternalSynced : autogrowRequestInternalInformer .Informer ().HasSynced ,
318341 workqueue : workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (),
319342 crdControllerQueueName ),
343+ tagriRateLimiter : tagriRateLimiter ,
344+ tagriWorkqueue : workqueue .NewNamedRateLimitingQueue (tagriRateLimiter , tagriWorkqueueName ),
320345 recorder : recorder ,
321346 indexers : indexers ,
322347 nodeRemediationUtils : nodeRemediationUtils ,
@@ -452,6 +477,7 @@ func (c *TridentCrdController) Run(ctx context.Context, threadiness int, stopCh
452477
453478 defer utilruntime .HandleCrash ()
454479 defer c .workqueue .ShutDown ()
480+ defer c .tagriWorkqueue .ShutDown ()
455481
456482 // Start the informer factories to begin populating the informer caches
457483 Log ().Info ("Starting Trident CRD controller." )
@@ -478,11 +504,13 @@ func (c *TridentCrdController) Run(ctx context.Context, threadiness int, stopCh
478504 return
479505 }
480506
481- // Launch workers to process CRD resources
507+ // Launch workers to process CRD resources (main queue only; TAGRI has its own queue and worker).
482508 Logx (ctx ).Debug ("Starting workers." )
483509 for i := 0 ; i < threadiness ; i ++ {
484510 go wait .Until (c .runWorker , time .Second , stopCh )
485511 }
512+ // One dedicated worker for the TAGRI queue so TAGRI processing is isolated and does not block other CRs.
513+ go wait .Until (c .runTagriWorker , time .Second , stopCh )
486514
487515 Logx (ctx ).Debug ("Started workers." )
488516 <- stopCh
@@ -499,6 +527,15 @@ func (c *TridentCrdController) runWorker() {
499527 }
500528}
501529
530+ // runTagriWorker processes items from the dedicated TAGRI workqueue only. All requeues
531+ // (AddRateLimited, AddAfter) go to tagriWorkqueue so TAGRI handling is fully isolated.
532+ func (c * TridentCrdController ) runTagriWorker () {
533+ ctx := GenerateRequestContext (nil , "" , "" , WorkflowNone , LogLayerCRDFrontend )
534+ Logx (ctx ).Trace ("TridentCrdController runTagriWorker started." )
535+ for c .processNextTagriWorkItem () {
536+ }
537+ }
538+
502539func (c * TridentCrdController ) addEventToWorkqueue (key string , event EventType , ctx context.Context , objKind ObjectType ) {
503540 keyItem := KeyItem {
504541 key : key ,
@@ -507,6 +544,12 @@ func (c *TridentCrdController) addEventToWorkqueue(key string, event EventType,
507544 objectType : objKind ,
508545 }
509546
547+ // Route TAGRI events to the dedicated TAGRI workqueue for isolation and a higher rate limit.
548+ if objKind == ObjectTypeTridentAutogrowRequestInternal {
549+ c .tagriWorkqueue .Add (keyItem )
550+ Logx (ctx ).WithFields (LogFields {"key" : key , "kind" : objKind , "event" : event }).Debug ("Added TAGRI event to tagri workqueue." )
551+ return
552+ }
510553 c .workqueue .Add (keyItem )
511554 fields := LogFields {
512555 "key" : key ,
@@ -677,8 +720,6 @@ func (c *TridentCrdController) processNextWorkItem() bool {
677720 handleFunction = c .handleTridentNodeRemediation
678721 case ObjectTypeTridentAutogrowPolicy :
679722 handleFunction = c .handleAutogrowPolicy
680- case ObjectTypeTridentAutogrowRequestInternal :
681- handleFunction = c .handleTridentAutogrowRequestInternal
682723 default :
683724 return fmt .Errorf ("unknown objectType in the workqueue: %v" , keyItem .objectType )
684725 }
@@ -746,6 +787,101 @@ func (c *TridentCrdController) processNextWorkItem() bool {
746787 return true
747788}
748789
790+ // processNextTagriWorkItem reads one item from the TAGRI workqueue and processes it
791+ // with handleTridentAutogrowRequestInternal. All requeues use tagriWorkqueue.
792+ func (c * TridentCrdController ) processNextTagriWorkItem () bool {
793+ ctx := GenerateRequestContext (nil , "" , ContextSourceCRD , WorkflowCRReconcile , LogLayerCRDFrontend )
794+ Logx (ctx ).Trace ("TridentCrdController#processNextTagriWorkItem" )
795+
796+ obj , shutdown := c .tagriWorkqueue .Get ()
797+ if shutdown {
798+ Logx (ctx ).Trace ("TridentCrdController#processNextTagriWorkItem shutting down" )
799+ return false
800+ }
801+
802+ err := func (obj interface {}) error {
803+ defer c .tagriWorkqueue .Done (obj )
804+ var keyItem KeyItem
805+ var ok bool
806+ if keyItem , ok = obj .(KeyItem ); ! ok {
807+ c .tagriWorkqueue .Forget (obj )
808+ Logx (ctx ).Errorf ("expected KeyItem in tagri workqueue but got %#v" , obj )
809+ return nil
810+ }
811+ if keyItem == (KeyItem {}) {
812+ c .tagriWorkqueue .Forget (keyItem )
813+ return fmt .Errorf ("keyItem is empty" )
814+ }
815+
816+ keyItemName := keyItem .key
817+ if err := c .handleTridentAutogrowRequestInternal (& keyItem ); err != nil {
818+ if errors .IsUnsupportedConfigError (err ) {
819+ c .tagriWorkqueue .Forget (keyItem )
820+ Logx (keyItem .ctx ).Errorf ("found unsupported configuration, error syncing '%v', not requeuing: %v" , keyItem .key , err )
821+ } else if duration , ok := errors .ReconcileDeferredWithDurationValue (err ); ok {
822+ if duration <= 0 {
823+ // Defense in depth: handler normally deletes when at/past timeout and does not send duration <= 0.
824+ // If we see it (e.g. bug or clock skew), requeue immediately so the next run sees timeout exceeded and deletes.
825+ // Idempotency is preserved because the handler is idempotent; we only schedule another reconciliation.
826+ Logx (keyItem .ctx ).Infof ("deferred TAGRI '%v', duration <= 0 (%v), requeuing immediately so next run can handle; %v" , keyItem .key , duration , err .Error ())
827+ keyItem .isRetry = true
828+ c .tagriWorkqueue .Add (keyItem )
829+ } else {
830+ Logx (keyItem .ctx ).Infof ("deferred TAGRI '%v', requeuing with AddAfter(%v); %v" , keyItem .key , duration , err .Error ())
831+ keyItem .isRetry = true
832+ c .tagriWorkqueue .AddAfter (keyItem , duration )
833+ }
834+ } else if maxDuration , ok := errors .ReconcileDeferredWithMaxDurationValue (err ); ok {
835+ if maxDuration <= 0 {
836+ // Defense in depth: handler normally deletes when at/past timeout and does not send maxDuration <= 0.
837+ // If we see it, requeue immediately so the next run sees timeout exceeded and deletes. Idempotency preserved.
838+ Logx (keyItem .ctx ).Infof ("deferred TAGRI '%v', maxDuration <= 0 (%v), requeuing immediately so next run can handle; %v" , keyItem .key , maxDuration , err .Error ())
839+ keyItem .isRetry = true
840+ c .tagriWorkqueue .Add (keyItem )
841+ } else {
842+ nextDelay := c .tagriRateLimiter .When (keyItem )
843+ if nextDelay > maxDuration {
844+ Logx (keyItem .ctx ).Infof ("deferred TAGRI '%v', next backoff %v > max %v, requeuing with AddAfter(%v); %v" , keyItem .key , nextDelay , maxDuration , maxDuration , err .Error ())
845+ keyItem .isRetry = true
846+ // Forget first so the rate limiter does not count this as another failure; then schedule.
847+ // We are not using AddRateLimited here; the next process will happen when AddAfter(maxDuration) fires.
848+ c .tagriWorkqueue .Forget (keyItem )
849+ c .tagriWorkqueue .AddAfter (keyItem , maxDuration )
850+ } else {
851+ // Use AddAfter(nextDelay), not AddRateLimited: we already called When() above (which updated
852+ // the rate limiter). AddRateLimited would call When() again and double-count. We do not
853+ // Forget here so the limiter keeps this failure count; the next failure will get a longer delay.
854+ Logx (keyItem .ctx ).Infof ("deferred TAGRI '%v', requeuing with rate limit (next %v <= max %v); %v" , keyItem .key , nextDelay , maxDuration , err .Error ())
855+ keyItem .isRetry = true
856+ c .tagriWorkqueue .AddAfter (keyItem , nextDelay )
857+ }
858+ }
859+ } else if errors .IsReconcileDeferredError (err ) {
860+ Logx (keyItem .ctx ).Infof ("deferred syncing TAGRI '%v', requeuing; %v" , keyItem .key , err .Error ())
861+ keyItem .isRetry = true
862+ c .tagriWorkqueue .AddRateLimited (keyItem )
863+ } else if errors .IsReconcileIncompleteError (err ) {
864+ Logx (keyItem .ctx ).Errorf ("error syncing TAGRI '%v', requeuing immediately; %v" , keyItem .key , err .Error ())
865+ keyItem .isRetry = true
866+ c .tagriWorkqueue .Add (keyItem )
867+ } else {
868+ // Unhandled error type: return so caller logs it; item is not Forgotten (Done already deferred).
869+ return err
870+ }
871+ return nil
872+ }
873+
874+ c .tagriWorkqueue .Forget (obj )
875+ Logx (keyItem .ctx ).Tracef ("Synced TAGRI '%s'" , keyItemName )
876+ return nil
877+ }(obj )
878+ if err != nil {
879+ Logx (ctx ).Error (err )
880+ return true
881+ }
882+ return true
883+ }
884+
749885// getTridentBackend returns a TridentBackend CR with the given backendUUID
750886func (c * TridentCrdController ) getTridentBackend (
751887 ctx context.Context , namespace , backendUUID string ,
0 commit comments