Skip to content

Commit

Permalink
Merge pull request kubernetes#3472 from chrislovecnm/global-rate-limiter
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue.

adding kubernetes core rate limiter handlers

This PR is re-using the handlers from the k8s core project, to create a global rate limiting.

This work starts work on kubernetes#3471
  • Loading branch information
Kubernetes Submit Queue authored Oct 27, 2017
2 parents 68c9036 + 6dc953c commit 1f4224b
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 4 deletions.
1 change: 1 addition & 0 deletions upup/pkg/fi/cloudup/awsup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/kubernetes/federation/pkg/dnsprovider:go_default_library",
"//vendor/k8s.io/kubernetes/federation/pkg/dnsprovider/providers/aws/route53:go_default_library",
"//vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/aws:go_default_library",
],
)

Expand Down
64 changes: 60 additions & 4 deletions upup/pkg/fi/cloudup/awsup/aws_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package awsup
import (
"fmt"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface"
Expand All @@ -43,6 +45,7 @@ import (
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
dnsproviderroute53 "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/aws/route53"
k8s_aws "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
)

// By default, aws-sdk-go only retries 3 times, which doesn't give
Expand Down Expand Up @@ -144,6 +147,13 @@ type awsCloudImplementation struct {
region string

tags map[string]string

regionDelayers *RegionDelayers
}

type RegionDelayers struct {
mutex sync.Mutex
delayerMap map[string]*k8s_aws.CrossRequestRetryDelay
}

var _ fi.Cloud = &awsCloudImplementation{}
Expand All @@ -161,16 +171,19 @@ var awsCloudInstances map[string]AWSCloud = make(map[string]AWSCloud)
func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) {
raw := awsCloudInstances[region]
if raw == nil {
c := &awsCloudImplementation{region: region}
c := &awsCloudImplementation{
region: region,
regionDelayers: &RegionDelayers{
delayerMap: make(map[string]*k8s_aws.CrossRequestRetryDelay),
},
}

config := aws.NewConfig().WithRegion(region)

// Add some logging of retries
config.Retryer = newLoggingRetryer(ClientMaxRetries)

// This avoids a confusing error message when we fail to get credentials
// e.g. https://github.com/kubernetes/kops/issues/605
config = config.WithCredentialsChainVerboseErrors(true)
config = request.WithRetryer(config, newLoggingRetryer(ClientMaxRetries))

requestLogger := newRequestLogger(2)

Expand All @@ -180,41 +193,47 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) {
}
c.cf = cloudformation.New(sess, config)
c.cf.Handlers.Send.PushFront(requestLogger)
c.addHandlers(region, &c.cf.Handlers)

sess, err = session.NewSession(config)
if err != nil {
return c, err
}
c.ec2 = ec2.New(sess, config)
c.ec2.Handlers.Send.PushFront(requestLogger)
c.addHandlers(region, &c.ec2.Handlers)

sess, err = session.NewSession(config)
if err != nil {
return c, err
}
c.iam = iam.New(sess, config)
c.iam.Handlers.Send.PushFront(requestLogger)
c.addHandlers(region, &c.iam.Handlers)

sess, err = session.NewSession(config)
if err != nil {
return c, err
}
c.elb = elb.New(sess, config)
c.elb.Handlers.Send.PushFront(requestLogger)
c.addHandlers(region, &c.elb.Handlers)

sess, err = session.NewSession(config)
if err != nil {
return c, err
}
c.autoscaling = autoscaling.New(sess, config)
c.autoscaling.Handlers.Send.PushFront(requestLogger)
c.addHandlers(region, &c.autoscaling.Handlers)

sess, err = session.NewSession(config)
if err != nil {
return c, err
}
c.route53 = route53.New(sess, config)
c.route53.Handlers.Send.PushFront(requestLogger)
c.addHandlers(region, &c.route53.Handlers)

awsCloudInstances[region] = c
raw = c
Expand All @@ -225,6 +244,43 @@ func NewAWSCloud(region string, tags map[string]string) (AWSCloud, error) {
return i, nil
}

func (c *awsCloudImplementation) addHandlers(regionName string, h *request.Handlers) {

delayer := c.getCrossRequestRetryDelay(regionName)
if delayer != nil {
h.Sign.PushFrontNamed(request.NamedHandler{
Name: "kops/delay-presign",
Fn: delayer.BeforeSign,
})

h.AfterRetry.PushFrontNamed(request.NamedHandler{
Name: "kops/delay-afterretry",
Fn: delayer.AfterRetry,
})
}
}

// Get a CrossRequestRetryDelay, scoped to the region, not to the request.
// This means that when we hit a limit on a call, we will delay _all_ calls to the API.
// We do this to protect the AWS account from becoming overloaded and effectively locked.
// We also log when we hit request limits.
// Note that this delays the current goroutine; this is bad behaviour and will
// likely cause kops to become slow or unresponsive for cloud operations.
// However, this throttle is intended only as a last resort. When we observe
// this throttling, we need to address the root cause (e.g. add a delay to a
// controller retry loop)
func (c *awsCloudImplementation) getCrossRequestRetryDelay(regionName string) *k8s_aws.CrossRequestRetryDelay {
c.regionDelayers.mutex.Lock()
defer c.regionDelayers.mutex.Unlock()

delayer, found := c.regionDelayers.delayerMap[regionName]
if !found {
delayer = k8s_aws.NewCrossRequestRetryDelay()
c.regionDelayers.delayerMap[regionName] = delayer
}
return delayer
}

func NewEC2Filter(name string, values ...string) *ec2.Filter {
awsValues := []*string{}
for _, value := range values {
Expand Down

0 comments on commit 1f4224b

Please sign in to comment.