Skip to content

Commit

Permalink
Added load balancing and service discovery support
Browse files Browse the repository at this point in the history
  • Loading branch information
lonelycode committed Jul 15, 2015
1 parent e5b7b14 commit 7e0cf7a
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 15 deletions.
80 changes: 80 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,85 @@
# DEV

- Added round-robin load balancing support, to enable, set up in the API Definition under the `proxy` section:

...
"enable_load_balancing": true,
"target_list": [
"http://server1",
"http://server2",
"http://server3"
],
...

- Added REST-based Servcie discovery for both single and load balanced entries (tested with etcd, but anything that returns JSON should work), to enable add a service discovery section to your Proxy section:

```
// Solo
service_discovery : {
use_discovery_service: true,
query_endpoint: "http://127.0.0.1:4001/v2/keys/services/single",
use_nested_query: true,
parent_data_path: "node.value",
data_path: "hostname",
port_data_path: "port",
use_target_list: false,
cache_timeout: 10
},


// With LB
"enable_load_balancing": true,
service_discovery: {
use_discovery_service: true,
query_endpoint: "http://127.0.0.1:4001/v2/keys/services/multiobj",
use_nested_query: true,
parent_data_path: "node.value",
data_path: "array.hostname",
port_data_path: "array.port",
use_target_list: true,
cache_timeout: 10
},
```

- For service discovery, multiple assumptions are made:
- The response data is in JSON
- The response data can have a nested value set that will be an encoded JSON string, e.g. from etcd:

```
$ curl -L http://127.0.0.1:4001/v2/keys/services/solo

{
"action": "get",
"node": {
"key": "/services/single",
"value": "{\"hostname\": \"http://httpbin.org\", \"port\": \"80\"}",
"modifiedIndex": 6,
"createdIndex": 6
}
}
```

```
$ curl -L http://127.0.0.1:4001/v2/keys/services/multiobj

{
"action": "get",
"node": {
"key": "/services/multiobj",
"value": "{\"array\":[{\"hostname\": \"http://httpbin.org\", \"port\": \"80\"},{\"hostname\": \"http://httpbin.org\", \"port\": \"80\"}]}",
"modifiedIndex": 9,
"createdIndex": 9
}
}
```

Here the key value is actually an encoded JSON string, which needs to be decoded separately to get to the data.

- In some cases port data will be separate from host data, if you specify a `port_data_path`, the values will be zipped together and concatenated into a valid proxy string.
- If use_target_list is enabled, then enable_load_balancing msut also be enabled, as Tyk will treat the list as a target list.
- The nested data object in a service registry key MUST be a JSON Object, **not just an Array**.


- Fixed bug where version parameter on POST requests would empty request body, streamlined request copies in general.
- it is now possible to use JSVM middleware on Open (Keyless) APIs
- It is now possible to configure the timeout parameters around the http server in the tyk.conf file:
Expand Down
1 change: 1 addition & 0 deletions api_definition_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type APISpec struct {
Health HealthChecker
JSVM *JSVM
ResponseChain *[]TykResponseHandler
RoundRobin *RoundRobin
}

// APIDefinitionLoader will load an Api definition from a storage system. It has two methods LoadDefinitionsFromMongo()
Expand Down
21 changes: 21 additions & 0 deletions round_robin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package main

type RoundRobin struct {
pos int
max int
cur int
}

func (r *RoundRobin) SetMax(rp interface{}) {
r.max = len(*rp.(*[]string))
}

func (r *RoundRobin) GetPos() int {
r.cur = r.pos
r.pos += 1
if r.pos == (r.max) {
r.pos = 0
}
log.Warning("Returning index: ", r.cur)
return r.cur
}
136 changes: 121 additions & 15 deletions tyk_reverse_proxy_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,52 +26,126 @@ import (

var ServiceCache *cache.Cache

func GetURLFromService(spec *APISpec) (string, error) {
func GetURLFromService(spec *APISpec) (interface{}, error) {
log.Debug("[PROXY] [SERVICE DISCOVERY] [CACHE] Checking:", spec.APIID)
serviceURL, found := ServiceCache.Get(spec.APIID)
if found {
return serviceURL.(string), nil
log.Debug("[PROXY] [SERVICE DISCOVERY] [CACHE] Cached value returned")
return serviceURL, nil
}

// Not found? Query the service
resp, err := http.Get(spec.Proxy.ServiceDiscovery.QueryEndpoint)
if err != nil {
return "", err
return nil, err
}

defer resp.Body.Close()
contents, readErr := ioutil.ReadAll(resp.Body)
if err != nil {
return "", readErr
return nil, readErr
}

jsonParsed, pErr := gabs.ParseJSON(contents)
if pErr != nil {
return "", pErr
return nil, pErr
}

value, ok := jsonParsed.Path(spec.Proxy.ServiceDiscovery.DataPath).Data().(string)
if !ok {
return "", errors.New("Failed to traverse data path")
if spec.Proxy.ServiceDiscovery.UseNestedQuery {
// If this is a nested query, we need to extract the nested JSON string first
log.Debug("[PROXY] [SERVICE DISCOVERY] Data: ", string(contents))
log.Debug("[PROXY] [SERVICE DISCOVERY] Nested parent path: ", string(spec.Proxy.ServiceDiscovery.ParentDataPath))
nestedValueObject, nestedOk := jsonParsed.Path(spec.Proxy.ServiceDiscovery.ParentDataPath).Data().(string)
if !nestedOk {
return nil, errors.New("Nested path traversal failed")
}
var secondNestedErr error
jsonParsed, secondNestedErr = gabs.ParseJSON([]byte(nestedValueObject))
if secondNestedErr != nil {
log.Debug("[PROXY] [SERVICE DISCOVERY] Nested object parsing error, data was: ", nestedValueObject)
return nil, secondNestedErr
}
}

var value interface{}
// Check if we are getting a target list or just a single value
if spec.Proxy.ServiceDiscovery.UseTargetList {
var valErr error
value, valErr = jsonParsed.Path(spec.Proxy.ServiceDiscovery.DataPath).Children()
if valErr != nil {
log.Debug("Failed to get children!")
return nil, valErr
}
// Some servcies separate out port numbers, lets take this into account
if spec.Proxy.ServiceDiscovery.PortDataPath != "" {
newHostArray := make([]string, len(value.([]*gabs.Container)))
portsData, portsErr := jsonParsed.Path(spec.Proxy.ServiceDiscovery.PortDataPath).Children()
if portsErr != nil {
return nil, portsErr
}

// Zip them up
for i, v := range value.([]*gabs.Container) {
asString := v.Data().(string) + ":" + portsData[i].Data().(string)
newHostArray[i] = asString
}
value = newHostArray

}
} else {
var ok bool
value, ok = jsonParsed.Path(spec.Proxy.ServiceDiscovery.DataPath).Data().(string)
if !ok {
return nil, errors.New("Failed to traverse data path")
}

if spec.Proxy.ServiceDiscovery.PortDataPath != "" {
portsData, pOk := jsonParsed.Path(spec.Proxy.ServiceDiscovery.PortDataPath).Data().(string)
if !pOk {
return nil, errors.New("Failed to traverse port data path")
}
newHost := value.(string) + ":" + portsData
value = newHost
}
}

if ServiceCache != nil {
// Set the cached value for this API
ServiceCache.Set(spec.APIID, value, time.Duration(spec.Proxy.ServiceDiscovery.CacheTimeout))
log.Debug("SETTING SERVICE CACHE TIMEOUT TO: ", spec.Proxy.ServiceDiscovery.CacheTimeout)
ServiceCache.Set(spec.APIID, value, time.Duration(spec.Proxy.ServiceDiscovery.CacheTimeout)*time.Second)
}

return value, nil
}

func GetNextTarget(targetData interface{}, spec *APISpec) string {
if spec.Proxy.EnableLoadBalancing {
log.Debug("[PROXY] [LOAD BALANCING] Load balancer enabled, getting upstream target")
// Use a list
spec.RoundRobin.SetMax(targetData)
td := *targetData.(*[]string)
return td[spec.RoundRobin.GetPos()]
}
// Use standard target - might still be service data
log.Debug("TARGET DATA:", targetData)
return targetData.(string)
}

// TykNewSingleHostReverseProxy returns a new ReverseProxy that rewrites
// URLs to the scheme, host, and base path provided in target. If the
// target's path is "/base" and the incoming request was for "/dir",
// the target request will be for /base/dir. This version modifies the
// stdlib version by also setting the host to the target, this allows
// us to work with heroku and other such providers
func TykNewSingleHostReverseProxy(target *url.URL, spec *APISpec) *ReverseProxy {
// initalise round robin
spec.RoundRobin = &RoundRobin{}
spec.RoundRobin.SetMax(&[]string{})

if spec.Proxy.ServiceDiscovery.UseDiscoveryService {
log.Info("[PROXY] Service discovery enabled")
if ServiceCache != nil {
log.Debug("[PROXY] Service discovery enabled")
if ServiceCache == nil {
log.Debug("[PROXY] Service cache initialising")
expiry := 120
if config.ServiceDiscovery.DefaultCacheTimeout > 0 {
expiry = config.ServiceDiscovery.DefaultCacheTimeout
Expand All @@ -82,23 +156,55 @@ func TykNewSingleHostReverseProxy(target *url.URL, spec *APISpec) *ReverseProxy

targetQuery := target.RawQuery
director := func(req *http.Request) {
var targetSet bool
if spec.Proxy.ServiceDiscovery.UseDiscoveryService {
tempTargetURL, tErr := GetURLFromService(spec)
if tErr != nil {
log.Error("[PROXY] [SERVICE DISCOVERY] Failed target lookup: ", tErr)
} else {
// No error, replace the target
remote, err := url.Parse(tempTargetURL)
if err != nil {
log.Error("[PROXY] [SERVICE DISCOVERY] Couldn't parse target URL:", err)
if spec.Proxy.EnableLoadBalancing {
var targetPtr []string = tempTargetURL.([]string)
remote, err := url.Parse(GetNextTarget(&targetPtr, spec))
if err != nil {
log.Error("[PROXY] [SERVICE DISCOVERY] Couldn't parse target URL:", err)
} else {
// Only replace target if everything is OK
target = remote
targetQuery = target.RawQuery
}
} else {
var targetPtr string = tempTargetURL.(string)
remote, err := url.Parse(GetNextTarget(targetPtr, spec))
if err != nil {
log.Error("[PROXY] [SERVICE DISCOVERY] Couldn't parse target URL:", err)
} else {
// Only replace target if everything is OK
target = remote
targetQuery = target.RawQuery
}
}
}
// We've overriden remote now, don;t need to do it again
targetSet = true
}

if !targetSet {
// no override, better check if LB is enabled
if spec.Proxy.EnableLoadBalancing {
// it is, lets get that target data
lbRemote, lbErr := url.Parse(GetNextTarget(&spec.Proxy.TargetList, spec))
if lbErr != nil {
log.Error("[PROXY] [LOAD BALANCING] Couldn't parse target URL:", lbErr)
} else {
// Only replace target if everything is OK
target = remote
target = lbRemote
targetQuery = target.RawQuery
}
}
}

// No override, and no load balancing? Use the existing target
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
Expand Down

0 comments on commit 7e0cf7a

Please sign in to comment.