Skip to content

Commit

Permalink
pkg/stash: implement a Redis lock for stashing (gomods#1116)
Browse files Browse the repository at this point in the history
* pkg/stash: implement a Redis lock for stashing

* fix tests

* fix op
  • Loading branch information
marwan-at-work authored Mar 11, 2019
1 parent c3d8a05 commit fb9437d
Show file tree
Hide file tree
Showing 59 changed files with 11,929 additions and 3 deletions.
5 changes: 5 additions & 0 deletions cmd/proxy/actions/app_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func getSingleFlight(c *config.Config, checker storage.Checker) (stash.Wrapper,
}
endpoints := strings.Split(c.SingleFlight.Etcd.Endpoints, ",")
return stash.WithEtcd(endpoints, checker)
case "redis":
if c.SingleFlight == nil || c.SingleFlight.Redis == nil {
return nil, fmt.Errorf("Redis config must be present")
}
return stash.WithRedisLock(c.SingleFlight.Redis.Endpoint, checker)
default:
return nil, fmt.Errorf("unrecognized single flight type: %v", c.SingleFlightType)
}
Expand Down
7 changes: 6 additions & 1 deletion config.dev.toml
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ StatsExporter = "prometheus"
# we want to make sure only the first request gets to store the module,
# and the second request will wait for the first one to finish so that
# it doesn't override the storage.
# Options are ["memory", "etcd"]
# Options are ["memory", "etcd", "redis"]
# The default option is "memory" which means that only one instance of Athens
# should be used.
# Env override: ATHENS_SINGLE_FLIGHT_TYPE
Expand All @@ -169,6 +169,11 @@ SingleFlightType = "memory"
# as the SingleFlightType field above determines whether Etcd is used or not.
# Env override: ATHENS_ETCD_ENDPOINTS
Endpoints = "localhost:2379,localhost:22379,localhost:32379"
[SingleFlight.Redis]
# Endpoint is the redis endpoint for a SingleFlight lock.
# TODO(marwan): enable multiple endpoints for redis clusters.
# Env override: ATHENS_REDIS_ENDPOINT
Endpoint = "127.0.0.1:6379"

[Storage]
# Only storage backends that are specified in Proxy.StorageType are required here
Expand Down
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ services:
- 6832:6832/udp
- 5778:5778
- 16686:16686
redis:
image: redis
ports:
- 6379:6379
etcd0:
image: quay.io/coreos/etcd
ports:
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ require (
github.com/DataDog/datadog-go v0.0.0-20180822151419-281ae9f2d895 // indirect
github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20180917103902-e6c7f767dc57
github.com/aws/aws-sdk-go v1.15.24
github.com/bsm/redis-lock v8.0.0+incompatible
github.com/fatih/color v1.7.0
github.com/globalsign/mgo v0.0.0-20180828104044-6f9f54af1356
github.com/go-playground/locales v0.12.1 // indirect
github.com/go-playground/universal-translator v0.16.0 // indirect
github.com/go-redis/redis v6.15.2+incompatible
github.com/gobuffalo/envy v1.6.7
github.com/gobuffalo/httptest v1.0.4
github.com/gogo/protobuf v1.2.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ github.com/aws/aws-sdk-go v1.15.24/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZo
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bsm/redis-lock v8.0.0+incompatible h1:QgB0J2pNG8hUfndTIvpPh38F5XsUTTvO7x8Sls++9Mk=
github.com/bsm/redis-lock v8.0.0+incompatible/go.mod h1:8dGkQ5GimBCahwF2R67tqGCJbyDZSp0gzO7wq3pDrik=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 h1:u9SHYsPQNyt5tgDm3YN7+9dYrpK96E5wFilTFWIDZOM=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
Expand All @@ -43,6 +45,8 @@ github.com/go-playground/locales v0.12.1 h1:2FITxuFt/xuCNP1Acdhv62OzaCiviiE4kotf
github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3ygWanZIBtBW0W2TM=
github.com/go-playground/universal-translator v0.16.0 h1:X++omBR/4cE2MNg91AoC3rmGrCjJ8eAeUP/K/EKx4DM=
github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY=
github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4=
github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/gobuffalo/envy v1.6.7 h1:XMZGuFqTupAXhZTriQ+qO38QvNOSU/0rl3hEPCFci/4=
github.com/gobuffalo/envy v1.6.7/go.mod h1:N+GkhhZ/93bGZc6ZKhJLP6+m+tCNPKwgSpH9kaifseQ=
github.com/gobuffalo/httptest v1.0.4 h1:P0uKaPEjti1bbJmuBILE3QQ7iU1cS7oIkxVba5HbcVE=
Expand Down
5 changes: 4 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ func createDefault() *Config {
SingleFlightType: "memory",
GlobalEndpoint: "http://localhost:3001",
TraceExporterURL: "http://localhost:14268",
SingleFlight: &SingleFlight{&Etcd{"localhost:2379,localhost:22379,localhost:32379"}},
SingleFlight: &SingleFlight{
Etcd: &Etcd{"localhost:2379,localhost:22379,localhost:32379"},
Redis: &Redis{"127.0.0.1:6379"},
},
}
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/config/singleflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ package config
// backend configurations for a distributed
// lock or single flight mechanism.
type SingleFlight struct {
Etcd *Etcd
Etcd *Etcd
Redis *Redis
}

// Etcd holds client side configuration
Expand All @@ -13,3 +14,9 @@ type SingleFlight struct {
type Etcd struct {
Endpoints string `envconfig:"ATHENS_ETCD_ENDPOINTS"`
}

// Redis holds the client side configuration
// to connect to redis as a SingleFlight implementation.
type Redis struct {
Endpoint string `envconfig:"ATHENS_REDIS_ENDPOINT"`
}
73 changes: 73 additions & 0 deletions pkg/stash/with_redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package stash

import (
"context"
"time"

lock "github.com/bsm/redis-lock"
"github.com/go-redis/redis"
"github.com/gomods/athens/pkg/config"
"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/observ"
"github.com/gomods/athens/pkg/storage"
)

// WithRedisLock returns a distributed singleflight
// using an redis cluster. If it cannot connect, it will return an error.
func WithRedisLock(endpoint string, checker storage.Checker) (Wrapper, error) {
const op errors.Op = "stash.WithRedisLock"
client := redis.NewClient(&redis.Options{
Network: "tcp",
Addr: endpoint,
})
_, err := client.Ping().Result()
if err != nil {
return nil, errors.E(op, err)
}

return func(s Stasher) Stasher {
return &redisLock{client, s, checker}
}, nil
}

type redisLock struct {
client *redis.Client
stasher Stasher
checker storage.Checker
}

func (s *redisLock) Stash(ctx context.Context, mod, ver string) (newVer string, err error) {
const op errors.Op = "redis.Stash"
ctx, span := observ.StartSpan(ctx, op.String())
defer span.End()
mv := config.FmtModVer(mod, ver)

// Obtain a new lock with default settings
lock, err := lock.Obtain(s.client, mv, &lock.Options{
LockTimeout: time.Minute * 5,
RetryCount: 60 * 5,
RetryDelay: time.Second,
})
if err != nil {
return ver, errors.E(op, err)
}
defer func() {
const op errors.Op = "redis.Unlock"
lockErr := lock.Unlock()
if err == nil && lockErr != nil {
err = errors.E(op, lockErr)
}
}()
ok, err := s.checker.Exists(ctx, mod, ver)
if err != nil {
return ver, errors.E(op, err)
}
if ok {
return ver, nil
}
newVer, err = s.stasher.Stash(ctx, mod, ver)
if err != nil {
return ver, errors.E(op, err)
}
return newVer, nil
}
82 changes: 82 additions & 0 deletions pkg/stash/with_redis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package stash

import (
"context"
"fmt"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/gomods/athens/pkg/storage"
"github.com/gomods/athens/pkg/storage/mem"
"golang.org/x/sync/errgroup"
)

// WithRedisLock will ensure that 5 concurrent requests will all get the first request's
// response. We can ensure that because only the first response does not return an error
// and therefore all 5 responses should have no error.
func TestWithRedisLock(t *testing.T) {
endpoint := os.Getenv("REDIS_TEST_ENDPOINT")
if len(endpoint) == 0 {
t.SkipNow()
}
strg, err := mem.NewStorage()
if err != nil {
t.Fatal(err)
}
ms := &mockRedisStasher{strg: strg}
wrapper, err := WithRedisLock(endpoint, strg)
if err != nil {
t.Fatal(err)
}
s := wrapper(ms)

var eg errgroup.Group
for i := 0; i < 5; i++ {
eg.Go(func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
_, err := s.Stash(ctx, "mod", "ver")
return err
})
}

err = eg.Wait()
if err != nil {
t.Fatal(err)
}
}

// mockRedisStasher is like mockStasher
// but leverages in memory storage
// so that redis can determine
// whether to call the underlying stasher or not.
type mockRedisStasher struct {
strg storage.Backend
mu sync.Mutex
num int
}

func (ms *mockRedisStasher) Stash(ctx context.Context, mod, ver string) (string, error) {
time.Sleep(time.Millisecond * 100) // allow for second requests to come in.
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.num == 0 {
err := ms.strg.Save(
ctx,
mod,
ver,
[]byte("mod file"),
strings.NewReader("zip file"),
[]byte("info file"),
)
if err != nil {
return "", err
}
ms.num++
return "", nil
}
return "", fmt.Errorf("second time error")
}
1 change: 1 addition & 0 deletions vendor/github.com/bsm/redis-lock/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions vendor/github.com/bsm/redis-lock/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

113 changes: 113 additions & 0 deletions vendor/github.com/bsm/redis-lock/Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit fb9437d

Please sign in to comment.