Skip to content

Commit

Permalink
Merge pull request weibocom#101 from Zha-Zha/bugfix/circuitBreaker
Browse files Browse the repository at this point in the history
Refactor CircuitBreaker filter
  • Loading branch information
rayzhang0603 authored Jan 15, 2019
2 parents a2a6e2d + cccbf6a commit ffea151
Show file tree
Hide file tree
Showing 16 changed files with 397 additions and 161 deletions.
1 change: 1 addition & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (c *Client) BuildRequest(method string, args []interface{}) motan.Request {
}
req.SetAttachment(mpro.MSource, application)
req.SetAttachment(mpro.MGroup, c.url.Group)
req.SetAttachment(mpro.MPath, req.GetServiceName())
return req
}

Expand Down
2 changes: 1 addition & 1 deletion cluster/motanCluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (m *MotanCluster) initCluster() bool {
m.HaStrategy = m.extFactory.GetHa(m.url)
//lb
m.LoadBalance = m.extFactory.GetLB(m.url)
//filter
//filter should initialize after HaStrategy
m.initFilters()

if m.clusterFilter == nil {
Expand Down
6 changes: 3 additions & 3 deletions core/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ const (
Receive = "receive"
Decode = "decode"
Convert = "convert"
ClustFliter = "clustFilter"
EpFilterStart = "selectEp"
EpFilterEnd = "epFilter"
ClFilter = "clusterFilter"
EpFilterStart = "selectEndpoint"
EpFilterEnd = "endpointFilter"
Encode = "encode"
Send = "send"
)
Expand Down
20 changes: 10 additions & 10 deletions core/globalContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@ const (
basicReferKey = "basicRefer"
basicServiceKey = "basicService"

// default config file and path
configFile = "./motan.yaml"
configPath = "./"
fileSuffix = ".yaml"

// const for application pool
basicConfig = "basic.yaml"
servicePath = "services/"
Expand All @@ -59,6 +54,11 @@ type Context struct {
}

var (
// default config file and path
defaultConfigFile = "./motan.yaml"
defaultConfigPath = "./"
defaultFileSuffix = ".yaml"

urlFields = map[string]bool{"protocol": true, "host": true, "port": true, "path": true, "group": true}
)

Expand Down Expand Up @@ -133,7 +133,7 @@ func (c *Context) Initialize() {
var err error
if *Pool != "" { // parse application pool configs
if c.ConfigFile == "" {
c.ConfigFile = configPath
c.ConfigFile = defaultConfigPath
}
if !strings.HasSuffix(c.ConfigFile, "/") {
c.ConfigFile = c.ConfigFile + "/"
Expand All @@ -144,7 +144,7 @@ func (c *Context) Initialize() {
}
} else { // parse single config file and dynamic file
if c.ConfigFile == "" {
c.ConfigFile = configFile
c.ConfigFile = defaultConfigFile
}
if cfgRs, err = cfg.NewConfigFromFile(c.ConfigFile); err != nil {
fmt.Printf("parse config fail. err:%s\n", err.Error())
Expand Down Expand Up @@ -209,7 +209,7 @@ func parsePool(path string, pool string) (*cfg.Config, error) {
if application == "" && len(poolPart) > 0 { // the first part be the application name
application = poolPart[0]
}
application = path + applicationPath + application + fileSuffix
application = path + applicationPath + application + defaultFileSuffix
if application != "" {
appconfig, err = cfg.NewConfigFromFile(application)
if err == nil && appconfig != nil {
Expand All @@ -218,7 +218,7 @@ func parsePool(path string, pool string) (*cfg.Config, error) {
if err == nil && is != nil {
if li, ok := is.([]interface{}); ok {
for _, r := range li {
tempcfg, err = cfg.NewConfigFromFile(path + servicePath + r.(string) + fileSuffix)
tempcfg, err = cfg.NewConfigFromFile(path + servicePath + r.(string) + defaultFileSuffix)
if err == nil && tempcfg != nil {
c.Merge(tempcfg)
}
Expand All @@ -235,7 +235,7 @@ func parsePool(path string, pool string) (*cfg.Config, error) {
base := ""
for _, v := range poolPart {
base = base + v
tempcfg, err = cfg.NewConfigFromFile(path + poolPath + base + fileSuffix)
tempcfg, err = cfg.NewConfigFromFile(path + poolPath + base + defaultFileSuffix)
if err == nil && tempcfg != nil {
c.Merge(tempcfg)
}
Expand Down
2 changes: 1 addition & 1 deletion core/motan.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,7 @@ func (l *lastClusterFilter) NewFilter(url *URL) Filter {
func (l *lastClusterFilter) Filter(haStrategy HaStrategy, loadBalance LoadBalance, request Request) Response {
if request.GetRPCContext(true).Tc != nil {
// clusterFilter end
request.GetRPCContext(true).Tc.PutReqSpan(&Span{Name: ClustFliter, Time: time.Now()})
request.GetRPCContext(true).Tc.PutReqSpan(&Span{Name: ClFilter, Time: time.Now()})
}
response := haStrategy.Call(request, loadBalance)
if request.GetRPCContext(true).Tc != nil {
Expand Down
150 changes: 150 additions & 0 deletions filter/circuitBreaker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package filter

import (
"errors"
"strconv"

"github.com/afex/hystrix-go/hystrix"
motan "github.com/weibocom/motan-go/core"
"github.com/weibocom/motan-go/log"
)

const (
RequestVolumeThresholdField = "circuitBreaker.requestThreshold"
SleepWindowField = "circuitBreaker.sleepWindow" //ms
ErrorPercentThreshold = "circuitBreaker.errorPercent" //%
IncludeBizException = "circuitBreaker.bizException"
)

type CircuitBreakerFilter struct {
url *motan.URL
next motan.EndPointFilter
circuitBreaker *hystrix.CircuitBreaker
includeBizException bool
}

func (c *CircuitBreakerFilter) GetIndex() int {
return 20
}

func (c *CircuitBreakerFilter) GetName() string {
return CircuitBreaker
}

func (c *CircuitBreakerFilter) NewFilter(url *motan.URL) motan.Filter {
bizException := newCircuitBreaker(c.GetName(), url)
return &CircuitBreakerFilter{url: url, includeBizException: bizException}
}

func (c *CircuitBreakerFilter) Filter(caller motan.Caller, request motan.Request) motan.Response {
var response motan.Response
err := hystrix.Do(c.url.GetIdentity(), func() error {
response = c.GetNext().Filter(caller, request)
return checkException(response, c.includeBizException)
}, nil)
if err != nil {
return defaultErrMotanResponse(request, err.Error())
}
return response
}

func (c *CircuitBreakerFilter) HasNext() bool {
return c.next != nil
}

func (c *CircuitBreakerFilter) SetNext(nextFilter motan.EndPointFilter) {
c.next = nextFilter
}

func (c *CircuitBreakerFilter) GetNext() motan.EndPointFilter {
return c.next
}

func (c *CircuitBreakerFilter) GetType() int32 {
return motan.EndPointFilterType
}

func newCircuitBreaker(filterName string, url *motan.URL) bool {
bizExceptionStr := url.GetParam(IncludeBizException, "true")
bizException, err := strconv.ParseBool(bizExceptionStr)
if err != nil {
bizException = true
vlog.Warningf("[%s] parse config %s error, use default", filterName, IncludeBizException)
}
commandConfig := buildCommandConfig(filterName, url)
hystrix.ConfigureCommand(url.GetIdentity(), *commandConfig)
if _, _, err = hystrix.GetCircuit(url.GetIdentity()); err != nil {
vlog.Errorf("[%s] new circuit fail. err:%s, url:%v, config{%s}\n", err.Error(), filterName, url.GetIdentity(), getConfigStr(commandConfig)+"bizException:"+bizExceptionStr)
} else {
vlog.Infof("[%s] new circuit success. url:%v, config{%s}\n", filterName, url.GetIdentity(), getConfigStr(commandConfig)+"bizException:"+bizExceptionStr)
}
return bizException
}

func buildCommandConfig(filterName string, url *motan.URL) *hystrix.CommandConfig {
hystrix.DefaultMaxConcurrent = 1000
hystrix.DefaultTimeout = int(url.GetPositiveIntValue(motan.TimeOutKey, int64(hystrix.DefaultTimeout))) * 2
commandConfig := &hystrix.CommandConfig{}
if v, ok := url.Parameters[RequestVolumeThresholdField]; ok {
if temp, _ := strconv.Atoi(v); temp > 0 {
commandConfig.RequestVolumeThreshold = temp
} else {
vlog.Warningf("[%s] parse config %s error, use default", filterName, RequestVolumeThresholdField)
}
}
if v, ok := url.Parameters[SleepWindowField]; ok {
if temp, _ := strconv.Atoi(v); temp > 0 {
commandConfig.SleepWindow = temp
} else {
vlog.Warningf("[%s] parse config %s error, use default", filterName, SleepWindowField)
}
}
if v, ok := url.Parameters[ErrorPercentThreshold]; ok {
if temp, _ := strconv.Atoi(v); temp > 0 && temp <= 100 {
commandConfig.ErrorPercentThreshold = temp
} else {
vlog.Warningf("[%s] parse config %s error, use default", filterName, ErrorPercentThreshold)
}
}
return commandConfig
}

func defaultErrMotanResponse(request motan.Request, errMsg string) motan.Response {
response := &motan.MotanResponse{
RequestID: request.GetRequestID(),
Attachment: motan.NewStringMap(motan.DefaultAttachmentSize),
ProcessTime: 0,
Exception: &motan.Exception{
ErrCode: 400,
ErrMsg: errMsg,
ErrType: motan.ServiceException},
}
return response
}

func getConfigStr(config *hystrix.CommandConfig) string {
var ret string
if config.RequestVolumeThreshold != 0 {
ret += "requestThreshold:" + strconv.Itoa(config.RequestVolumeThreshold) + " "
} else {
ret += "requestThreshold:" + strconv.Itoa(hystrix.DefaultVolumeThreshold) + " "
}
if config.SleepWindow != 0 {
ret += "sleepWindow:" + strconv.Itoa(config.SleepWindow) + " "
} else {
ret += "sleepWindow:" + strconv.Itoa(hystrix.DefaultSleepWindow) + " "
}
if config.ErrorPercentThreshold != 0 {
ret += "errorPercent:" + strconv.Itoa(config.ErrorPercentThreshold) + " "
} else {
ret += "errorPercent:" + strconv.Itoa(hystrix.DefaultErrorPercentThreshold) + " "
}
return ret
}

func checkException(response motan.Response, includeBizException bool) error {
if ex := response.GetException(); ex != nil && (includeBizException || ex.ErrType != motan.BizException) {
return errors.New(ex.ErrMsg)
}
return nil
}
123 changes: 123 additions & 0 deletions filter/circuitBreaker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package filter

import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/weibocom/motan-go/core"
"github.com/weibocom/motan-go/endpoint"
"github.com/weibocom/motan-go/log"
)

var (
count = int64(0)
countLock sync.RWMutex
filterSleepTime = 7 * time.Millisecond
filterSleepTimeLock sync.RWMutex
)

func TestCircuitBreakerFilter(t *testing.T) {
//Init
defaultExtFactory := &core.DefaultExtensionFactory{}
defaultExtFactory.Initialize()
RegistDefaultFilters(defaultExtFactory)
endpoint.RegistDefaultEndpoint(defaultExtFactory)
url := &core.URL{Host: "127.0.0.1", Port: 7888, Protocol: "mockEndpoint"}
caller := defaultExtFactory.GetEndPoint(url)
request := &core.MotanRequest{Method: "testMethod"}

//Test NewFilter
param := map[string]string{core.TimeOutKey: "2", SleepWindowField: "300"}
filterURL := &core.URL{Host: "127.0.0.1", Port: 7888, Protocol: "mockEndpoint", Parameters: param}
f := defaultExtFactory.GetFilter(CircuitBreaker)
if f == nil {
t.Error("Can not find circuitBreaker filter!")
}
f = f.NewFilter(filterURL)
ef := f.(core.EndPointFilter)
ef.SetNext(new(mockEndPointFilter))

//Test circuitBreakerTimeout & requestVolumeThreshold
for i := 0; i < 30; i++ {
ef.Filter(caller, request)
}
time.Sleep(10 * time.Millisecond) //wait until async call complete
countLock.RLock()
if count != 20 && count != 21 {
t.Error("Test circuitBreakerTimeout failed! count:", count)
}
countLock.RUnlock()

//Test sleepWindow
time.Sleep(350 * time.Millisecond) //wait until SleepWindowField
for i := 0; i < 5; i++ {
ef.Filter(caller, request)
}
time.Sleep(10 * time.Millisecond) //wait until async call complete
countLock.RLock()
if count != 21 && count != 22 {
t.Error("Test sleepWindow failed! count:", count)
}
countLock.RUnlock()

//Test errorPercentThreshold
time.Sleep(350 * time.Millisecond) //wait until SleepWindowField
filterSleepTimeLock.Lock()
filterSleepTime = 0 * time.Millisecond
filterSleepTimeLock.Unlock()
for i := 0; i < 100; i++ {
ef.Filter(caller, request)
}
time.Sleep(10 * time.Millisecond) //wait until async call complete
filterSleepTimeLock.Lock()
filterSleepTime = 7 * time.Millisecond
filterSleepTimeLock.Unlock()
for i := 0; i < 50; i++ {
ef.Filter(caller, request)
}
time.Sleep(10 * time.Millisecond) //wait until async call complete
countLock.RLock()
if count != 171 && count != 172 {
t.Error("Test sleepWindow failed! count:", count)
}
countLock.RUnlock()
}

type mockEndPointFilter struct{}

func (m *mockEndPointFilter) GetName() string {
return "mockEndPointFilter"
}

func (m *mockEndPointFilter) NewFilter(url *core.URL) core.Filter {
return core.GetLastEndPointFilter()
}

func (m *mockEndPointFilter) Filter(caller core.Caller, request core.Request) core.Response {
countLock.Lock()
atomic.AddInt64(&count, 1)
countLock.Unlock()
filterSleepTimeLock.RLock()
time.Sleep(filterSleepTime)
filterSleepTimeLock.RUnlock()
return caller.Call(request)
}

func (m *mockEndPointFilter) HasNext() bool {
return false
}

func (m *mockEndPointFilter) SetNext(nextFilter core.EndPointFilter) {
vlog.Errorf("should not set next in mockEndPointFilter! filer:%s\n", nextFilter.GetName())
}
func (m *mockEndPointFilter) GetNext() core.EndPointFilter {
return nil
}
func (m *mockEndPointFilter) GetIndex() int {
return 100
}
func (m *mockEndPointFilter) GetType() int32 {
return core.EndPointFilterType
}
Loading

0 comments on commit ffea151

Please sign in to comment.