Skip to content

Commit

Permalink
Hard timeouts on a path-by-path basis now enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
lonelycode committed Jul 10, 2015
1 parent 4390370 commit 485295a
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 1 deletion.
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,23 @@
"write_timeout": 10
}

- It is now possible to set hard timeouts on a path-by-path basis, e.g. if you have a long-running microservice, but do not want to hold up a dependent client should a query take too long, you can enforce a timeout for that path so the requesting client is not held up forever (or maange it's own timeout). To do so, add this to the extended_paths section of your APi definition:

...
extended_paths: {
...
transform_response_headers: [],
hard_timeouts: [
{
path: "delay/5",
method: "GET",
timeout: 3
}
]
}
...


# v1.7
- Open APIs now support caching, body transforms and header transforms
- Added RPC storage backend for cloud-based suport. RPC server is built in vayala/gorpc, signature for the methods that need to be provideda are in the rpc_storage_handler.go file (see the dispatcher).
Expand Down
29 changes: 29 additions & 0 deletions api_definition_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
HeaderInjected URLStatus = 6
HeaderInjectedResponse URLStatus = 7
TransformedResponse URLStatus = 8
HardTimeout URLStatus = 9
)

// RequestStatus is a custom type to avoid collisions
Expand All @@ -62,6 +63,7 @@ const (
StatusHeaderInjectedResponse RequestStatus = "Header injected on response"
StatusActionRedirect RequestStatus = "Found an Action, changing route"
StatusRedirectFlowByReply RequestStatus = "Exceptional action requested, redirecting flow!"
StatusHardTimeout RequestStatus = "Hard Timeout enforced on path"
)

// URLSpec represents a flattened specification for URLs, used to check if a proxy URL
Expand All @@ -75,6 +77,7 @@ type URLSpec struct {
TransformResponseAction TransformSpec
InjectHeaders tykcommon.HeaderInjectionMeta
InjectHeadersResponse tykcommon.HeaderInjectionMeta
HardTimeout tykcommon.HardTimeoutMeta
}

type TransformSpec struct {
Expand Down Expand Up @@ -471,6 +474,24 @@ func (a *APIDefinitionLoader) compileInjectedHeaderSpec(paths []tykcommon.Header
return thisURLSpec
}

func (a *APIDefinitionLoader) compileTimeoutPathSpec(paths []tykcommon.HardTimeoutMeta, stat URLStatus) []URLSpec {

// transform an extended configuration URL into an array of URLSpecs
// This way we can iterate the whole array once, on match we break with status
thisURLSpec := []URLSpec{}

for _, stringSpec := range paths {
newSpec := URLSpec{}
a.generateRegex(stringSpec.Path, &newSpec, stat)
// Extend with method actions
newSpec.HardTimeout = stringSpec

thisURLSpec = append(thisURLSpec, newSpec)
}

return thisURLSpec
}

func (a *APIDefinitionLoader) getExtendedPathSpecs(apiVersionDef tykcommon.VersionInfo) ([]URLSpec, bool) {
// TODO: New compiler here, needs to put data into a different structure

Expand All @@ -482,6 +503,7 @@ func (a *APIDefinitionLoader) getExtendedPathSpecs(apiVersionDef tykcommon.Versi
transformResponsePaths := a.compileTransformPathSpec(apiVersionDef.ExtendedPaths.TransformResponse, TransformedResponse)
headerTransformPaths := a.compileInjectedHeaderSpec(apiVersionDef.ExtendedPaths.TransformHeader, HeaderInjected)
headerTransformPathsOnResponse := a.compileInjectedHeaderSpec(apiVersionDef.ExtendedPaths.TransformResponseHeader, HeaderInjectedResponse)
hardTimeouts := a.compileTimeoutPathSpec(apiVersionDef.ExtendedPaths.HardTimeouts, HardTimeout)

combinedPath := []URLSpec{}
combinedPath = append(combinedPath, ignoredPaths...)
Expand All @@ -492,6 +514,7 @@ func (a *APIDefinitionLoader) getExtendedPathSpecs(apiVersionDef tykcommon.Versi
combinedPath = append(combinedPath, transformResponsePaths...)
combinedPath = append(combinedPath, headerTransformPaths...)
combinedPath = append(combinedPath, headerTransformPathsOnResponse...)
combinedPath = append(combinedPath, hardTimeouts...)

if len(whiteListPaths) > 0 {
return combinedPath, true
Expand Down Expand Up @@ -525,6 +548,8 @@ func (a *APISpec) getURLStatus(stat URLStatus) RequestStatus {
return StatusHeaderInjectedResponse
case TransformedResponse:
return StatusTransformResponse
case HardTimeout:
return StatusHardTimeout
default:
log.Error("URL Status was not one of Ignored, Blacklist or WhiteList! Blocking.")
return EndPointNotAllowed
Expand Down Expand Up @@ -625,6 +650,10 @@ func (a *APISpec) CheckSpecMatchesStatus(url string, method interface{}, RxPaths
if method != nil && method.(string) == v.TransformResponseAction.TemplateMeta.Method {
return true, &v.TransformResponseAction
}
case HardTimeout:
if method != nil && method.(string) == v.HardTimeout.Method {
return true, &v.HardTimeout.TimeOut
}
}

}
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ func loadApps(APISpecs []APISpec, Muxer *http.ServeMux) {
}

proxy := TykNewSingleHostReverseProxy(remote, &referenceSpec)
// initialise the proxy
proxy.New(nil, &referenceSpec)
referenceSpec.target = remote

// Create the response processors
Expand Down
65 changes: 64 additions & 1 deletion tyk_reverse_proxy_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package main
import (
"bytes"
"github.com/gorilla/context"
//"github.com/lonelycode/tykcommon"
"io"
"io/ioutil"
"net"
Expand Down Expand Up @@ -68,9 +69,39 @@ type ReverseProxy struct {
FlushInterval time.Duration

TykAPISpec *APISpec
ErrorHandler ErrorHandler
ResponseHandler ResponseChain
}

var TykDefaultTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
}

func GetTransport(timeOut int) http.RoundTripper {
if timeOut > 0 {
log.Debug("Setting timeout for outbound request to: ", timeOut)
var ModifiedTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: time.Duration(timeOut) * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
ResponseHeaderTimeout: time.Duration(timeOut) * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
}

return ModifiedTransport

}

return TykDefaultTransport
}

func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
Expand Down Expand Up @@ -132,6 +163,7 @@ func (p *ReverseProxy) ReturnRequestServeHttp(rw http.ResponseWriter, req *http.
}

func (p *ReverseProxy) New(c interface{}, spec *APISpec) (TykResponseHandler, error) {
p.ErrorHandler = ErrorHandler{TykMiddleware: &TykMiddleware{spec, p}}
return nil, nil
}

Expand All @@ -144,18 +176,42 @@ func (p *ReverseProxy) ServeHTTPForCache(rw http.ResponseWriter, req *http.Reque
return p.WrappedServeHTTP(rw, req, true)
}

func (p *ReverseProxy) CheckHardTimeoutEnforced(spec *APISpec, req *http.Request) (bool, int) {
var stat RequestStatus
var meta interface{}
var found bool

_, versionPaths, _, _ := spec.GetVersionData(req)
found, meta = spec.CheckSpecMatchesStatus(req.URL.Path, req.Method, versionPaths, HardTimeout)
if found {
stat = StatusHardTimeout
}

if stat == StatusHardTimeout {
thisMeta := meta.(*int)
log.Debug("HARD TIMEOUT ENFORCED: ", *thisMeta)
return true, *thisMeta
}

return false, 0
}

func (p *ReverseProxy) WrappedServeHTTP(rw http.ResponseWriter, req *http.Request, withCache bool) *http.Response {
transport := p.Transport
if transport == nil {
transport = http.DefaultTransport
// 1. Check if timeouts are set for this endpoint
_, timeout := p.CheckHardTimeoutEnforced(p.TykAPISpec, req)
transport = GetTransport(timeout)
}

// Do this before we make a shallow copy
sessVal := context.Get(req, SessionData)

outreq := new(http.Request)
logreq := new(http.Request)
log.Debug("UPSTREAM REQUEST URL: ", req.URL)
*outreq = *req // includes shallow copies of maps, but okay
*logreq = *req

p.Director(outreq)
outreq.Proto = "HTTP/1.1"
Expand All @@ -173,10 +229,13 @@ func (p *ReverseProxy) WrappedServeHTTP(rw http.ResponseWriter, req *http.Reques
if outreq.Header.Get(h) != "" {
if !copiedHeaders {
outreq.Header = make(http.Header)
logreq.Header = make(http.Header)
copyHeader(outreq.Header, req.Header)
copyHeader(logreq.Header, req.Header)
copiedHeaders = true
}
outreq.Header.Del(h)
logreq.Header.Del(h)
}
}

Expand All @@ -193,6 +252,10 @@ func (p *ReverseProxy) WrappedServeHTTP(rw http.ResponseWriter, req *http.Reques
res, err := transport.RoundTrip(outreq)
if err != nil {
log.Printf("http: proxy error: %v", err)
if strings.Contains(err.Error(), "timeout awaiting response headers") {
p.ErrorHandler.HandleError(rw, logreq, "Upstream service reached hard timeout.", 408)
return nil
}
rw.WriteHeader(http.StatusInternalServerError)
return nil
}
Expand Down

0 comments on commit 485295a

Please sign in to comment.