Skip to content

Commit

Permalink
Workqueue: Add generic versions that are properly typed
Browse files Browse the repository at this point in the history
This change adds a generic version of the various workqueue types while
retaining compatibility for the existing exported symbols and constructors.
The generic variants are prefixed with `Typed` and the existing ones are
marked as deprecated to nudge people to transition without breaking
them.

Kubernetes-commit: 0c7370bb851c15825d30a516722139ccccca0cfc
  • Loading branch information
alvaroaleman authored and k8s-publishing-bot committed Apr 11, 2024
1 parent 9433226 commit d67dddf
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 131 deletions.
139 changes: 98 additions & 41 deletions util/workqueue/default_rate_limiters.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,69 +24,96 @@ import (
"golang.org/x/time/rate"
)

type RateLimiter interface {
// Deprecated: RateLimiter is deprecated, use TypedRateLimiter instead.
type RateLimiter TypedRateLimiter[any]

type TypedRateLimiter[T comparable] interface {
// When gets an item and gets to decide how long that item should wait
When(item interface{}) time.Duration
When(item T) time.Duration
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing
// or for success, we'll stop tracking it
Forget(item interface{})
Forget(item T)
// NumRequeues returns back how many failures the item has had
NumRequeues(item interface{}) int
NumRequeues(item T) int
}

// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
//
// Deprecated: Use DefaultTypedControllerRateLimiter instead.
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
return DefaultTypedControllerRateLimiter[any]()
}

// DefaultTypedControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
func DefaultTypedControllerRateLimiter[T comparable]() TypedRateLimiter[T] {
return NewTypedMaxOfRateLimiter(
NewTypedItemExponentialFailureRateLimiter[T](5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
&TypedBucketRateLimiter[T]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}

// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type BucketRateLimiter struct {
// Deprecated: BucketRateLimiter is deprecated, use TypedBucketRateLimiter instead.
type BucketRateLimiter = TypedBucketRateLimiter[any]

// TypedBucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type TypedBucketRateLimiter[T comparable] struct {
*rate.Limiter
}

var _ RateLimiter = &BucketRateLimiter{}

func (r *BucketRateLimiter) When(item interface{}) time.Duration {
func (r *TypedBucketRateLimiter[T]) When(item T) time.Duration {
return r.Limiter.Reserve().Delay()
}

func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
func (r *TypedBucketRateLimiter[T]) NumRequeues(item T) int {
return 0
}

func (r *BucketRateLimiter) Forget(item interface{}) {
func (r *TypedBucketRateLimiter[T]) Forget(item T) {
}

// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
// Deprecated: ItemExponentialFailureRateLimiter is deprecated, use TypedItemExponentialFailureRateLimiter instead.
type ItemExponentialFailureRateLimiter = TypedItemExponentialFailureRateLimiter[any]

// TypedItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
// dealing with max failures and expiration are up to the caller
type ItemExponentialFailureRateLimiter struct {
type TypedItemExponentialFailureRateLimiter[T comparable] struct {
failuresLock sync.Mutex
failures map[interface{}]int
failures map[T]int

baseDelay time.Duration
maxDelay time.Duration
}

var _ RateLimiter = &ItemExponentialFailureRateLimiter{}

// Deprecated: NewItemExponentialFailureRateLimiter is deprecated, use NewTypedItemExponentialFailureRateLimiter instead.
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
return &ItemExponentialFailureRateLimiter{
failures: map[interface{}]int{},
return NewTypedItemExponentialFailureRateLimiter[any](baseDelay, maxDelay)
}

func NewTypedItemExponentialFailureRateLimiter[T comparable](baseDelay time.Duration, maxDelay time.Duration) TypedRateLimiter[T] {
return &TypedItemExponentialFailureRateLimiter[T]{
failures: map[T]int{},
baseDelay: baseDelay,
maxDelay: maxDelay,
}
}

// Deprecated: DefaultItemBasedRateLimiter is deprecated, use DefaultTypedItemBasedRateLimiter instead.
func DefaultItemBasedRateLimiter() RateLimiter {
return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
return DefaultTypedItemBasedRateLimiter[any]()
}

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
func DefaultTypedItemBasedRateLimiter[T comparable]() TypedRateLimiter[T] {
return NewTypedItemExponentialFailureRateLimiter[T](time.Millisecond, 1000*time.Second)
}

func (r *TypedItemExponentialFailureRateLimiter[T]) When(item T) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

Expand All @@ -107,24 +134,28 @@ func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration
return calculated
}

func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
func (r *TypedItemExponentialFailureRateLimiter[T]) NumRequeues(item T) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

return r.failures[item]
}

func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
func (r *TypedItemExponentialFailureRateLimiter[T]) Forget(item T) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

delete(r.failures, item)
}

// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
type ItemFastSlowRateLimiter struct {
// Deprecated: Use TypedItemFastSlowRateLimiter instead.
type ItemFastSlowRateLimiter = TypedItemFastSlowRateLimiter[any]

// TypedItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
type TypedItemFastSlowRateLimiter[T comparable] struct {
failuresLock sync.Mutex
failures map[interface{}]int
failures map[T]int

maxFastAttempts int
fastDelay time.Duration
Expand All @@ -133,16 +164,21 @@ type ItemFastSlowRateLimiter struct {

var _ RateLimiter = &ItemFastSlowRateLimiter{}

// Deprecated: NewItemFastSlowRateLimiter is deprecated, use NewTypedItemFastSlowRateLimiter instead.
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
return &ItemFastSlowRateLimiter{
failures: map[interface{}]int{},
return NewTypedItemFastSlowRateLimiter[any](fastDelay, slowDelay, maxFastAttempts)
}

func NewTypedItemFastSlowRateLimiter[T comparable](fastDelay, slowDelay time.Duration, maxFastAttempts int) TypedRateLimiter[T] {
return &TypedItemFastSlowRateLimiter[T]{
failures: map[T]int{},
fastDelay: fastDelay,
slowDelay: slowDelay,
maxFastAttempts: maxFastAttempts,
}
}

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
func (r *TypedItemFastSlowRateLimiter[T]) When(item T) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

Expand All @@ -155,14 +191,14 @@ func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
return r.slowDelay
}

func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
func (r *TypedItemFastSlowRateLimiter[T]) NumRequeues(item T) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

return r.failures[item]
}

func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
func (r *TypedItemFastSlowRateLimiter[T]) Forget(item T) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

Expand All @@ -172,11 +208,18 @@ func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type MaxOfRateLimiter struct {
limiters []RateLimiter
//
// Deprecated: Use TypedMaxOfRateLimiter instead.
type MaxOfRateLimiter = TypedMaxOfRateLimiter[any]

// TypedMaxOfRateLimiter calls every RateLimiter and returns the worst case response
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type TypedMaxOfRateLimiter[T comparable] struct {
limiters []TypedRateLimiter[T]
}

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
func (r *TypedMaxOfRateLimiter[T]) When(item T) time.Duration {
ret := time.Duration(0)
for _, limiter := range r.limiters {
curr := limiter.When(item)
Expand All @@ -188,11 +231,16 @@ func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
return ret
}

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
return &MaxOfRateLimiter{limiters: limiters}
// Deprecated: NewMaxOfRateLimiter is deprecated, use NewTypedMaxOfRateLimiter instead.
func NewMaxOfRateLimiter(limiters ...TypedRateLimiter[any]) RateLimiter {
return NewTypedMaxOfRateLimiter(limiters...)
}

func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
func NewTypedMaxOfRateLimiter[T comparable](limiters ...TypedRateLimiter[T]) TypedRateLimiter[T] {
return &TypedMaxOfRateLimiter[T]{limiters: limiters}
}

func (r *TypedMaxOfRateLimiter[T]) NumRequeues(item T) int {
ret := 0
for _, limiter := range r.limiters {
curr := limiter.NumRequeues(item)
Expand All @@ -204,23 +252,32 @@ func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
return ret
}

func (r *MaxOfRateLimiter) Forget(item interface{}) {
func (r *TypedMaxOfRateLimiter[T]) Forget(item T) {
for _, limiter := range r.limiters {
limiter.Forget(item)
}
}

// WithMaxWaitRateLimiter have maxDelay which avoids waiting too long
type WithMaxWaitRateLimiter struct {
limiter RateLimiter
// Deprecated: Use TypedWithMaxWaitRateLimiter instead.
type WithMaxWaitRateLimiter = TypedWithMaxWaitRateLimiter[any]

// TypedWithMaxWaitRateLimiter have maxDelay which avoids waiting too long
type TypedWithMaxWaitRateLimiter[T comparable] struct {
limiter TypedRateLimiter[T]
maxDelay time.Duration
}

// Deprecated: NewWithMaxWaitRateLimiter is deprecated, use NewTypedWithMaxWaitRateLimiter instead.
func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {
return &WithMaxWaitRateLimiter{limiter: limiter, maxDelay: maxDelay}
return NewTypedWithMaxWaitRateLimiter[any](limiter, maxDelay)
}

func NewTypedWithMaxWaitRateLimiter[T comparable](limiter TypedRateLimiter[T], maxDelay time.Duration) TypedRateLimiter[T] {
return &TypedWithMaxWaitRateLimiter[T]{limiter: limiter, maxDelay: maxDelay}
}

func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
func (w TypedWithMaxWaitRateLimiter[T]) When(item T) time.Duration {
delay := w.limiter.When(item)
if delay > w.maxDelay {
return w.maxDelay
Expand All @@ -229,10 +286,10 @@ func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
return delay
}

func (w WithMaxWaitRateLimiter) Forget(item interface{}) {
func (w TypedWithMaxWaitRateLimiter[T]) Forget(item T) {
w.limiter.Forget(item)
}

func (w WithMaxWaitRateLimiter) NumRequeues(item interface{}) int {
func (w TypedWithMaxWaitRateLimiter[T]) NumRequeues(item T) int {
return w.limiter.NumRequeues(item)
}
Loading

0 comments on commit d67dddf

Please sign in to comment.