Skip to content

Commit

Permalink
Circuit breaker set up and documented
Browse files Browse the repository at this point in the history
  • Loading branch information
lonelycode committed Jul 16, 2015
1 parent 663ef09 commit c48298f
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 20 deletions.
57 changes: 57 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,62 @@
# DEV

- Added circuit breaker as a path-based option. To enable, add a new sectino to your versions `extended_paths` list:

circuit_breakers: [
{
path: "get",
method: "GET",
threshold_percent: 0.5,
samples: 5,
return_to_service_after: 60
}
]

Circuit breakers are individual on a singlie host, they do not centralise or pool back-end data, this is for speed. This means that in a load balanced environment where multiple Tyk nodes are used, some traffic can spill through as other nodes reach the sampling rate limit. This is for pure speed, adding a redis counter layer or data-store on every request to a servcie would jsut add latency.

Circuit breakers use a thresh-old-breaker pattern, so of sample size x if y% requests fail, trip the breaker.

The circuit breaker works across hosts (i.e. if you have multiple targets for an API, the samnple is across *all* upstream requests)

When a circuit breaker trips, it will fire and event: `BreakerTriggered` which you can define actions for in the `event_handlers` section:

```
event_handlers: {
events: {
BreakerTriggered: [
{
handler_name: "eh_log_handler",
handler_meta: {
prefix: "LOG-HANDLER-PREFIX"
}
},
{
handler_name: "eh_web_hook_handler",
handler_meta: {
method: "POST",
target_path: "http://posttestserver.com/post.php?dir=tyk-event-test",
template_path: "templates/breaker_webhook.json",
header_map: {
"X-Tyk-Test-Header": "Tyk v1.BANANA"
},
event_timeout: 10
}
}
]
}
},
```

Status codes are:

```
// BreakerTripped is sent when a breaker trips
BreakerTripped = 0

// BreakerReset is sent when a breaker resets
BreakerReset = 1
```

- Added round-robin load balancing support, to enable, set up in the API Definition under the `proxy` section:

...
Expand Down
54 changes: 41 additions & 13 deletions api_definition_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func (a *APIDefinitionLoader) compileTransformPathSpec(paths []tykcommon.Templat
// This way we can iterate the whole array once, on match we break with status
thisURLSpec := []URLSpec{}

log.Info("Checking for transform paths...")
log.Debug("Checking for transform paths...")
for _, stringSpec := range paths {
log.Info("-- Generating path")
newSpec := URLSpec{}
Expand Down Expand Up @@ -514,29 +514,57 @@ func (a *APIDefinitionLoader) compileCircuitBreakerPathSpec(paths []tykcommon.Ci
a.generateRegex(stringSpec.Path, &newSpec, stat)
// Extend with method actions
newSpec.CircuitBreaker = ExtendedCircuitBreakerMeta{CircuitBreakerMeta: stringSpec}
log.Info("Initialising circuit breaker for: ", stringSpec.Path)
newSpec.CircuitBreaker.CB = circuit.NewRateBreaker(stringSpec.ThresholdPercent, stringSpec.Samples)

events := newSpec.CircuitBreaker.CB.Subscribe()
go func() {
path := stringSpec.Path
spec := apiSpec
breakerPtr := newSpec.CircuitBreaker.CB
timerActive := false
for {
e := <-events
spec.FireEvent(EVENT_BreakerTriggered,
EVENT_CurcuitBreakerMeta{
EventMetaDefault: EventMetaDefault{Message: "Breaker Tripped"},
CircuitEvent: e,
Path: path,
APIID: spec.APIID,
})
switch e {
case circuit.BreakerTripped:
log.Warning("[PROXY] [CIRCUIT BREKER] Breaker tripped for path: ", path)
log.Debug("Breaker tripped: ", e)
// Start a timer function
go func(timeout int, breaker *circuit.Breaker) {
time.Sleep(time.Duration(timeout) * time.Second)
breaker.Reset()
}(newSpec.CircuitBreaker.ReturnToServiceAfter, breakerPtr)

if !timerActive {
go func(timeout int, breaker *circuit.Breaker) {
log.Debug("-- Sleeping for (s): ", timeout)
time.Sleep(time.Duration(timeout) * time.Second)
log.Debug("-- Resetting breaker")
breaker.Reset()
timerActive = false
}(newSpec.CircuitBreaker.ReturnToServiceAfter, breakerPtr)
timerActive = true
}

if spec.Proxy.ServiceDiscovery.UseDiscoveryService {
if ServiceCache != nil {
log.Warning("[PROXY] [CIRCUIT BREKER] Refreshing host list")
ServiceCache.Delete(spec.APIID)
}
}

spec.FireEvent(EVENT_BreakerTriggered,
EVENT_CurcuitBreakerMeta{
EventMetaDefault: EventMetaDefault{Message: "Breaker Tripped"},
CircuitEvent: e,
Path: path,
APIID: spec.APIID,
})

case circuit.BreakerReset:
spec.FireEvent(EVENT_BreakerTriggered,
EVENT_CurcuitBreakerMeta{
EventMetaDefault: EventMetaDefault{Message: "Breaker Reset"},
CircuitEvent: e,
Path: path,
APIID: spec.APIID,
})

}
}
}()
Expand Down
12 changes: 7 additions & 5 deletions event_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,19 @@ func (l LogMessageEventHandler) New(handlerConf interface{}) (TykEventHandler, e

// HandleEvent will be fired when the event handler instance is found in an APISpec EventPaths object during a request chain
func (l LogMessageEventHandler) HandleEvent(em EventMessage) {
var msgConf EVENT_QuotaExceededMeta
// type assert the metadata and then use it however you like
msgConf = em.EventMetaData.(EVENT_QuotaExceededMeta)

var formattedMsgString string
formattedMsgString = fmt.Sprintf("%s:%s:%s", l.conf["prefix"].(string), em.EventType, msgConf.Message)
formattedMsgString = fmt.Sprintf("%s:%s", l.conf["prefix"].(string), em.EventType)

// We can handle specific event types easily
if em.EventType == EVENT_QuotaExceeded {
msgConf := em.EventMetaData.(EVENT_QuotaExceededMeta)
formattedMsgString = fmt.Sprintf("%s:%s:%s:%s", formattedMsgString, msgConf.Key, msgConf.Origin, msgConf.Path)
}

if em.EventType == EVENT_BreakerTriggered {
msgConf := em.EventMetaData.(EVENT_CurcuitBreakerMeta)
formattedMsgString = fmt.Sprintf("%s:%s:%s: [STATUS] %v", formattedMsgString, msgConf.APIID, msgConf.Path, msgConf.CircuitEvent)
}

log.Warning(formattedMsgString)
}
7 changes: 7 additions & 0 deletions templates/breaker_webhook.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"event": "{{.EventType}}",
"message": "{{.EventMetaData.Message}}",
"api_id": "{{.EventMetaData.APIID}}",
"path": "{{.EventMetaData.Path}}",
"Status": "{{.EventMetaData.CircuitEvent}}"
}
8 changes: 6 additions & 2 deletions tyk_reverse_proxy_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (p *ReverseProxy) CheckCircuitBreakerEnforced(spec *APISpec, req *http.Requ

if stat == StatusCircuitBreaker {
thisMeta := meta.(*ExtendedCircuitBreakerMeta)
log.Warning("CIRCUIT BREAKER ENFORCED: ", *thisMeta)
log.Debug("CB Enforced for path: ", *thisMeta)
return true, thisMeta
}

Expand Down Expand Up @@ -454,13 +454,17 @@ func (p *ReverseProxy) WrappedServeHTTP(rw http.ResponseWriter, req *http.Reques
var res *http.Response
var err error
if breakerEnforced {
log.Debug("ON REQUEST: Breaker status: ", breakerConf.CB.Ready())
if breakerConf.CB.Ready() {
res, err = transport.RoundTrip(outreq)
if err != nil {
breakerConf.CB.Fail()
} else {
breakerConf.CB.Success()
}
} else {
p.ErrorHandler.HandleError(rw, logreq, "Service temporarily unnavailable.", 503)
return nil
}
} else {
res, err = transport.RoundTrip(outreq)
Expand All @@ -473,7 +477,7 @@ func (p *ReverseProxy) WrappedServeHTTP(rw http.ResponseWriter, req *http.Reques

if p.TykAPISpec.Proxy.ServiceDiscovery.UseDiscoveryService {
if ServiceCache != nil {
log.Warning("[PROXY] [SERVICE DISCOVERY] Upstream host failed, refreshing host list")
log.Debug("[PROXY] [SERVICE DISCOVERY] Upstream host failed, refreshing host list")
ServiceCache.Delete(p.TykAPISpec.APIID)
}
}
Expand Down

0 comments on commit c48298f

Please sign in to comment.