Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangxu19830126 committed Dec 20, 2018
1 parent 81ea716 commit 582bff8
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 29 deletions.
24 changes: 15 additions & 9 deletions pkg/proxy/dispatcher_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package proxy
import (
"container/list"
"fmt"
"math/rand"
"net/url"
"regexp"
"sort"
Expand Down Expand Up @@ -91,6 +90,7 @@ type abstractSupportProtectedRuntime struct {
limiter *rate.Limiter
circuit metapb.CircuitStatus
cb *metapb.CircuitBreaker
barrier *util.RateBarrier
}

func (s *abstractSupportProtectedRuntime) getCircuitStatus() metapb.CircuitStatus {
Expand Down Expand Up @@ -171,6 +171,9 @@ func (s *serverRuntime) updateMeta(meta *metapb.Server) {
s.limiter = rate.NewLimiter(rate.Every(time.Second/time.Duration(meta.MaxQPS)), int(meta.MaxQPS))
s.status = metapb.Down
s.circuit = metapb.Open
if s.cb != nil {
s.barrier = util.NewRateBarrier(int(s.cb.HalfTrafficRate))
}
}

func (s *serverRuntime) getCheckURL() string {
Expand Down Expand Up @@ -407,6 +410,9 @@ func (a *apiRuntime) init() {
a.id = a.meta.ID
a.cb = a.meta.CircuitBreaker
a.circuit = metapb.Open
if a.cb != nil {
a.barrier = util.NewRateBarrier(int(a.cb.HalfTrafficRate))
}
if a.meta.MaxQPS > 0 {
a.limiter = rate.NewLimiter(rate.Every(time.Second/time.Duration(a.meta.MaxQPS)), int(a.meta.MaxQPS))
}
Expand Down Expand Up @@ -542,19 +548,20 @@ func (r *apiRule) validate(value []byte) bool {
}

type routingRuntime struct {
meta *metapb.Routing
rand *rand.Rand
meta *metapb.Routing
barrier *util.RateBarrier
}

func newRoutingRuntime(meta *metapb.Routing) *routingRuntime {
return &routingRuntime{
meta: meta,
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
}
r := &routingRuntime{}
r.updateMeta(meta)

return r
}

func (a *routingRuntime) updateMeta(meta *metapb.Routing) {
a.meta = meta
a.barrier = util.NewRateBarrier(int(a.meta.TrafficRate))
}

func (a *routingRuntime) matches(apiID uint64, req *fasthttp.Request) bool {
Expand All @@ -568,8 +575,7 @@ func (a *routingRuntime) matches(apiID uint64, req *fasthttp.Request) bool {
}
}

n := a.rand.Intn(100)
return n < int(a.meta.TrafficRate)
return a.barrier.Allow()
}

func (a *routingRuntime) isUp() bool {
Expand Down
6 changes: 3 additions & 3 deletions pkg/proxy/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,12 @@ func (c *proxyContext) circuitResourceID() uint64 {
return c.result.api.id
}

func (c *proxyContext) circuitBreaker() *metapb.CircuitBreaker {
func (c *proxyContext) circuitBreaker() (*metapb.CircuitBreaker, *util.RateBarrier) {
if c.result.dest != nil {
return c.result.dest.cb
return c.result.dest.cb, c.result.dest.barrier
}

return c.result.api.cb
return c.result.api.cb, c.result.api.barrier
}

func (c *proxyContext) circuitStatus() metapb.CircuitStatus {
Expand Down
21 changes: 4 additions & 17 deletions pkg/proxy/filter_circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,18 @@ package proxy

import (
"errors"
"math/rand"
"net/http"
"time"

"github.com/fagongzi/gateway/pkg/filter"
"github.com/fagongzi/gateway/pkg/pb/metapb"
)

const (
// RateBase base rate
RateBase = 100
)

var (
// ErrCircuitClose resource is in circuit close
ErrCircuitClose = errors.New("resource is in circuit close")
// ErrCircuitHalfLimited resource is in circuit half, traffic limit
ErrCircuitHalfLimited = errors.New("resource is in circuit half, traffic limit")

rd = rand.New(rand.NewSource(time.Now().UnixNano()))
)

// CircuitBreakeFilter CircuitBreakeFilter
Expand All @@ -46,7 +38,7 @@ func (f *CircuitBreakeFilter) Name() string {
// Pre execute before proxy
func (f *CircuitBreakeFilter) Pre(c filter.Context) (statusCode int, err error) {
pc := c.(*proxyContext)
cb := pc.circuitBreaker()
cb, barrier := pc.circuitBreaker()
if cb == nil {
return f.BaseFilter.Pre(c)
}
Expand All @@ -64,7 +56,7 @@ func (f *CircuitBreakeFilter) Pre(c filter.Context) (statusCode int, err error)

return http.StatusOK, nil
case metapb.Half:
if limitAllow(cb.HalfTrafficRate) {
if barrier.Allow() {
return f.BaseFilter.Pre(c)
}

Expand All @@ -79,7 +71,7 @@ func (f *CircuitBreakeFilter) Pre(c filter.Context) (statusCode int, err error)
// Post execute after proxy
func (f *CircuitBreakeFilter) Post(c filter.Context) (statusCode int, err error) {
pc := c.(*proxyContext)
cb := pc.circuitBreaker()
cb, _ := pc.circuitBreaker()
if cb == nil {
return f.BaseFilter.Post(c)
}
Expand All @@ -98,7 +90,7 @@ func (f *CircuitBreakeFilter) Post(c filter.Context) (statusCode int, err error)
// PostErr execute proxy has errors
func (f *CircuitBreakeFilter) PostErr(c filter.Context) {
pc := c.(*proxyContext)
cb := pc.circuitBreaker()
cb, _ := pc.circuitBreaker()
if cb == nil {
f.BaseFilter.PostErr(c)
return
Expand All @@ -112,8 +104,3 @@ func (f *CircuitBreakeFilter) PostErr(c filter.Context) {
pc.changeCircuitStatusToClose()
}
}

func limitAllow(rate int32) bool {
randValue := rd.Intn(RateBase)
return randValue < int(rate)
}
59 changes: 59 additions & 0 deletions pkg/util/barrier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package util

import (
"math/rand"
"sync"
"sync/atomic"
)

var (
lock sync.Mutex
randSources = make(map[int][]int)
)

func createRandSourceByBase(base int) []int {
lock.Lock()
defer lock.Unlock()

if value, ok := randSources[base]; ok {
return value
}

value := make([]int, base, base)
for i := 0; i < base; i++ {
value[i] = i
}

rand.Shuffle(base, func(i, j int) {
value[i], value[j] = value[j], value[i]
})
randSources[base] = value
return value
}

// RateBarrier rand barrier
type RateBarrier struct {
source []int
op uint64
rate int
base int
}

// NewRateBarrier returns a barrier based by 100
func NewRateBarrier(rate int) *RateBarrier {
return NewRateBarrierBase(rate, 100)
}

// NewRateBarrierBase returns a barrier with base
func NewRateBarrierBase(rate, base int) *RateBarrier {
return &RateBarrier{
source: createRandSourceByBase(base),
rate: rate,
base: base,
}
}

// Allow returns true if allowed
func (b *RateBarrier) Allow() bool {
return b.source[int(atomic.AddUint64(&b.op, 1))%b.base] < b.rate
}

0 comments on commit 582bff8

Please sign in to comment.