Skip to content

Commit

Permalink
Introduce distributed mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
YoshiyukiMineo authored Dec 29, 2024
1 parent aaba3f4 commit a373049
Showing 1 changed file with 82 additions and 14 deletions.
96 changes: 82 additions & 14 deletions v2/distributed_gobreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type SharedState struct {

// SharedDataStore stores the shared state of DistributedCircuitBreaker.
type SharedDataStore interface {
Lock(name string) error
Unlock(name string) error
GetData(name string) ([]byte, error)
SetData(name string, data []byte) error
}
Expand All @@ -34,16 +36,27 @@ type DistributedCircuitBreaker[T any] struct {
}

// NewDistributedCircuitBreaker returns a new DistributedCircuitBreaker.
func NewDistributedCircuitBreaker[T any](store SharedDataStore, settings Settings) (*DistributedCircuitBreaker[T], error) {
func NewDistributedCircuitBreaker[T any](store SharedDataStore, settings Settings) (dcb *DistributedCircuitBreaker[T], err error) {
if store == nil {
return nil, ErrNoSharedStore
}

dcb := &DistributedCircuitBreaker[T]{
dcb = &DistributedCircuitBreaker[T]{
CircuitBreaker: NewCircuitBreaker[T](settings),
store: store,
}

err = dcb.lock()
if err != nil {
return err
}
defer func() {
e := dcb.unlock()
if err == nil {
err = e
}
}()

_, err := dcb.getSharedState()
if err == ErrNoSharedState {
err = dcb.setSharedState(dcb.extract())
Expand All @@ -55,8 +68,43 @@ func NewDistributedCircuitBreaker[T any](store SharedDataStore, settings Setting
return dcb, nil
}

const (
mutexTimeout = 5 * time.Second
mutexWaitTime = 500 * time.Millisecond
)

func (dcb *DistributedCircuitBreaker[T]) mutexKey() string {
return "gobreaker:mutex:" + dcb.name
}

func (dcb *DistributedCircuitBreaker[T]) lock() error {
if dcb.store == nil {
return ErrNoSharedStore
}

var err error
expiry := time.Now().Add(mutexTimeout)
for time.Now().Before(expiry) {
err = dcb.store.Lock(dcb.mutexKey())
if err == nil {
return nil
}

time.Sleep(mutexWaitTime)
}
return err
}

func (dcb *DistributedCircuitBreaker[T]) unlock() error {
if dcb.store == nil {
return ErrNoSharedStore
}

return dcb.store.Unlock(dcb.mutexKey())
}

func (dcb *DistributedCircuitBreaker[T]) sharedStateKey() string {
return "gobreaker:" + dcb.name
return "gobreaker:state:" + dcb.name
}

func (dcb *DistributedCircuitBreaker[T]) getSharedState() (SharedState, error) {
Expand Down Expand Up @@ -112,37 +160,57 @@ func (dcb *DistributedCircuitBreaker[T]) extract() SharedState {
}

// State returns the State of DistributedCircuitBreaker.
func (dcb *DistributedCircuitBreaker[T]) State() (State, error) {
func (dcb *DistributedCircuitBreaker[T]) State() (state State, err error) {
shared, err := dcb.getSharedState()
if err != nil {
return shared.State, err
}

err = dcb.lock()
if err != nil {
return state, err
}
defer func() {
e := dcb.unlock()
if err == nil {
err = e
}
}()

dcb.inject(shared)
state := dcb.CircuitBreaker.State()
state = dcb.CircuitBreaker.State()
shared = dcb.extract()

err = dcb.setSharedState(shared)
return state, err
}

// Execute runs the given request if the DistributedCircuitBreaker accepts it.
func (dcb *DistributedCircuitBreaker[T]) Execute(req func() (T, error)) (T, error) {
func (dcb *DistributedCircuitBreaker[T]) Execute(req func() (T, error)) (t T, err error) {
shared, err := dcb.getSharedState()
if err != nil {
var defaultValue T
return defaultValue, err
return t, err
}

err = dcb.lock()
if err != nil {
return t, err
}
defer func() {
e := dcb.unlock()
if err == nil {
err = e
}
}()

dcb.inject(shared)
t, e := dcb.CircuitBreaker.Execute(req)
t, err = dcb.CircuitBreaker.Execute(req)
shared = dcb.extract()

err = dcb.setSharedState(shared)
if err != nil {
var defaultValue T
return defaultValue, err
e := dcb.setSharedState(shared)
if e != nil {
return t, e
}

return t, e
return t, err
}

0 comments on commit a373049

Please sign in to comment.