Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type config struct {
PodName string `split_words:"true" required:"true"`
PodIP string `split_words:"true" required:"true"`

RateLimitingFactor float64 `split_words:"true" default:"0.0"`

// These are here to allow configuring higher values of keep-alive for larger environments.
// TODO: run loadtests using these flags to determine optimal default values.
MaxIdleProxyConns int `split_words:"true" default:"1000"`
Expand Down Expand Up @@ -154,6 +156,7 @@ func main() {
}

logger.Info("Starting the knative activator")
logger.Infow("Using rate-limiting throttler", "rate-limit", env.RateLimitingFactor)

// Create the transport used by both the activator->QP probe and the proxy.
// It's important that the throttler and the activatorhandler share this
Expand Down Expand Up @@ -203,7 +206,7 @@ func main() {
activatornet.SetProbeSettings(probeTimeout, probeFrequency)

// Start throttler.
throttler := activatornet.NewThrottler(ctx, env.PodIP)
throttler := activatornet.NewThrottler(ctx, env.PodIP, env.RateLimitingFactor)
go throttler.Run(ctx, transport, networkConfig.EnableMeshPodAddressability, networkConfig.MeshCompatibilityMode)

// Set up our config store
Expand Down
5 changes: 4 additions & 1 deletion pkg/activator/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

a.logger.Errorw("Throttler try error", zap.String(logkey.Key, revID.String()), zap.Error(err))

if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, queue.ErrRequestQueueFull) {
if errors.Is(err, queue.ErrRequestQueueFull) {
a.logger.Infof("Rate-limited %s/%s", revID.Namespace, revID.Name)
http.Error(w, err.Error(), http.StatusTooManyRequests)
} else if errors.Is(err, context.DeadlineExceeded) {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
} else {
w.WriteHeader(http.StatusInternalServerError)
Expand Down
2 changes: 1 addition & 1 deletion pkg/activator/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestActivationHandler(t *testing.T) {
}, {
name: "overflow",
wantBody: "pending request queue full\n",
wantCode: http.StatusServiceUnavailable,
wantCode: http.StatusTooManyRequests,
throttler: fakeThrottler{err: queue.ErrRequestQueueFull},
}}
for _, test := range tests {
Expand Down
21 changes: 12 additions & 9 deletions pkg/activator/net/lb_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,10 @@ func TestFirstAvailable(t *testing.T) {
podTrackers := []*podTracker{{
dest: "this-is-nowhere",
b: queue.NewBreaker(queue.BreakerParams{
QueueDepth: 1,
MaxConcurrency: 1,
InitialCapacity: 1,
Concurrency: 1,
MaxQueueDepth: 1,
InitialCapacity: 1,
RateLimitingFactor: rateLimitingFactor,
}),
}}

Expand All @@ -152,16 +153,18 @@ func TestFirstAvailable(t *testing.T) {
podTrackers := []*podTracker{{
dest: "down-by-the-river",
b: queue.NewBreaker(queue.BreakerParams{
QueueDepth: 1,
MaxConcurrency: 1,
InitialCapacity: 1,
Concurrency: 1,
MaxQueueDepth: 1,
InitialCapacity: 1,
RateLimitingFactor: rateLimitingFactor,
}),
}, {
dest: "heart-of-gold",
b: queue.NewBreaker(queue.BreakerParams{
QueueDepth: 1,
MaxConcurrency: 1,
InitialCapacity: 1,
Concurrency: 1,
MaxQueueDepth: 1,
InitialCapacity: 1,
RateLimitingFactor: rateLimitingFactor,
}),
}}

Expand Down
22 changes: 16 additions & 6 deletions pkg/activator/net/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const (
// across the entire revision), and for the individual podTracker breakers.
breakerQueueDepth = 10000

// Do not have a smaller queue than 100 entries
minBreakerQueueDepth = 100

// The revisionThrottler breaker's concurrency increases up to this value as
// new endpoints show up. We need to set some value here since the breaker
// requires an explicit buffer size (it's backed by a chan struct{}), but
Expand Down Expand Up @@ -149,7 +152,8 @@ type revisionThrottler struct {
backendCount int

// This is a breaker for the revision as a whole.
breaker breaker
breaker breaker
rateLimitingFactor float64

// This will be non-empty when we're able to use pod addressing.
podTrackers []*podTracker
Expand Down Expand Up @@ -199,6 +203,7 @@ func newRevisionThrottler(revID types.NamespacedName,
logger: logger,
protocol: proto,
lbPolicy: lbp,
rateLimitingFactor: breakerParams.RateLimitingFactor,
}

// Start with unknown
Expand Down Expand Up @@ -436,9 +441,11 @@ func (rt *revisionThrottler) handleUpdate(update revisionDestsUpdate) {
tracker = newPodTracker(newDest, nil)
} else {
tracker = newPodTracker(newDest, queue.NewBreaker(queue.BreakerParams{
QueueDepth: breakerQueueDepth,
MaxConcurrency: rt.containerConcurrency,
InitialCapacity: rt.containerConcurrency, // Presume full unused capacity.
Concurrency: rt.containerConcurrency,
MinQueueDepth: minBreakerQueueDepth,
MaxQueueDepth: rt.containerConcurrency,
InitialCapacity: rt.containerConcurrency, // Presume full unused capacity.
RateLimitingFactor: rt.rateLimitingFactor,
}))
}
}
Expand All @@ -459,17 +466,19 @@ type Throttler struct {
revisionThrottlersMutex sync.RWMutex
revisionLister servinglisters.RevisionLister
ipAddress string // The IP address of this activator.
rateLimitingFactor float64
logger *zap.SugaredLogger
epsUpdateCh chan *corev1.Endpoints
}

// NewThrottler creates a new Throttler
func NewThrottler(ctx context.Context, ipAddr string) *Throttler {
func NewThrottler(ctx context.Context, ipAddr string, rateLimitingFactor float64) *Throttler {
revisionInformer := revisioninformer.Get(ctx)
t := &Throttler{
revisionThrottlers: make(map[types.NamespacedName]*revisionThrottler),
revisionLister: revisionInformer.Lister(),
ipAddress: ipAddr,
rateLimitingFactor: rateLimitingFactor,
logger: logging.FromContext(ctx),
epsUpdateCh: make(chan *corev1.Endpoints),
}
Expand Down Expand Up @@ -552,7 +561,8 @@ func (t *Throttler) getOrCreateRevisionThrottler(revID types.NamespacedName) (*r
revID,
int(rev.Spec.GetContainerConcurrency()),
pkgnet.ServicePortName(rev.GetProtocol()),
queue.BreakerParams{QueueDepth: breakerQueueDepth, MaxConcurrency: revisionMaxConcurrency},
queue.BreakerParams{Concurrency: int(rev.Spec.GetContainerConcurrency()), MinQueueDepth: minBreakerQueueDepth, MaxQueueDepth: revisionMaxConcurrency,
RateLimitingFactor: t.rateLimitingFactor},
t.logger,
)
t.revisionThrottlers[revID] = revThrottler
Expand Down
24 changes: 14 additions & 10 deletions pkg/activator/net/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,13 @@ import (
"knative.dev/serving/pkg/queue"
)

const rateLimitingFactor = 3.0

var testBreakerParams = queue.BreakerParams{
QueueDepth: 1,
MaxConcurrency: revisionMaxConcurrency,
InitialCapacity: 0,
Concurrency: 1,
MaxQueueDepth: revisionMaxConcurrency,
InitialCapacity: 0,
RateLimitingFactor: rateLimitingFactor,
}

type tryResult struct {
Expand All @@ -60,7 +63,7 @@ type tryResult struct {
}

func newTestThrottler(ctx context.Context) *Throttler {
return NewThrottler(ctx, "10.10.10.10")
return NewThrottler(ctx, "10.10.10.10", rateLimitingFactor)
}

func TestThrottlerUpdateCapacity(t *testing.T) {
Expand Down Expand Up @@ -280,9 +283,10 @@ func makeTrackers(num, cc int) []*podTracker {
x[i] = newPodTracker(strconv.Itoa(i), nil)
if cc > 0 {
x[i].b = queue.NewBreaker(queue.BreakerParams{
QueueDepth: 1,
MaxConcurrency: cc,
InitialCapacity: cc,
Concurrency: 1,
MaxQueueDepth: cc,
InitialCapacity: cc,
RateLimitingFactor: rateLimitingFactor,
})
}
}
Expand Down Expand Up @@ -509,7 +513,7 @@ func TestThrottlerSuccesses(t *testing.T) {

updateCh := make(chan revisionDestsUpdate)

throttler := NewThrottler(ctx, "130.0.0.2")
throttler := NewThrottler(ctx, "130.0.0.2", rateLimitingFactor)
var grp errgroup.Group
grp.Go(func() error { throttler.run(updateCh); return nil })
// Ensure the throttler stopped before we leave the test, so that
Expand Down Expand Up @@ -729,7 +733,7 @@ func TestActivatorsIndexUpdate(t *testing.T) {

updateCh := make(chan revisionDestsUpdate)

throttler := NewThrottler(ctx, "130.0.0.2")
throttler := NewThrottler(ctx, "130.0.0.2", rateLimitingFactor)
var grp errgroup.Group
grp.Go(func() error { throttler.run(updateCh); return nil })
// Ensure the throttler stopped before we leave the test, so that
Expand Down Expand Up @@ -824,7 +828,7 @@ func TestMultipleActivators(t *testing.T) {

updateCh := make(chan revisionDestsUpdate)

throttler := NewThrottler(ctx, "130.0.0.2")
throttler := NewThrottler(ctx, "130.0.0.2", rateLimitingFactor)
var grp errgroup.Group
grp.Go(func() error { throttler.run(updateCh); return nil })
// Ensure the throttler stopped before we leave the test, so that
Expand Down
67 changes: 50 additions & 17 deletions pkg/queue/breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"math"
"sync"
"sync/atomic"
)

Expand All @@ -33,19 +34,25 @@ const MaxBreakerCapacity = math.MaxInt32

// BreakerParams defines the parameters of the breaker.
type BreakerParams struct {
QueueDepth int
MaxConcurrency int
InitialCapacity int
Concurrency int
MinQueueDepth int
MaxQueueDepth int
InitialCapacity int
RateLimitingFactor float64
}

// Breaker is a component that enforces a concurrency limit on the
// execution of a function. It also maintains a queue of function
// executions in excess of the concurrency limit. Function call attempts
// beyond the limit of the queue are failed immediately.
type Breaker struct {
inFlight atomic.Int64
totalSlots int64
sem *semaphore
inFlight atomic.Int64
totalSlots int64
slotsMutex sync.RWMutex
rateLimitingFactor float64
minQueueDepth int
maxQueueDepth int
sem *semaphore

// release is the callback function returned to callers by Reserve to
// allow the reservation made by Reserve to be released.
Expand All @@ -55,19 +62,29 @@ type Breaker struct {
// NewBreaker creates a Breaker with the desired queue depth,
// concurrency limit and initial capacity.
func NewBreaker(params BreakerParams) *Breaker {
if params.QueueDepth <= 0 {
panic(fmt.Sprintf("Queue depth must be greater than 0. Got %v.", params.QueueDepth))
if params.Concurrency <= 0 {
panic(fmt.Sprintf("Concurrency must be greater than 0. Got %v.", params.Concurrency))
}
if params.MaxConcurrency < 0 {
panic(fmt.Sprintf("Max concurrency must be 0 or greater. Got %v.", params.MaxConcurrency))
if params.RateLimitingFactor < 0 {
panic(fmt.Sprintf("Rate limiting factor must be 0 or greater. Got %v.", params.RateLimitingFactor))
}
if params.InitialCapacity < 0 || params.InitialCapacity > params.MaxConcurrency {
if params.MinQueueDepth < 0 {
panic(fmt.Sprintf("Min queue depth must be 0 or greater. Got %v.", params.MinQueueDepth))
}
if params.MaxQueueDepth < 0 {
panic(fmt.Sprintf("Max queue depth must be 0 or greater. Got %v.", params.MaxQueueDepth))
}
if params.InitialCapacity < 0 || params.InitialCapacity > params.MaxQueueDepth {
panic(fmt.Sprintf("Initial capacity must be between 0 and max concurrency. Got %v.", params.InitialCapacity))
}

b := &Breaker{
totalSlots: int64(params.QueueDepth + params.MaxConcurrency),
sem: newSemaphore(params.MaxConcurrency, params.InitialCapacity),
rateLimitingFactor: params.RateLimitingFactor,
minQueueDepth: params.MinQueueDepth,
maxQueueDepth: params.MaxQueueDepth,
totalSlots: calculateQueueDepth(params.Concurrency, int64(params.MinQueueDepth), int64(params.MaxQueueDepth), params.RateLimitingFactor),
slotsMutex: sync.RWMutex{},
sem: newSemaphore(params.MaxQueueDepth, params.InitialCapacity),
}

// Allocating the closure returned by Reserve here avoids an allocation in Reserve.
Expand Down Expand Up @@ -97,6 +114,8 @@ func (b *Breaker) tryAcquirePending() bool {
// anymore.
for {
cur := b.inFlight.Load()
b.slotsMutex.RLock()
defer b.slotsMutex.RUnlock()
if cur == b.totalSlots {
return false
}
Expand Down Expand Up @@ -160,8 +179,11 @@ func (b *Breaker) InFlight() int {
}

// UpdateConcurrency updates the maximum number of in-flight requests.
func (b *Breaker) UpdateConcurrency(size int) {
b.sem.updateCapacity(size)
func (b *Breaker) UpdateConcurrency(semaphoreSize int) {
b.slotsMutex.Lock()
defer b.slotsMutex.Unlock()
b.totalSlots = calculateQueueDepth(semaphoreSize, int64(b.minQueueDepth), int64(b.maxQueueDepth), b.rateLimitingFactor)
b.sem.updateCapacity(semaphoreSize)
}

// Capacity returns the number of allowed in-flight requests on this breaker.
Expand All @@ -177,6 +199,17 @@ func newSemaphore(maxCapacity, initialCapacity int) *semaphore {
return sem
}

func calculateQueueDepth(concurrency int, minQueueDepth, maxQueueDepth int64, rateLimitingFactor float64) int64 {
// num_concurrency requests are let through the breaker, so we must allow num_concurrency * (rate_limit + 1) in the queue
// since inflight requests are part of the queue
// all values non-negative so casting is ok here
maxAllowedConcurrency := int64(float64(concurrency) * (rateLimitingFactor + 1))
if rateLimitingFactor == 0.0 { // no rate limiting; var is exactly set
maxAllowedConcurrency = maxQueueDepth
}
return int64(max(minQueueDepth, min(maxAllowedConcurrency, maxQueueDepth)))
}

// semaphore is an implementation of a semaphore based on packed integers and a channel.
// state is an uint64 that has two uint32s packed into it: capacity and inFlight. The
// former specifies how many request are allowed at any given time into the semaphore
Expand Down Expand Up @@ -261,8 +294,8 @@ func (s *semaphore) release() {
}

// updateCapacity updates the capacity of the semaphore to the desired size.
func (s *semaphore) updateCapacity(size int) {
s64 := uint64(size) //nolint:gosec // TODO(dprotaso) capacity should be uint
func (s *semaphore) updateCapacity(semaphoreSize int) {
s64 := uint64(semaphoreSize) //nolint:gosec // TODO(dprotaso) capacity should be uint
for {
old := s.state.Load()
capacity, in := unpack(old)
Expand Down
Loading