diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c8778d6ab0..515e3e696e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,23 @@ "write_timeout": 10 } + - It is now possible to set hard timeouts on a path-by-path basis, e.g. if you have a long-running microservice, but do not want to hold up a dependent client should a query take too long, you can enforce a timeout for that path so the requesting client is not held up forever (or maange it's own timeout). To do so, add this to the extended_paths section of your APi definition: + + ... + extended_paths: { + ... + transform_response_headers: [], + hard_timeouts: [ + { + path: "delay/5", + method: "GET", + timeout: 3 + } + ] + } + ... + + # v1.7 - Open APIs now support caching, body transforms and header transforms - Added RPC storage backend for cloud-based suport. RPC server is built in vayala/gorpc, signature for the methods that need to be provideda are in the rpc_storage_handler.go file (see the dispatcher). diff --git a/api_definition_manager.go b/api_definition_manager.go index f23d3f101a8..86c8030aa7f 100644 --- a/api_definition_manager.go +++ b/api_definition_manager.go @@ -39,6 +39,7 @@ const ( HeaderInjected URLStatus = 6 HeaderInjectedResponse URLStatus = 7 TransformedResponse URLStatus = 8 + HardTimeout URLStatus = 9 ) // RequestStatus is a custom type to avoid collisions @@ -62,6 +63,7 @@ const ( StatusHeaderInjectedResponse RequestStatus = "Header injected on response" StatusActionRedirect RequestStatus = "Found an Action, changing route" StatusRedirectFlowByReply RequestStatus = "Exceptional action requested, redirecting flow!" + StatusHardTimeout RequestStatus = "Hard Timeout enforced on path" ) // URLSpec represents a flattened specification for URLs, used to check if a proxy URL @@ -75,6 +77,7 @@ type URLSpec struct { TransformResponseAction TransformSpec InjectHeaders tykcommon.HeaderInjectionMeta InjectHeadersResponse tykcommon.HeaderInjectionMeta + HardTimeout tykcommon.HardTimeoutMeta } type TransformSpec struct { @@ -471,6 +474,24 @@ func (a *APIDefinitionLoader) compileInjectedHeaderSpec(paths []tykcommon.Header return thisURLSpec } +func (a *APIDefinitionLoader) compileTimeoutPathSpec(paths []tykcommon.HardTimeoutMeta, stat URLStatus) []URLSpec { + + // transform an extended configuration URL into an array of URLSpecs + // This way we can iterate the whole array once, on match we break with status + thisURLSpec := []URLSpec{} + + for _, stringSpec := range paths { + newSpec := URLSpec{} + a.generateRegex(stringSpec.Path, &newSpec, stat) + // Extend with method actions + newSpec.HardTimeout = stringSpec + + thisURLSpec = append(thisURLSpec, newSpec) + } + + return thisURLSpec +} + func (a *APIDefinitionLoader) getExtendedPathSpecs(apiVersionDef tykcommon.VersionInfo) ([]URLSpec, bool) { // TODO: New compiler here, needs to put data into a different structure @@ -482,6 +503,7 @@ func (a *APIDefinitionLoader) getExtendedPathSpecs(apiVersionDef tykcommon.Versi transformResponsePaths := a.compileTransformPathSpec(apiVersionDef.ExtendedPaths.TransformResponse, TransformedResponse) headerTransformPaths := a.compileInjectedHeaderSpec(apiVersionDef.ExtendedPaths.TransformHeader, HeaderInjected) headerTransformPathsOnResponse := a.compileInjectedHeaderSpec(apiVersionDef.ExtendedPaths.TransformResponseHeader, HeaderInjectedResponse) + hardTimeouts := a.compileTimeoutPathSpec(apiVersionDef.ExtendedPaths.HardTimeouts, HardTimeout) combinedPath := []URLSpec{} combinedPath = append(combinedPath, ignoredPaths...) @@ -492,6 +514,7 @@ func (a *APIDefinitionLoader) getExtendedPathSpecs(apiVersionDef tykcommon.Versi combinedPath = append(combinedPath, transformResponsePaths...) combinedPath = append(combinedPath, headerTransformPaths...) combinedPath = append(combinedPath, headerTransformPathsOnResponse...) + combinedPath = append(combinedPath, hardTimeouts...) if len(whiteListPaths) > 0 { return combinedPath, true @@ -525,6 +548,8 @@ func (a *APISpec) getURLStatus(stat URLStatus) RequestStatus { return StatusHeaderInjectedResponse case TransformedResponse: return StatusTransformResponse + case HardTimeout: + return StatusHardTimeout default: log.Error("URL Status was not one of Ignored, Blacklist or WhiteList! Blocking.") return EndPointNotAllowed @@ -625,6 +650,10 @@ func (a *APISpec) CheckSpecMatchesStatus(url string, method interface{}, RxPaths if method != nil && method.(string) == v.TransformResponseAction.TemplateMeta.Method { return true, &v.TransformResponseAction } + case HardTimeout: + if method != nil && method.(string) == v.HardTimeout.Method { + return true, &v.HardTimeout.TimeOut + } } } diff --git a/main.go b/main.go index 11d4b382808..9fffbdaedc8 100644 --- a/main.go +++ b/main.go @@ -381,6 +381,8 @@ func loadApps(APISpecs []APISpec, Muxer *http.ServeMux) { } proxy := TykNewSingleHostReverseProxy(remote, &referenceSpec) + // initialise the proxy + proxy.New(nil, &referenceSpec) referenceSpec.target = remote // Create the response processors diff --git a/tyk_reverse_proxy_clone.go b/tyk_reverse_proxy_clone.go index 659db53850a..a23dba8aa36 100644 --- a/tyk_reverse_proxy_clone.go +++ b/tyk_reverse_proxy_clone.go @@ -9,6 +9,7 @@ package main import ( "bytes" "github.com/gorilla/context" + //"github.com/lonelycode/tykcommon" "io" "io/ioutil" "net" @@ -68,9 +69,39 @@ type ReverseProxy struct { FlushInterval time.Duration TykAPISpec *APISpec + ErrorHandler ErrorHandler ResponseHandler ResponseChain } +var TykDefaultTransport http.RoundTripper = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: 10 * time.Second, +} + +func GetTransport(timeOut int) http.RoundTripper { + if timeOut > 0 { + log.Debug("Setting timeout for outbound request to: ", timeOut) + var ModifiedTransport http.RoundTripper = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + Dial: (&net.Dialer{ + Timeout: time.Duration(timeOut) * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + ResponseHeaderTimeout: time.Duration(timeOut) * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + } + + return ModifiedTransport + + } + + return TykDefaultTransport +} + func singleJoiningSlash(a, b string) string { aslash := strings.HasSuffix(a, "/") bslash := strings.HasPrefix(b, "/") @@ -132,6 +163,7 @@ func (p *ReverseProxy) ReturnRequestServeHttp(rw http.ResponseWriter, req *http. } func (p *ReverseProxy) New(c interface{}, spec *APISpec) (TykResponseHandler, error) { + p.ErrorHandler = ErrorHandler{TykMiddleware: &TykMiddleware{spec, p}} return nil, nil } @@ -144,18 +176,42 @@ func (p *ReverseProxy) ServeHTTPForCache(rw http.ResponseWriter, req *http.Reque return p.WrappedServeHTTP(rw, req, true) } +func (p *ReverseProxy) CheckHardTimeoutEnforced(spec *APISpec, req *http.Request) (bool, int) { + var stat RequestStatus + var meta interface{} + var found bool + + _, versionPaths, _, _ := spec.GetVersionData(req) + found, meta = spec.CheckSpecMatchesStatus(req.URL.Path, req.Method, versionPaths, HardTimeout) + if found { + stat = StatusHardTimeout + } + + if stat == StatusHardTimeout { + thisMeta := meta.(*int) + log.Debug("HARD TIMEOUT ENFORCED: ", *thisMeta) + return true, *thisMeta + } + + return false, 0 +} + func (p *ReverseProxy) WrappedServeHTTP(rw http.ResponseWriter, req *http.Request, withCache bool) *http.Response { transport := p.Transport if transport == nil { - transport = http.DefaultTransport + // 1. Check if timeouts are set for this endpoint + _, timeout := p.CheckHardTimeoutEnforced(p.TykAPISpec, req) + transport = GetTransport(timeout) } // Do this before we make a shallow copy sessVal := context.Get(req, SessionData) outreq := new(http.Request) + logreq := new(http.Request) log.Debug("UPSTREAM REQUEST URL: ", req.URL) *outreq = *req // includes shallow copies of maps, but okay + *logreq = *req p.Director(outreq) outreq.Proto = "HTTP/1.1" @@ -173,10 +229,13 @@ func (p *ReverseProxy) WrappedServeHTTP(rw http.ResponseWriter, req *http.Reques if outreq.Header.Get(h) != "" { if !copiedHeaders { outreq.Header = make(http.Header) + logreq.Header = make(http.Header) copyHeader(outreq.Header, req.Header) + copyHeader(logreq.Header, req.Header) copiedHeaders = true } outreq.Header.Del(h) + logreq.Header.Del(h) } } @@ -193,6 +252,10 @@ func (p *ReverseProxy) WrappedServeHTTP(rw http.ResponseWriter, req *http.Reques res, err := transport.RoundTrip(outreq) if err != nil { log.Printf("http: proxy error: %v", err) + if strings.Contains(err.Error(), "timeout awaiting response headers") { + p.ErrorHandler.HandleError(rw, logreq, "Upstream service reached hard timeout.", 408) + return nil + } rw.WriteHeader(http.StatusInternalServerError) return nil }