Skip to content
This repository has been archived by the owner on May 4, 2021. It is now read-only.

Commit

Permalink
Rework on registry push retry (#345)
Browse files Browse the repository at this point in the history
* Rework on registry push retry

* typo

* address comments
  • Loading branch information
evelynl94 authored Sep 21, 2020
1 parent e9b23cd commit 07fab96
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 120 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/aws/aws-sdk-go v1.30.1 // indirect
github.com/awslabs/amazon-ecr-credential-helper v0.4.0
github.com/axw/gocov v0.0.0-20170322000131-3a69a0d2a4ef
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/client9/misspell v0.3.4
github.com/docker/distribution v2.7.0+incompatible
github.com/docker/docker-credential-helpers v0.6.1
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ github.com/axw/gocov v0.0.0-20170322000131-3a69a0d2a4ef h1:kh7Fi8sfEY7aCl42VEEvG
github.com/axw/gocov v0.0.0-20170322000131-3a69a0d2a4ef/go.mod h1:pc6XrbIn8RLeVSNzXCZKXNst+RTE5Ju/nySYl1Wc0B4=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/cenkalti/backoff v1.1.0 h1:QnvVp8ikKCDWOsFheytRCoYWYPO/ObCTBGxT19Hc+yE=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
Expand Down
25 changes: 23 additions & 2 deletions lib/registry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"regexp"
"time"

"github.com/cenkalti/backoff"
"github.com/juju/ratelimit"

"github.com/uber/makisu/lib/concurrency"
Expand Down Expand Up @@ -362,13 +363,33 @@ func (c DockerRegistryClient) pullLayerHelper(

// PushLayer pushes the image layer to the registry.
func (c DockerRegistryClient) PushLayer(layerDigest image.Digest) error {
return c.pushLayerHelper(layerDigest, false)
return c.pushLayerWithBackoff(layerDigest, false)
}

// PushImageConfig pushes image config blob to the registry.
// Same as PushLayer, with slightly different log message.
func (c DockerRegistryClient) PushImageConfig(layerDigest image.Digest) error {
return c.pushLayerHelper(layerDigest, true)
return c.pushLayerWithBackoff(layerDigest, true)
}

func (c DockerRegistryClient) pushLayerWithBackoff(layerDigest image.Digest, isConfig bool) error {
multiError := utils.NewMultiErrors()
b := c.config.backoff()
for {
err := c.pushLayerHelper(layerDigest, isConfig)
// TODO: break on non-retryable errors.
if err != nil {
multiError.Add(err)
d := b.NextBackOff()
if d == backoff.Stop {
break
}
time.Sleep(d)
continue
}
break
}
return multiError.Collect()
}

func (c DockerRegistryClient) pushLayerHelper(layerDigest image.Digest, isConfig bool) error {
Expand Down
11 changes: 11 additions & 0 deletions lib/registry/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,14 @@ func TestPushImage(t *testing.T) {
require.NoError(err)
require.NoError(p.Push(testutil.SampleImageTag))
}

func TestPushLayerRetry(t *testing.T) {
require := require.New(t)
ctx, cleanup := context.BuildContextFixtureWithSampleImage()
defer cleanup()

p, err := PushClientFixture(ctx)
require.NoError(err)
p.config.Retries = 1
require.EqualError(p.PushLayer(image.NewEmptyDigest()), "push layer content : get layer file stat: file does not exist; push layer content : get layer file stat: file does not exist")
}
21 changes: 15 additions & 6 deletions lib/registry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

yaml "gopkg.in/yaml.v2"

"github.com/cenkalti/backoff"
"github.com/uber/makisu/lib/registry/security"
"github.com/uber/makisu/lib/utils"
"github.com/uber/makisu/lib/utils/httputil"
Expand All @@ -48,7 +49,8 @@ type RepositoryMap map[string]Config
type Config struct {
Concurrency int `yaml:"concurrency" json:"concurrency"`
Timeout time.Duration `yaml:"timeout" json:"timeout"`
Retries int `yaml:"retries" json:"retries"`
RetryDisabled bool `yaml:"retry_disabled" json:"retry_disabled"`
Retries uint64 `yaml:"retries" json:"retries"`
RetryInterval time.Duration `yaml:"retry_interval" json:"retry_interval"`
RetryBackoff float64 `yaml:"retry_backoff" json:"retry_backoff"`
RetryBackoffMax time.Duration `yaml:"retry_backoff_max" json:"retry_backoff_max"`
Expand Down Expand Up @@ -90,12 +92,19 @@ func (c Config) applyDefaults() Config {
return c
}

func (c *Config) backoff() backoff.BackOff {
if c.RetryDisabled {
return &backoff.StopBackOff{}
}
b := backoff.NewExponentialBackOff()
b.InitialInterval = c.RetryInterval
b.Multiplier = c.RetryBackoff
b.MaxInterval = c.RetryBackoffMax
return backoff.WithMaxRetries(b, c.Retries)
}

func (c *Config) sendRetry() httputil.SendOption {
return httputil.SendRetry(
httputil.RetryMax(c.Retries),
httputil.RetryInterval(c.RetryInterval),
httputil.RetryBackoff(c.RetryBackoff),
httputil.RetryBackoffMax(c.RetryBackoffMax))
return httputil.SendRetry(httputil.RetryBackoff(c.backoff()))
}

// UpdateGlobalConfig updates the global registry config given either:
Expand Down
1 change: 1 addition & 0 deletions lib/registry/push_fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func (t pushTransportFixture) RoundTrip(r *http.Request) (*http.Response, error)
if !found {
return &http.Response{
StatusCode: http.StatusNotFound,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
Header: make(http.Header),
}, nil
}
Expand Down
143 changes: 88 additions & 55 deletions lib/utils/httputil/httputil.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ import (
"net/url"
"time"

"github.com/cenkalti/backoff"
"github.com/uber/makisu/lib/log"
)

var retryableCodes = map[int]struct{}{
http.StatusTooManyRequests: {},
http.StatusBadGateway: {},
http.StatusServiceUnavailable: {},
http.StatusGatewayTimeout: {},
}

// StatusError occurs if an HTTP response has an unexpected status code.
type StatusError struct {
Method string
Expand Down Expand Up @@ -91,6 +99,18 @@ func IsForbidden(err error) bool {
return IsStatus(err, http.StatusForbidden)
}

func isRetryable(code int) bool {
_, ok := retryableCodes[code]
return ok
}

// IsRetryable returns true if the statis code indicates that the request is
// retryable.
func IsRetryable(err error) bool {
statusErr, ok := err.(StatusError)
return ok && isRetryable(statusErr.Status)
}

// NetworkError occurs on any Send error which occurred while trying to send
// the HTTP request, e.g. the given host is unresponsive.
type NetworkError struct {
Expand Down Expand Up @@ -181,42 +201,37 @@ func SendClient(client *http.Client) SendOption {
}

type retryOptions struct {
max int
interval time.Duration
backoffMultiplier float64
backoffMax time.Duration
backoff backoff.BackOff
extraCodes map[int]bool
}

// RetryOption allows overriding defaults for the SendRetry option.
type RetryOption func(*retryOptions)

// RetryMax sets the max number of retries.
func RetryMax(max int) RetryOption {
return func(o *retryOptions) { o.max = max }
}

// RetryInterval sets the interval between retries.
func RetryInterval(interval time.Duration) RetryOption {
return func(o *retryOptions) { o.interval = interval }
}

// RetryBackoff adds exponential backoff between retries.
func RetryBackoff(backoffMultiplier float64) RetryOption {
return func(o *retryOptions) { o.backoffMultiplier = backoffMultiplier }
func RetryBackoff(b backoff.BackOff) RetryOption {
return func(o *retryOptions) { o.backoff = b }
}

// RetryBackoffMax sets the max duration backoff can reach.
func RetryBackoffMax(backoffMax time.Duration) RetryOption {
return func(o *retryOptions) { o.backoffMax = backoffMax }
// RetryCodes adds more status codes to be retried (in addition to the default
// retryableCodes).
func RetryCodes(codes ...int) RetryOption {
return func(o *retryOptions) {
for _, c := range codes {
o.extraCodes[c] = true
}
}
}

// SendRetry will we retry the request on network / 5XX errors.
func SendRetry(options ...RetryOption) SendOption {
b := backoff.NewExponentialBackOff()
b.InitialInterval = 250 * time.Millisecond
b.Multiplier = 1 // No backoff.
b.MaxInterval = 30 * time.Second
retry := retryOptions{
max: 3,
interval: 250 * time.Millisecond,
backoffMultiplier: 1, // Defaults with no backoff.
backoffMax: 30 * time.Second,
backoff: backoff.WithMaxRetries(b, 3),
extraCodes: make(map[int]bool),
}
for _, o := range options {
o(&retry)
Expand Down Expand Up @@ -261,24 +276,24 @@ func SendContext(ctx context.Context) SendOption {
}

// Send sends an HTTP request. May return NetworkError or StatusError (see above).
func Send(method, rawurl string, options ...SendOption) (resp *http.Response, err error) {
func Send(method, rawurl string, options ...SendOption) (*http.Response, error) {
u, err := url.Parse(rawurl)
if err != nil {
return nil, fmt.Errorf("parse url: %s", err)
}
opts := sendOptions{
opts := &sendOptions{
body: nil,
timeout: 60 * time.Second,
acceptedCodes: map[int]bool{http.StatusOK: true},
headers: map[string]string{},
retry: retryOptions{max: 1},
retry: retryOptions{backoff: &backoff.StopBackOff{}},
transport: nil, // Use HTTP default.
ctx: context.Background(),
url: u,
httpFallbackDisabled: false,
}
for _, o := range options {
o(&opts)
o(opts)
}

req, err := newRequest(method, opts)
Expand All @@ -295,37 +310,31 @@ func Send(method, rawurl string, options ...SendOption) (resp *http.Response, er
}
}

interval := opts.retry.interval
for i := 0; i < opts.retry.max; i++ {
if i > 0 {
time.Sleep(interval)
interval = min(
time.Duration(float64(interval)*opts.retry.backoffMultiplier),
opts.retry.backoffMax)
}
var resp *http.Response
for {
resp, err = client.Do(req)

httpFallbackDisabled := opts.httpFallbackDisabled
if !httpFallbackDisabled && is5xxResponse(resp) {
httpFallbackDisabled = true
}
// Retry without tls. During migration there would be a time when the
// component receiving the tls request does not serve https response.
// TODO (@evelynl): disable retry after tls migration.
if err != nil && req.URL.Scheme == "https" && !httpFallbackDisabled {
if err != nil && shouldFallbackToHTTP(req, resp, opts) && !opts.httpFallbackDisabled {
log.Warnf("Failed to send https request: %s. Retrying with http...", err)
var httpReq *http.Request
httpReq, err = newRequest(method, opts)
originalErr := err
resp, err = fallbackToHTTP(client, method, opts)
if err != nil {
return nil, err
// Sometimes the request fails for a reason unrelated to https.
// To keep this reason visible, we always include the original
// error.
err = fmt.Errorf(
"failed to fallback from https to http, original https error: %s,\n"+
"fallback http error: %s", originalErr, err)
}
httpReq.URL.Scheme = "http"
resp, err = client.Do(httpReq)
}
if err != nil {
continue
}
if resp.StatusCode >= 500 && !opts.acceptedCodes[resp.StatusCode] {
if err != nil || shouldRetry(resp, opts) {
d := opts.retry.backoff.NextBackOff()
if d == backoff.Stop {
break // Backoff timed out.
}
time.Sleep(d)
continue
}
break
Expand Down Expand Up @@ -369,7 +378,7 @@ func Delete(url string, options ...SendOption) (*http.Response, error) {
return Send("DELETE", url, options...)
}

func newRequest(method string, opts sendOptions) (*http.Request, error) {
func newRequest(method string, opts *sendOptions) (*http.Request, error) {
req, err := http.NewRequest(method, opts.url.String(), opts.body)
if err != nil {
return nil, fmt.Errorf("new request: %s", err)
Expand All @@ -384,13 +393,37 @@ func newRequest(method string, opts sendOptions) (*http.Request, error) {
return req, nil
}

func fallbackToHTTP(
client *http.Client, method string, opts *sendOptions) (*http.Response, error) {

req, err := newRequest(method, opts)
if err != nil {
return nil, err
}
req.URL.Scheme = "http"

return client.Do(req)
}

func shouldFallbackToHTTP(req *http.Request, resp *http.Response, opts *sendOptions) bool {
if req.URL.Scheme == "http" { // Already in HTTP.
return false
}
// Try fallback on non-retryable errors.
return !shouldRetry(resp, opts)
}

func shouldRetry(resp *http.Response, opts *sendOptions) bool {
if resp != nil {
return (isRetryable(resp.StatusCode) && !opts.acceptedCodes[resp.StatusCode]) ||
(opts.retry.extraCodes[resp.StatusCode])
}
return false
}

func min(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}

func is5xxResponse(resp *http.Response) bool {
return resp != nil && resp.StatusCode >= 500
}
Loading

0 comments on commit 07fab96

Please sign in to comment.