diff --git a/v2/distributed_gobreaker.go b/v2/distributed_gobreaker.go index e790647..cc530c9 100644 --- a/v2/distributed_gobreaker.go +++ b/v2/distributed_gobreaker.go @@ -23,6 +23,8 @@ type SharedState struct { // SharedDataStore stores the shared state of DistributedCircuitBreaker. type SharedDataStore interface { + Lock(name string) error + Unlock(name string) error GetData(name string) ([]byte, error) SetData(name string, data []byte) error } @@ -34,17 +36,28 @@ type DistributedCircuitBreaker[T any] struct { } // NewDistributedCircuitBreaker returns a new DistributedCircuitBreaker. -func NewDistributedCircuitBreaker[T any](store SharedDataStore, settings Settings) (*DistributedCircuitBreaker[T], error) { +func NewDistributedCircuitBreaker[T any](store SharedDataStore, settings Settings) (dcb *DistributedCircuitBreaker[T], err error) { if store == nil { return nil, ErrNoSharedStore } - dcb := &DistributedCircuitBreaker[T]{ + dcb = &DistributedCircuitBreaker[T]{ CircuitBreaker: NewCircuitBreaker[T](settings), store: store, } - _, err := dcb.getSharedState() + err = dcb.lock() + if err != nil { + return nil, err + } + defer func() { + e := dcb.unlock() + if err == nil { + err = e + } + }() + + _, err = dcb.getSharedState() if err == ErrNoSharedState { err = dcb.setSharedState(dcb.extract()) } @@ -55,8 +68,43 @@ func NewDistributedCircuitBreaker[T any](store SharedDataStore, settings Setting return dcb, nil } +const ( + mutexTimeout = 5 * time.Second + mutexWaitTime = 500 * time.Millisecond +) + +func (dcb *DistributedCircuitBreaker[T]) mutexKey() string { + return "gobreaker:mutex:" + dcb.name +} + +func (dcb *DistributedCircuitBreaker[T]) lock() error { + if dcb.store == nil { + return ErrNoSharedStore + } + + var err error + expiry := time.Now().Add(mutexTimeout) + for time.Now().Before(expiry) { + err = dcb.store.Lock(dcb.mutexKey()) + if err == nil { + return nil + } + + time.Sleep(mutexWaitTime) + } + return err +} + +func (dcb *DistributedCircuitBreaker[T]) unlock() error { + if dcb.store == nil { + return ErrNoSharedStore + } + + return dcb.store.Unlock(dcb.mutexKey()) +} + func (dcb *DistributedCircuitBreaker[T]) sharedStateKey() string { - return "gobreaker:" + dcb.name + return "gobreaker:state:" + dcb.name } func (dcb *DistributedCircuitBreaker[T]) getSharedState() (SharedState, error) { @@ -112,14 +160,25 @@ func (dcb *DistributedCircuitBreaker[T]) extract() SharedState { } // State returns the State of DistributedCircuitBreaker. -func (dcb *DistributedCircuitBreaker[T]) State() (State, error) { +func (dcb *DistributedCircuitBreaker[T]) State() (state State, err error) { shared, err := dcb.getSharedState() if err != nil { return shared.State, err } + err = dcb.lock() + if err != nil { + return state, err + } + defer func() { + e := dcb.unlock() + if err == nil { + err = e + } + }() + dcb.inject(shared) - state := dcb.CircuitBreaker.State() + state = dcb.CircuitBreaker.State() shared = dcb.extract() err = dcb.setSharedState(shared) @@ -127,22 +186,31 @@ func (dcb *DistributedCircuitBreaker[T]) State() (State, error) { } // Execute runs the given request if the DistributedCircuitBreaker accepts it. -func (dcb *DistributedCircuitBreaker[T]) Execute(req func() (T, error)) (T, error) { +func (dcb *DistributedCircuitBreaker[T]) Execute(req func() (T, error)) (t T, err error) { shared, err := dcb.getSharedState() if err != nil { - var defaultValue T - return defaultValue, err + return t, err } + err = dcb.lock() + if err != nil { + return t, err + } + defer func() { + e := dcb.unlock() + if err == nil { + err = e + } + }() + dcb.inject(shared) - t, e := dcb.CircuitBreaker.Execute(req) + t, err = dcb.CircuitBreaker.Execute(req) shared = dcb.extract() - err = dcb.setSharedState(shared) - if err != nil { - var defaultValue T - return defaultValue, err + e := dcb.setSharedState(shared) + if e != nil { + return t, e } - return t, e + return t, err } diff --git a/v2/distributed_gobreaker_test.go b/v2/distributed_gobreaker_test.go index 28a9d43..d242a6f 100644 --- a/v2/distributed_gobreaker_test.go +++ b/v2/distributed_gobreaker_test.go @@ -7,6 +7,8 @@ import ( "time" "github.com/alicebob/miniredis/v2" + "github.com/go-redsync/redsync/v4" + "github.com/go-redsync/redsync/v4/redis/goredis/v9" "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" ) @@ -14,14 +16,48 @@ import ( type storeAdapter struct { ctx context.Context client *redis.Client + rs *redsync.Redsync + mutex map[string]*redsync.Mutex } -func (sa *storeAdapter) GetData(key string) ([]byte, error) { - return sa.client.Get(sa.ctx, key).Bytes() +func newStoreAdapter(client *redis.Client) *storeAdapter { + return &storeAdapter{ + ctx: context.Background(), + client: client, + rs: redsync.New(goredis.NewPool(client)), + mutex: map[string]*redsync.Mutex{}, + } +} + +func (sa *storeAdapter) Lock(name string) error { + mutex, ok := sa.mutex[name] + if ok { + return mutex.Lock() + } + + mutex = sa.rs.NewMutex(name, redsync.WithExpiry(mutexTimeout)) + sa.mutex[name] = mutex + return mutex.Lock() +} + +func (sa *storeAdapter) Unlock(name string) error { + mutex, ok := sa.mutex[name] + if ok { + var err error + ok, err = mutex.Unlock() + if ok && err == nil { + return nil + } + } + return errors.New("unlock failed") } -func (sa *storeAdapter) SetData(key string, value []byte) error { - return sa.client.Set(sa.ctx, key, value, 0).Err() +func (sa *storeAdapter) GetData(name string) ([]byte, error) { + return sa.client.Get(sa.ctx, name).Bytes() +} + +func (sa *storeAdapter) SetData(name string, data []byte) error { + return sa.client.Set(sa.ctx, name, data, 0).Err() } var redisServer *miniredis.Miniredis @@ -37,10 +73,7 @@ func setUpDCB() *DistributedCircuitBreaker[any] { Addr: redisServer.Addr(), }) - store := &storeAdapter{ - ctx: context.Background(), - client: client, - } + store := newStoreAdapter(client) dcb, err := NewDistributedCircuitBreaker[any](store, Settings{ Name: "TestBreaker", @@ -205,10 +238,7 @@ func TestCustomDistributedCircuitBreaker(t *testing.T) { Addr: mr.Addr(), }) - store := &storeAdapter{ - ctx: context.Background(), - client: client, - } + store := newStoreAdapter(client) customDCB, err = NewDistributedCircuitBreaker[any](store, Settings{ Name: "CustomBreaker", @@ -301,10 +331,7 @@ func TestCustomDistributedCircuitBreakerStateTransitions(t *testing.T) { Addr: mr.Addr(), }) - store := &storeAdapter{ - ctx: context.Background(), - client: client, - } + store := newStoreAdapter(client) dcb, err := NewDistributedCircuitBreaker[any](store, customSt) assert.NoError(t, err) diff --git a/v2/go.mod b/v2/go.mod index eb1204f..3489a6a 100644 --- a/v2/go.mod +++ b/v2/go.mod @@ -1,6 +1,8 @@ module github.com/sony/gobreaker/v2 -go 1.21 +go 1.22 + +toolchain go1.22.10 require github.com/stretchr/testify v1.8.4 @@ -8,12 +10,15 @@ require ( github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect ) require ( github.com/alicebob/miniredis/v2 v2.33.0 github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-redsync/redsync/v4 v4.13.0 github.com/pmezard/go-difflib v1.0.0 // indirect github.com/redis/go-redis/v9 v9.7.0 gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/v2/go.sum b/v2/go.sum index f36dd60..4984a19 100644 --- a/v2/go.sum +++ b/v2/go.sum @@ -2,20 +2,45 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA= github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= +github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI= +github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-redsync/redsync/v4 v4.13.0 h1:49X6GJfnbLGaIpBBREM/zA4uIMDXKAh1NDkvQ1EkZKA= +github.com/go-redsync/redsync/v4 v4.13.0/go.mod h1:HMW4Q224GZQz6x1Xc7040Yfgacukdzu7ifTDAKiyErQ= +github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws= +github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= +github.com/redis/rueidis v1.0.19 h1:s65oWtotzlIFN8eMPhyYwxlwLR1lUdhza2KtWprKYSo= +github.com/redis/rueidis v1.0.19/go.mod h1:8B+r5wdnjwK3lTFml5VtxjzGOQAC+5UmujoD12pDrEo= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM= +github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8= github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=