From cdb2ca2b29cab33a5ebaf6eb4bd6ff82b1e045b3 Mon Sep 17 00:00:00 2001 From: Tomasz Mielech Date: Mon, 23 Aug 2021 07:27:58 +0200 Subject: [PATCH 1/2] refactor connection.Do for better legibility --- cluster/cluster.go | 103 +++++++++++++++++++++++---------------------- 1 file changed, 52 insertions(+), 51 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 1eb5dff4..36a25e84 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2017 ArangoDB GmbH, Cologne, Germany +// Copyright 2017-2021 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ // Copyright holder is ArangoDB GmbH, Cologne, Germany // // Author Ewout Prangsma +// Author Tomasz Mielech // package cluster @@ -31,7 +32,7 @@ import ( "sync" "time" - driver "github.com/arangodb/go-driver" + "github.com/arangodb/go-driver" ) const ( @@ -105,24 +106,15 @@ func (c *clusterConnection) NewRequest(method, path string) (driver.Request, err // Do performs a given request, returning its response. func (c *clusterConnection) Do(ctx context.Context, req driver.Request) (driver.Response, error) { - followLeaderRedirect := true + var timeout time.Duration + if ctx == nil { ctx = context.Background() - } else { - if v := ctx.Value(keyFollowLeaderRedirect); v != nil { - if on, ok := v.(bool); ok { - followLeaderRedirect = on - } - } - } - // Timeout management. - // We take the given timeout and divide it in 3 so we allow for other servers - // to give it a try if an earlier server fails. - deadline, hasDeadline := ctx.Deadline() - var timeout time.Duration - if hasDeadline { + } else if deadline, hasDeadline := ctx.Deadline(); hasDeadline { timeout = deadline.Sub(time.Now()) - } else { + } + + if timeout == 0 { timeout = c.defaultTimeout } @@ -149,54 +141,52 @@ func (c *clusterConnection) Do(ctx context.Context, req driver.Request) (driver. attempt := 1 for { - // Send request to specific endpoint with a 1/3 timeout (so we get 3 attempts) + // Send request to specific endpoint with a 1/[1,2,3] timeout (so we get 1 or 2 or 3 attempts) serverCtx, cancel := context.WithTimeout(ctx, durationPerRequest) resp, err := server.Do(serverCtx, req) cancel() - isNoLeaderResponse := false - if err == nil && resp.StatusCode() == 503 { - // Service unavailable, parse the body, perhaps this is a "no leader" - // case where we have to failover. - var aerr driver.ArangoError - if perr := resp.ParseBody("", &aerr); perr == nil && aerr.HasError { - if driver.IsNoLeader(aerr) { - isNoLeaderResponse = true - // Save error in case we have no more servers - err = aerr + if err == nil && resp.StatusCode() == http.StatusServiceUnavailable && c.canFollowLeader(ctx) { + // Service unavailable, handle the "no leader" error. + var errArango driver.ArangoError + + if resp.ParseBody("", &errArango) == nil && errArango.HasError && driver.IsNoLeader(errArango) { + attempt++ + if attempt > serverCount { + // A specific server was specified or all servers failed. Give up. + return nil, driver.WithStack(errArango) } + server = c.getNextServer() + continue } } - if !isNoLeaderResponse || !followLeaderRedirect { - if err == nil { - // We're done - return resp, nil - } - // No success yet - if driver.IsCanceled(err) { - // Request was cancelled, we return directly. + if err == nil { + // The response has been received. + return resp, nil + } + + // No success yet. + if driver.IsCanceled(err) { + // Request was cancelled, we return directly. + return nil, driver.WithStack(err) + } + + if req.Written() { + // Request has been written to network, do not failover. The error must be returned. + if driver.IsArangoError(err) { + // ArangoError, so we got an error response from server. return nil, driver.WithStack(err) } - // If we've completely written the request, we return the error, - // otherwise we'll failover to a new server. - if req.Written() { - // Request has been written to network, do not failover - if driver.IsArangoError(err) { - // ArangoError, so we got an error response from server. - return nil, driver.WithStack(err) - } - // Not an ArangoError, so it must be some kind of timeout, network ... error. - return nil, driver.WithStack(&driver.ResponseError{Err: err}) - } + + // Not an ArangoError, so it must be some kind of timeout, network ... error. + return nil, driver.WithStack(&driver.ResponseError{Err: err}) } - // Failed, try next server + // The request has not been written, try next server. attempt++ if attempt > serverCount { - // A specific server was specified, no failover. - // or - // We've tried all servers. Giving up. + // A specific server was specified or all servers failed. Give up. return nil, driver.WithStack(err) } server = c.getNextServer() @@ -354,3 +344,14 @@ func (c *clusterConnection) getNextServer() driver.Connection { c.current = (c.current + 1) % len(c.servers) return c.servers[c.current] } + +// canFollowLeader returns true if searching leader is enabled. +func (c *clusterConnection) canFollowLeader(ctx context.Context) bool { + if v := ctx.Value(keyFollowLeaderRedirect); v != nil { + if on, ok := v.(bool); ok && !on { + return false + } + } + + return true +} From 0dbf30c953f131803c7ed86b5f15a195678b05e0 Mon Sep 17 00:00:00 2001 From: Tomasz Mielech Date: Fri, 27 Aug 2021 12:24:32 +0200 Subject: [PATCH 2/2] testing --- http/connection.go | 117 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/http/connection.go b/http/connection.go index 7ff749ac..c9bf71c7 100644 --- a/http/connection.go +++ b/http/connection.go @@ -28,6 +28,7 @@ import ( "crypto/tls" "encoding/json" "fmt" + "io" "io/ioutil" "net" "net/http" @@ -227,6 +228,122 @@ func (c *httpConnection) NewRequest(method, path string) (driver.Request, error) } } +// waitForAvailability waits until connection in the pool is available. +// If Context is canceled then error is returned. +func (c *httpConnection) waitForAvailability(ctx context.Context) (context.CancelFunc, error) { + if c.connPool == nil { + return func() {}, nil + } + + select { + case t := <-c.connPool: + // The ticket was taken from the connection pool. + return func() { + // Give back the token. + c.connPool <- t + }, nil + case <-ctx.Done(): + // Context cancelled or expired + return func() {}, driver.WithStack(ctx.Err()) + } + +} + +type DataFetcher interface { + GetData() ([]byte, error) +} + +type bodyRead struct { + cancel context.CancelFunc + io.ReadCloser +} + +type httpRawResponse struct { + resp *http.Response +} + +//func (r *bodyRead) Read([]byte) (n int, error) { +// //if r.closed { +// // return nil, io.EOF +// //} +// // +// //b := make([]byte, 0, 512) +// //n, err := r.resp.Body.Read(b) +// //if err == nil { +// // return b[:n], nil +// //} +// // +// //if err != io.EOF { +// // r.resp.Body.Close() +// // r.cancel() +// // r.closed = true +// // return nil, err +// //} +// // +// //r.resp.Body.Close() +// //r.cancel() +// //r.closed = true +// // +// //return b[:n], nil +//} + +func (b *bodyRead) Close() error { + if b.cancel != nil { + b.cancel() + b.cancel = nil + } + + return b.ReadCloser.Close() +} + +// DoRaw TODO performs a given request, returning its response. +func (c *httpConnection) DoRaw(ctx context.Context, req driver.Request) (*http.Response, error) { + request, ok := req.(*httpRequest) + if !ok { + return nil, driver.WithStack(driver.InvalidArgumentError{Message: "request is not a httpRequest type"}) + } + + r, err := request.createHTTPRequest(c.endpoint) + rctx := ctx + if rctx == nil { + rctx = context.Background() + } + rctx = httptrace.WithClientTrace(rctx, &httptrace.ClientTrace{ + WroteRequest: func(info httptrace.WroteRequestInfo) { + request.WroteRequest(info) + }, + }) + + r = r.WithContext(rctx) + if err != nil { + return nil, driver.WithStack(err) + } + + // Block on too many concurrent connections + cancel, err := c.waitForAvailability(ctx) + if err != nil { + return nil, driver.WithStack(err) + } + + resp, err := c.client.Do(r) + if err != nil { + cancel() + return nil, driver.WithStack(err) + } + + resp.Body = &bodyRead{ + cancel: cancel, + ReadCloser: resp.Body, + } + + return resp, nil + //return &httpRawResponse{ + // resp.Body + // //res + // //resp: resp, + //}, nil +} + // Do performs a given request, returning its response. func (c *httpConnection) Do(ctx context.Context, req driver.Request) (driver.Response, error) { request, ok := req.(*httpRequest)