From ced106604e1a048f2dd654d10691c6296881e901 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Norman=20B=C3=B6wing?= Date: Fri, 12 Jun 2026 08:02:06 +0200 Subject: [PATCH] Add throttler rate-limiting support and return 429 when rate-limited --- cmd/activator/main.go | 5 +- pkg/activator/handler/handler.go | 5 +- pkg/activator/handler/handler_test.go | 2 +- pkg/activator/net/lb_policy_test.go | 21 +++--- pkg/activator/net/throttler.go | 22 ++++-- pkg/activator/net/throttler_test.go | 24 +++--- pkg/queue/breaker.go | 67 +++++++++++----- pkg/queue/breaker_test.go | 105 +++++++++++++++++++------- pkg/queue/handler.go | 4 +- pkg/queue/handler_test.go | 14 ++-- pkg/queue/request_metric_test.go | 6 +- pkg/queue/sharedmain/main.go | 4 +- pkg/queue/sharedmain/main_test.go | 2 +- 13 files changed, 196 insertions(+), 85 deletions(-) diff --git a/cmd/activator/main.go b/cmd/activator/main.go index 27d5cfcfa5c2..4f0b0a293be9 100644 --- a/cmd/activator/main.go +++ b/cmd/activator/main.go @@ -78,6 +78,8 @@ type config struct { PodName string `split_words:"true" required:"true"` PodIP string `split_words:"true" required:"true"` + RateLimitingFactor float64 `split_words:"true" default:"0.0"` + // These are here to allow configuring higher values of keep-alive for larger environments. // TODO: run loadtests using these flags to determine optimal default values. MaxIdleProxyConns int `split_words:"true" default:"1000"` @@ -154,6 +156,7 @@ func main() { } logger.Info("Starting the knative activator") + logger.Infow("Using rate-limiting throttler", "rate-limit", env.RateLimitingFactor) // Create the transport used by both the activator->QP probe and the proxy. // It's important that the throttler and the activatorhandler share this @@ -203,7 +206,7 @@ func main() { activatornet.SetProbeSettings(probeTimeout, probeFrequency) // Start throttler. - throttler := activatornet.NewThrottler(ctx, env.PodIP) + throttler := activatornet.NewThrottler(ctx, env.PodIP, env.RateLimitingFactor) go throttler.Run(ctx, transport, networkConfig.EnableMeshPodAddressability, networkConfig.MeshCompatibilityMode) // Set up our config store diff --git a/pkg/activator/handler/handler.go b/pkg/activator/handler/handler.go index 6ce9cb508633..721938b68345 100644 --- a/pkg/activator/handler/handler.go +++ b/pkg/activator/handler/handler.go @@ -124,7 +124,10 @@ func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { a.logger.Errorw("Throttler try error", zap.String(logkey.Key, revID.String()), zap.Error(err)) - if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, queue.ErrRequestQueueFull) { + if errors.Is(err, queue.ErrRequestQueueFull) { + a.logger.Infof("Rate-limited %s/%s", revID.Namespace, revID.Name) + http.Error(w, err.Error(), http.StatusTooManyRequests) + } else if errors.Is(err, context.DeadlineExceeded) { http.Error(w, err.Error(), http.StatusServiceUnavailable) } else { w.WriteHeader(http.StatusInternalServerError) diff --git a/pkg/activator/handler/handler_test.go b/pkg/activator/handler/handler_test.go index 91f70b5939ce..ac1425c0632a 100644 --- a/pkg/activator/handler/handler_test.go +++ b/pkg/activator/handler/handler_test.go @@ -95,7 +95,7 @@ func TestActivationHandler(t *testing.T) { }, { name: "overflow", wantBody: "pending request queue full\n", - wantCode: http.StatusServiceUnavailable, + wantCode: http.StatusTooManyRequests, throttler: fakeThrottler{err: queue.ErrRequestQueueFull}, }} for _, test := range tests { diff --git a/pkg/activator/net/lb_policy_test.go b/pkg/activator/net/lb_policy_test.go index ffdd4fb098d5..15345720887e 100644 --- a/pkg/activator/net/lb_policy_test.go +++ b/pkg/activator/net/lb_policy_test.go @@ -126,9 +126,10 @@ func TestFirstAvailable(t *testing.T) { podTrackers := []*podTracker{{ dest: "this-is-nowhere", b: queue.NewBreaker(queue.BreakerParams{ - QueueDepth: 1, - MaxConcurrency: 1, - InitialCapacity: 1, + Concurrency: 1, + MaxQueueDepth: 1, + InitialCapacity: 1, + RateLimitingFactor: rateLimitingFactor, }), }} @@ -152,16 +153,18 @@ func TestFirstAvailable(t *testing.T) { podTrackers := []*podTracker{{ dest: "down-by-the-river", b: queue.NewBreaker(queue.BreakerParams{ - QueueDepth: 1, - MaxConcurrency: 1, - InitialCapacity: 1, + Concurrency: 1, + MaxQueueDepth: 1, + InitialCapacity: 1, + RateLimitingFactor: rateLimitingFactor, }), }, { dest: "heart-of-gold", b: queue.NewBreaker(queue.BreakerParams{ - QueueDepth: 1, - MaxConcurrency: 1, - InitialCapacity: 1, + Concurrency: 1, + MaxQueueDepth: 1, + InitialCapacity: 1, + RateLimitingFactor: rateLimitingFactor, }), }} diff --git a/pkg/activator/net/throttler.go b/pkg/activator/net/throttler.go index a3c6ebe1171f..09cb8183cf34 100644 --- a/pkg/activator/net/throttler.go +++ b/pkg/activator/net/throttler.go @@ -55,6 +55,9 @@ const ( // across the entire revision), and for the individual podTracker breakers. breakerQueueDepth = 10000 + // Do not have a smaller queue than 100 entries + minBreakerQueueDepth = 100 + // The revisionThrottler breaker's concurrency increases up to this value as // new endpoints show up. We need to set some value here since the breaker // requires an explicit buffer size (it's backed by a chan struct{}), but @@ -149,7 +152,8 @@ type revisionThrottler struct { backendCount int // This is a breaker for the revision as a whole. - breaker breaker + breaker breaker + rateLimitingFactor float64 // This will be non-empty when we're able to use pod addressing. podTrackers []*podTracker @@ -199,6 +203,7 @@ func newRevisionThrottler(revID types.NamespacedName, logger: logger, protocol: proto, lbPolicy: lbp, + rateLimitingFactor: breakerParams.RateLimitingFactor, } // Start with unknown @@ -436,9 +441,11 @@ func (rt *revisionThrottler) handleUpdate(update revisionDestsUpdate) { tracker = newPodTracker(newDest, nil) } else { tracker = newPodTracker(newDest, queue.NewBreaker(queue.BreakerParams{ - QueueDepth: breakerQueueDepth, - MaxConcurrency: rt.containerConcurrency, - InitialCapacity: rt.containerConcurrency, // Presume full unused capacity. + Concurrency: rt.containerConcurrency, + MinQueueDepth: minBreakerQueueDepth, + MaxQueueDepth: rt.containerConcurrency, + InitialCapacity: rt.containerConcurrency, // Presume full unused capacity. + RateLimitingFactor: rt.rateLimitingFactor, })) } } @@ -459,17 +466,19 @@ type Throttler struct { revisionThrottlersMutex sync.RWMutex revisionLister servinglisters.RevisionLister ipAddress string // The IP address of this activator. + rateLimitingFactor float64 logger *zap.SugaredLogger epsUpdateCh chan *corev1.Endpoints } // NewThrottler creates a new Throttler -func NewThrottler(ctx context.Context, ipAddr string) *Throttler { +func NewThrottler(ctx context.Context, ipAddr string, rateLimitingFactor float64) *Throttler { revisionInformer := revisioninformer.Get(ctx) t := &Throttler{ revisionThrottlers: make(map[types.NamespacedName]*revisionThrottler), revisionLister: revisionInformer.Lister(), ipAddress: ipAddr, + rateLimitingFactor: rateLimitingFactor, logger: logging.FromContext(ctx), epsUpdateCh: make(chan *corev1.Endpoints), } @@ -552,7 +561,8 @@ func (t *Throttler) getOrCreateRevisionThrottler(revID types.NamespacedName) (*r revID, int(rev.Spec.GetContainerConcurrency()), pkgnet.ServicePortName(rev.GetProtocol()), - queue.BreakerParams{QueueDepth: breakerQueueDepth, MaxConcurrency: revisionMaxConcurrency}, + queue.BreakerParams{Concurrency: int(rev.Spec.GetContainerConcurrency()), MinQueueDepth: minBreakerQueueDepth, MaxQueueDepth: revisionMaxConcurrency, + RateLimitingFactor: t.rateLimitingFactor}, t.logger, ) t.revisionThrottlers[revID] = revThrottler diff --git a/pkg/activator/net/throttler_test.go b/pkg/activator/net/throttler_test.go index ed727a26c79b..4132f22c98bc 100644 --- a/pkg/activator/net/throttler_test.go +++ b/pkg/activator/net/throttler_test.go @@ -48,10 +48,13 @@ import ( "knative.dev/serving/pkg/queue" ) +const rateLimitingFactor = 3.0 + var testBreakerParams = queue.BreakerParams{ - QueueDepth: 1, - MaxConcurrency: revisionMaxConcurrency, - InitialCapacity: 0, + Concurrency: 1, + MaxQueueDepth: revisionMaxConcurrency, + InitialCapacity: 0, + RateLimitingFactor: rateLimitingFactor, } type tryResult struct { @@ -60,7 +63,7 @@ type tryResult struct { } func newTestThrottler(ctx context.Context) *Throttler { - return NewThrottler(ctx, "10.10.10.10") + return NewThrottler(ctx, "10.10.10.10", rateLimitingFactor) } func TestThrottlerUpdateCapacity(t *testing.T) { @@ -280,9 +283,10 @@ func makeTrackers(num, cc int) []*podTracker { x[i] = newPodTracker(strconv.Itoa(i), nil) if cc > 0 { x[i].b = queue.NewBreaker(queue.BreakerParams{ - QueueDepth: 1, - MaxConcurrency: cc, - InitialCapacity: cc, + Concurrency: 1, + MaxQueueDepth: cc, + InitialCapacity: cc, + RateLimitingFactor: rateLimitingFactor, }) } } @@ -509,7 +513,7 @@ func TestThrottlerSuccesses(t *testing.T) { updateCh := make(chan revisionDestsUpdate) - throttler := NewThrottler(ctx, "130.0.0.2") + throttler := NewThrottler(ctx, "130.0.0.2", rateLimitingFactor) var grp errgroup.Group grp.Go(func() error { throttler.run(updateCh); return nil }) // Ensure the throttler stopped before we leave the test, so that @@ -729,7 +733,7 @@ func TestActivatorsIndexUpdate(t *testing.T) { updateCh := make(chan revisionDestsUpdate) - throttler := NewThrottler(ctx, "130.0.0.2") + throttler := NewThrottler(ctx, "130.0.0.2", rateLimitingFactor) var grp errgroup.Group grp.Go(func() error { throttler.run(updateCh); return nil }) // Ensure the throttler stopped before we leave the test, so that @@ -824,7 +828,7 @@ func TestMultipleActivators(t *testing.T) { updateCh := make(chan revisionDestsUpdate) - throttler := NewThrottler(ctx, "130.0.0.2") + throttler := NewThrottler(ctx, "130.0.0.2", rateLimitingFactor) var grp errgroup.Group grp.Go(func() error { throttler.run(updateCh); return nil }) // Ensure the throttler stopped before we leave the test, so that diff --git a/pkg/queue/breaker.go b/pkg/queue/breaker.go index 918f57b743a5..87e83fe57036 100644 --- a/pkg/queue/breaker.go +++ b/pkg/queue/breaker.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "math" + "sync" "sync/atomic" ) @@ -33,9 +34,11 @@ const MaxBreakerCapacity = math.MaxInt32 // BreakerParams defines the parameters of the breaker. type BreakerParams struct { - QueueDepth int - MaxConcurrency int - InitialCapacity int + Concurrency int + MinQueueDepth int + MaxQueueDepth int + InitialCapacity int + RateLimitingFactor float64 } // Breaker is a component that enforces a concurrency limit on the @@ -43,9 +46,13 @@ type BreakerParams struct { // executions in excess of the concurrency limit. Function call attempts // beyond the limit of the queue are failed immediately. type Breaker struct { - inFlight atomic.Int64 - totalSlots int64 - sem *semaphore + inFlight atomic.Int64 + totalSlots int64 + slotsMutex sync.RWMutex + rateLimitingFactor float64 + minQueueDepth int + maxQueueDepth int + sem *semaphore // release is the callback function returned to callers by Reserve to // allow the reservation made by Reserve to be released. @@ -55,19 +62,29 @@ type Breaker struct { // NewBreaker creates a Breaker with the desired queue depth, // concurrency limit and initial capacity. func NewBreaker(params BreakerParams) *Breaker { - if params.QueueDepth <= 0 { - panic(fmt.Sprintf("Queue depth must be greater than 0. Got %v.", params.QueueDepth)) + if params.Concurrency <= 0 { + panic(fmt.Sprintf("Concurrency must be greater than 0. Got %v.", params.Concurrency)) } - if params.MaxConcurrency < 0 { - panic(fmt.Sprintf("Max concurrency must be 0 or greater. Got %v.", params.MaxConcurrency)) + if params.RateLimitingFactor < 0 { + panic(fmt.Sprintf("Rate limiting factor must be 0 or greater. Got %v.", params.RateLimitingFactor)) } - if params.InitialCapacity < 0 || params.InitialCapacity > params.MaxConcurrency { + if params.MinQueueDepth < 0 { + panic(fmt.Sprintf("Min queue depth must be 0 or greater. Got %v.", params.MinQueueDepth)) + } + if params.MaxQueueDepth < 0 { + panic(fmt.Sprintf("Max queue depth must be 0 or greater. Got %v.", params.MaxQueueDepth)) + } + if params.InitialCapacity < 0 || params.InitialCapacity > params.MaxQueueDepth { panic(fmt.Sprintf("Initial capacity must be between 0 and max concurrency. Got %v.", params.InitialCapacity)) } b := &Breaker{ - totalSlots: int64(params.QueueDepth + params.MaxConcurrency), - sem: newSemaphore(params.MaxConcurrency, params.InitialCapacity), + rateLimitingFactor: params.RateLimitingFactor, + minQueueDepth: params.MinQueueDepth, + maxQueueDepth: params.MaxQueueDepth, + totalSlots: calculateQueueDepth(params.Concurrency, int64(params.MinQueueDepth), int64(params.MaxQueueDepth), params.RateLimitingFactor), + slotsMutex: sync.RWMutex{}, + sem: newSemaphore(params.MaxQueueDepth, params.InitialCapacity), } // Allocating the closure returned by Reserve here avoids an allocation in Reserve. @@ -97,6 +114,8 @@ func (b *Breaker) tryAcquirePending() bool { // anymore. for { cur := b.inFlight.Load() + b.slotsMutex.RLock() + defer b.slotsMutex.RUnlock() if cur == b.totalSlots { return false } @@ -160,8 +179,11 @@ func (b *Breaker) InFlight() int { } // UpdateConcurrency updates the maximum number of in-flight requests. -func (b *Breaker) UpdateConcurrency(size int) { - b.sem.updateCapacity(size) +func (b *Breaker) UpdateConcurrency(semaphoreSize int) { + b.slotsMutex.Lock() + defer b.slotsMutex.Unlock() + b.totalSlots = calculateQueueDepth(semaphoreSize, int64(b.minQueueDepth), int64(b.maxQueueDepth), b.rateLimitingFactor) + b.sem.updateCapacity(semaphoreSize) } // Capacity returns the number of allowed in-flight requests on this breaker. @@ -177,6 +199,17 @@ func newSemaphore(maxCapacity, initialCapacity int) *semaphore { return sem } +func calculateQueueDepth(concurrency int, minQueueDepth, maxQueueDepth int64, rateLimitingFactor float64) int64 { + // num_concurrency requests are let through the breaker, so we must allow num_concurrency * (rate_limit + 1) in the queue + // since inflight requests are part of the queue + // all values non-negative so casting is ok here + maxAllowedConcurrency := int64(float64(concurrency) * (rateLimitingFactor + 1)) + if rateLimitingFactor == 0.0 { // no rate limiting; var is exactly set + maxAllowedConcurrency = maxQueueDepth + } + return int64(max(minQueueDepth, min(maxAllowedConcurrency, maxQueueDepth))) +} + // semaphore is an implementation of a semaphore based on packed integers and a channel. // state is an uint64 that has two uint32s packed into it: capacity and inFlight. The // former specifies how many request are allowed at any given time into the semaphore @@ -261,8 +294,8 @@ func (s *semaphore) release() { } // updateCapacity updates the capacity of the semaphore to the desired size. -func (s *semaphore) updateCapacity(size int) { - s64 := uint64(size) //nolint:gosec // TODO(dprotaso) capacity should be uint +func (s *semaphore) updateCapacity(semaphoreSize int) { + s64 := uint64(semaphoreSize) //nolint:gosec // TODO(dprotaso) capacity should be uint for { old := s.state.Load() capacity, in := unpack(old) diff --git a/pkg/queue/breaker_test.go b/pkg/queue/breaker_test.go index 547959a1da54..0f239871c137 100644 --- a/pkg/queue/breaker_test.go +++ b/pkg/queue/breaker_test.go @@ -19,6 +19,7 @@ package queue import ( "context" "fmt" + "sync/atomic" "testing" "time" ) @@ -31,6 +32,8 @@ const ( // semNoChangeTimeout is some additional wait time after a number // of acquires is reached to assert that no more acquires get through. semNoChangeTimeout = 50 * time.Millisecond + + rateLimitingFactor = 3.0 ) func TestBreakerInvalidConstructor(t *testing.T) { @@ -38,17 +41,23 @@ func TestBreakerInvalidConstructor(t *testing.T) { name string options BreakerParams }{{ - name: "QueueDepth = 0", - options: BreakerParams{QueueDepth: 0, MaxConcurrency: 1, InitialCapacity: 1}, + name: "Concurrency = 0", + options: BreakerParams{Concurrency: 0, MaxQueueDepth: 1, InitialCapacity: 1}, }, { - name: "MaxConcurrency negative", - options: BreakerParams{QueueDepth: 1, MaxConcurrency: -1, InitialCapacity: 1}, + name: "MaxQueueDepth negative", + options: BreakerParams{Concurrency: 1, MaxQueueDepth: -1, InitialCapacity: 1}, }, { name: "InitialCapacity negative", - options: BreakerParams{QueueDepth: 1, MaxConcurrency: 1, InitialCapacity: -1}, + options: BreakerParams{Concurrency: 1, MaxQueueDepth: 1, InitialCapacity: -1}, }, { name: "InitialCapacity out-of-bounds", - options: BreakerParams{QueueDepth: 1, MaxConcurrency: 5, InitialCapacity: 6}, + options: BreakerParams{Concurrency: 1, MaxQueueDepth: 5, InitialCapacity: 6}, + }, { + name: "Rate limiting factor negative", + options: BreakerParams{Concurrency: 1, RateLimitingFactor: -1}, + }, { + name: "MinQueueDepth negative", + options: BreakerParams{Concurrency: 1, RateLimitingFactor: 1, MinQueueDepth: -1}, }} for _, test := range tests { @@ -65,7 +74,7 @@ func TestBreakerInvalidConstructor(t *testing.T) { } func TestBreakerReserveOverload(t *testing.T) { - params := BreakerParams{QueueDepth: 1, MaxConcurrency: 1, InitialCapacity: 1} + params := BreakerParams{Concurrency: 1, MaxQueueDepth: 1, InitialCapacity: 1, RateLimitingFactor: rateLimitingFactor} b := NewBreaker(params) // Breaker capacity = 2 cb1, rr := b.Reserve(context.Background()) if !rr { @@ -87,16 +96,13 @@ func TestBreakerReserveOverload(t *testing.T) { func TestBreakerOverloadMixed(t *testing.T) { // This tests when reservation and maybe are intermised. - params := BreakerParams{QueueDepth: 1, MaxConcurrency: 1, InitialCapacity: 1} - b := NewBreaker(params) // Breaker capacity = 2 + params := BreakerParams{Concurrency: 1, MaxQueueDepth: 1, InitialCapacity: 1, RateLimitingFactor: rateLimitingFactor} + b := NewBreaker(params) // Breaker capacity = 1 reqs := newRequestor(b) // Bring breaker to capacity. - reqs.request() - // This happens in go-routine, so spin. - for _, in := unpack(b.sem.state.Load()); in != 1; _, in = unpack(b.sem.state.Load()) { - time.Sleep(time.Millisecond * 2) - } + reqs.requestAndWaitVerifyInflight(&b.sem.state, 1) + _, rr := b.Reserve(context.Background()) if rr { t.Fatal("Reserve was an unexpected success.") @@ -113,13 +119,50 @@ func TestBreakerOverloadMixed(t *testing.T) { } func TestBreakerOverload(t *testing.T) { - params := BreakerParams{QueueDepth: 1, MaxConcurrency: 1, InitialCapacity: 1} - b := NewBreaker(params) // Breaker capacity = 2 + params := BreakerParams{Concurrency: 1, MaxQueueDepth: 1, InitialCapacity: 1, RateLimitingFactor: rateLimitingFactor} + b := NewBreaker(params) // Breaker capacity = 1 reqs := newRequestor(b) // Bring breaker to capacity. + reqs.requestAndWaitVerifyInflight(&b.sem.state, 1) + + // Overshoot by one. reqs.request() + reqs.expectFailure(t) + + // The remainder should succeed. + reqs.processSuccessfully(t) +} + +func TestBreakerOverloadDueToRateLimiting(t *testing.T) { + params := BreakerParams{Concurrency: 1, MaxQueueDepth: 10000000, InitialCapacity: 1, RateLimitingFactor: 1.0} + b := NewBreaker(params) // Breaker capacity = 2 + reqs := newRequestor(b) + + // Bring breaker to capacity. + reqs.requestAndWaitVerifyInflight(&b.sem.state, 1) + // queue one + reqs.requestAndWaitVerifyInflight(&b.sem.state, 1) + + // Overshoot by one. reqs.request() + reqs.expectFailure(t) + + // The remainder should succeed. + reqs.processSuccessfully(t) + reqs.processSuccessfully(t) +} + +func TestBreakerNoOverloadDueToRateLimiting(t *testing.T) { + params := BreakerParams{Concurrency: 1, MinQueueDepth: 3, MaxQueueDepth: 10000000, InitialCapacity: 1, RateLimitingFactor: 1.0} + b := NewBreaker(params) // Breaker capacity = 3 + reqs := newRequestor(b) + + // Bring breaker to capacity. + reqs.requestAndWaitVerifyInflight(&b.sem.state, 1) + // queue two + reqs.requestAndWaitVerifyInflight(&b.sem.state, 1) + reqs.requestAndWaitVerifyInflight(&b.sem.state, 1) // Overshoot by one. reqs.request() @@ -128,16 +171,17 @@ func TestBreakerOverload(t *testing.T) { // The remainder should succeed. reqs.processSuccessfully(t) reqs.processSuccessfully(t) + reqs.processSuccessfully(t) } func TestBreakerQueueing(t *testing.T) { - params := BreakerParams{QueueDepth: 1, MaxConcurrency: 1, InitialCapacity: 0} + params := BreakerParams{Concurrency: 1, MaxQueueDepth: 2, InitialCapacity: 0, RateLimitingFactor: rateLimitingFactor} b := NewBreaker(params) // Breaker capacity = 2 reqs := newRequestor(b) // Bring breaker to capacity. Doesn't error because queue subsumes these requests. - reqs.request() - reqs.request() + reqs.requestAndWaitVerifyInflight(&b.sem.state, 0) + reqs.requestAndWaitVerifyInflight(&b.sem.state, 0) // Update concurrency to allow the requests to be processed. b.UpdateConcurrency(1) @@ -148,13 +192,14 @@ func TestBreakerQueueing(t *testing.T) { } func TestBreakerNoOverload(t *testing.T) { - params := BreakerParams{QueueDepth: 1, MaxConcurrency: 1, InitialCapacity: 1} + params := BreakerParams{Concurrency: 1, MaxQueueDepth: 2, InitialCapacity: 1, RateLimitingFactor: rateLimitingFactor} b := NewBreaker(params) // Breaker capacity = 2 reqs := newRequestor(b) // Bring request to capacity. - reqs.request() - reqs.request() + reqs.requestAndWaitVerifyInflight(&b.sem.state, 1) + // queue one + reqs.requestAndWaitVerifyInflight(&b.sem.state, 1) // Process one, send a new one in, at capacity again. reqs.processSuccessfully(t) @@ -170,7 +215,7 @@ func TestBreakerNoOverload(t *testing.T) { } func TestBreakerCancel(t *testing.T) { - params := BreakerParams{QueueDepth: 1, MaxConcurrency: 1, InitialCapacity: 0} + params := BreakerParams{Concurrency: 1, MaxQueueDepth: 1, InitialCapacity: 0, RateLimitingFactor: 5} b := NewBreaker(params) reqs := newRequestor(b) @@ -209,7 +254,7 @@ func TestBreakerCancel(t *testing.T) { } func TestBreakerUpdateConcurrency(t *testing.T) { - params := BreakerParams{QueueDepth: 1, MaxConcurrency: 1, InitialCapacity: 0} + params := BreakerParams{Concurrency: 1, MaxQueueDepth: 1, InitialCapacity: 0, RateLimitingFactor: rateLimitingFactor} b := NewBreaker(params) b.UpdateConcurrency(1) if got, want := b.Capacity(), 1; got != want { @@ -343,6 +388,14 @@ func (r *requestor) request() { r.requestWithContext(context.Background()) } +func (r *requestor) requestAndWaitVerifyInflight(inflight *atomic.Uint64, expectedInflight uint64) { + r.request() + // This happens in go-routine, so spin. + for _, in := unpack(inflight.Load()); in != expectedInflight; _, in = unpack(inflight.Load()) { + time.Sleep(time.Millisecond * 2) + } +} + // requestWithContext simulates a request in a separate goroutine. The // request will either fail immediately (as observable via expectFailure) // or block until processSuccessfully is called. @@ -377,7 +430,7 @@ func BenchmarkBreakerMaybe(b *testing.B) { op := func() {} for _, c := range []int{1, 10, 100, 1000} { - breaker := NewBreaker(BreakerParams{QueueDepth: 10000000, MaxConcurrency: c, InitialCapacity: c}) + breaker := NewBreaker(BreakerParams{Concurrency: 10000000, MaxQueueDepth: c, InitialCapacity: c, RateLimitingFactor: rateLimitingFactor}) b.Run(fmt.Sprintf("%d-sequential", c), func(b *testing.B) { for range b.N { @@ -397,7 +450,7 @@ func BenchmarkBreakerMaybe(b *testing.B) { func BenchmarkBreakerReserve(b *testing.B) { op := func() {} - breaker := NewBreaker(BreakerParams{QueueDepth: 1, MaxConcurrency: 10000000, InitialCapacity: 10000000}) + breaker := NewBreaker(BreakerParams{Concurrency: 1, MaxQueueDepth: 10000000, InitialCapacity: 10000000, RateLimitingFactor: rateLimitingFactor}) b.Run("sequential", func(b *testing.B) { for range b.N { diff --git a/pkg/queue/handler.go b/pkg/queue/handler.go index f9c472f6b474..cad2131b2d1d 100644 --- a/pkg/queue/handler.go +++ b/pkg/queue/handler.go @@ -72,7 +72,9 @@ func ProxyHandler( next.ServeHTTP(w, r) }); err != nil { waitSpan.End() - if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, ErrRequestQueueFull) { + if errors.Is(err, ErrRequestQueueFull) { + http.Error(w, err.Error(), http.StatusTooManyRequests) + } else if errors.Is(err, context.DeadlineExceeded) { http.Error(w, err.Error(), http.StatusServiceUnavailable) } else { // This line is most likely untestable :-). diff --git a/pkg/queue/handler_test.go b/pkg/queue/handler_test.go index 3245d03f78bf..7cd55eb25010 100644 --- a/pkg/queue/handler_test.go +++ b/pkg/queue/handler_test.go @@ -52,7 +52,7 @@ func TestHandlerBreakerQueueFull(t *testing.T) { <-resp }) breaker := NewBreaker(BreakerParams{ - QueueDepth: 1, MaxConcurrency: 1, InitialCapacity: 1, + Concurrency: 1, MaxQueueDepth: 2, InitialCapacity: 1, }) stats := netstats.NewRequestStats(time.Now()) h := ProxyHandler(tracer, breaker, stats, blockHandler) @@ -70,7 +70,7 @@ func TestHandlerBreakerQueueFull(t *testing.T) { // One of the three requests fails and it should be the first we see since the others // are still held by the resp channel. failure := <-resps - if got, want := failure.Code, http.StatusServiceUnavailable; got != want { + if got, want := failure.Code, http.StatusTooManyRequests; got != want { t.Errorf("Code = %d, want: %d", got, want) } const want = "pending request queue full" @@ -104,7 +104,7 @@ func TestHandlerBreakerTimeout(t *testing.T) { <-resp }) breaker := NewBreaker(BreakerParams{ - QueueDepth: 1, MaxConcurrency: 1, InitialCapacity: 1, + Concurrency: 1, MaxQueueDepth: 3, InitialCapacity: 1, RateLimitingFactor: 3, }) stats := netstats.NewRequestStats(time.Now()) h := ProxyHandler(tracer, breaker, stats, blockHandler) @@ -132,7 +132,7 @@ func TestHandlerBreakerTimeout(t *testing.T) { } func TestHandlerReqEvent(t *testing.T) { - params := BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10} + params := BreakerParams{Concurrency: 10, MaxQueueDepth: 10, InitialCapacity: 10, RateLimitingFactor: 3} breaker := NewBreaker(params) for _, br := range []*Breaker{breaker, nil} { t.Run(fmt.Sprint("Breaker?=", br == nil), func(t *testing.T) { @@ -231,7 +231,7 @@ func TestIgnoreProbe(t *testing.T) { proxy := httputil.NewSingleHostReverseProxy(serverURL) // Ensure no more than 1 request can be queued. So we'll send 3. - breaker := NewBreaker(BreakerParams{QueueDepth: 1, MaxConcurrency: 1, InitialCapacity: 1}) + breaker := NewBreaker(BreakerParams{Concurrency: 1, MaxQueueDepth: 1, InitialCapacity: 1, RateLimitingFactor: 3}) stats := netstats.NewRequestStats(time.Now()) h := ProxyHandler(tracer, breaker, stats, proxy) @@ -262,7 +262,7 @@ func BenchmarkProxyHandler(b *testing.B) { reportPeriod time.Duration }{{ label: "breaker-10-no-reports", - breaker: NewBreaker(BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10}), + breaker: NewBreaker(BreakerParams{Concurrency: 10, MaxQueueDepth: 10, InitialCapacity: 10, RateLimitingFactor: 3}), reportPeriod: time.Hour, }, { label: "breaker-infinite-no-reports", @@ -270,7 +270,7 @@ func BenchmarkProxyHandler(b *testing.B) { reportPeriod: time.Hour, }, { label: "breaker-10-many-reports", - breaker: NewBreaker(BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10}), + breaker: NewBreaker(BreakerParams{Concurrency: 10, MaxQueueDepth: 10, InitialCapacity: 10, RateLimitingFactor: 3}), reportPeriod: time.Microsecond, }, { label: "breaker-infinite-many-reports", diff --git a/pkg/queue/request_metric_test.go b/pkg/queue/request_metric_test.go index 0b82faffb61b..7e03c22ee33b 100644 --- a/pkg/queue/request_metric_test.go +++ b/pkg/queue/request_metric_test.go @@ -46,7 +46,7 @@ func TestAppRequestMetricsHandlerPanickingHandler(t *testing.T) { fakeClock.SetTime(fakeClock.Now().Add(time.Second)) panic("no!") }) - breaker := NewBreaker(BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10}) + breaker := NewBreaker(BreakerParams{Concurrency: 10, MaxQueueDepth: 10, InitialCapacity: 10, RateLimitingFactor: 3}) // Increment the breaker to report 1 active request breaker.tryAcquirePending() @@ -81,7 +81,7 @@ func TestAppRequestMetricsHandler(t *testing.T) { baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fakeClock.SetTime(fakeClock.Now().Add(time.Second)) }) - breaker := NewBreaker(BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10}) + breaker := NewBreaker(BreakerParams{Concurrency: 10, MaxQueueDepth: 10, InitialCapacity: 10, RateLimitingFactor: 3}) breaker.tryAcquirePending() handler, err := NewAppRequestMetricsHandler(mp, baseHandler, breaker) if err != nil { @@ -152,7 +152,7 @@ func BenchmarkAppRequestMetricsHandler(b *testing.B) { mp := metric.NewMeterProvider(metric.WithReader(reader)) baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}) - breaker := NewBreaker(BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10}) + breaker := NewBreaker(BreakerParams{Concurrency: 10, MaxQueueDepth: 10, InitialCapacity: 10, RateLimitingFactor: 3}) handler, err := NewAppRequestMetricsHandler(mp, baseHandler, breaker) if err != nil { b.Fatal("Failed to create handler:", err) diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index d40010ec6d2d..9921d36e20ca 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -398,8 +398,8 @@ func buildBreaker(logger *zap.SugaredLogger, env config) *queue.Breaker { // allow the autoscaler time to react. queueDepth := 10 * env.ContainerConcurrency params := queue.BreakerParams{ - QueueDepth: queueDepth, - MaxConcurrency: env.ContainerConcurrency, + Concurrency: env.ContainerConcurrency, + MaxQueueDepth: queueDepth, InitialCapacity: env.ContainerConcurrency, } logger.Infof("Queue container is starting with BreakerParams = %#v", params) diff --git a/pkg/queue/sharedmain/main_test.go b/pkg/queue/sharedmain/main_test.go index fd778a9fddd0..51c64f32fd6c 100644 --- a/pkg/queue/sharedmain/main_test.go +++ b/pkg/queue/sharedmain/main_test.go @@ -73,7 +73,7 @@ func TestQueueTraceSpans(t *testing.T) { serverURL, _ := url.Parse(server.URL) proxy := httputil.NewSingleHostReverseProxy(serverURL) - params := queue.BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10} + params := queue.BreakerParams{Concurrency: 10, MaxQueueDepth: 10, InitialCapacity: 10, RateLimitingFactor: 3} var breaker *queue.Breaker if !tc.infiniteCC { breaker = queue.NewBreaker(params)