Skip to content

Commit

Permalink
improve http group load balancing (fatedier#3131)
Browse files Browse the repository at this point in the history
  • Loading branch information
fatedier authored Oct 19, 2022
1 parent 3fbe6b6 commit cf66ca1
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 47 deletions.
2 changes: 1 addition & 1 deletion pkg/util/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"strings"
)

var version = "0.44.0"
var version = "0.45.0"

func Full() string {
return version
Expand Down
79 changes: 41 additions & 38 deletions pkg/util/vhost/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,28 @@ func NewHTTPReverseProxy(option HTTPReverseProxyOptions, vhostRouter *Routers) *
// Modify incoming requests by route policies.
Director: func(req *http.Request) {
req.URL.Scheme = "http"
url := req.Context().Value(RouteInfoURL).(string)
routeByHTTPUser := req.Context().Value(RouteInfoHTTPUser).(string)
oldHost, _ := util.CanonicalHost(req.Context().Value(RouteInfoHost).(string))
rc := rp.GetRouteConfig(oldHost, url, routeByHTTPUser)
reqRouteInfo := req.Context().Value(RouteInfoKey).(*RequestRouteInfo)
oldHost, _ := util.CanonicalHost(reqRouteInfo.Host)

rc := rp.GetRouteConfig(oldHost, reqRouteInfo.URL, reqRouteInfo.HTTPUser)
if rc != nil {
if rc.RewriteHost != "" {
req.Host = rc.RewriteHost
}
// Set {domain}.{location}.{routeByHTTPUser} as URL host here to let http transport reuse connections.
// TODO(fatedier): use proxy name instead?

var endpoint string
if rc.ChooseEndpointFn != nil {
// ignore error here, it will use CreateConnFn instead later
endpoint, _ = rc.ChooseEndpointFn()
reqRouteInfo.Endpoint = endpoint
frpLog.Trace("choose endpoint name [%s] for http request host [%s] path [%s] httpuser [%s]",
endpoint, oldHost, reqRouteInfo.URL, reqRouteInfo.HTTPUser)
}
// Set {domain}.{location}.{routeByHTTPUser}.{endpoint} as URL host here to let http transport reuse connections.
req.URL.Host = rc.Domain + "." +
base64.StdEncoding.EncodeToString([]byte(rc.Location)) + "." +
base64.StdEncoding.EncodeToString([]byte(rc.RouteByHTTPUser))
base64.StdEncoding.EncodeToString([]byte(rc.RouteByHTTPUser)) + "." +
base64.StdEncoding.EncodeToString([]byte(endpoint))

for k, v := range rc.Headers {
req.Header.Set(k, v)
Expand All @@ -85,12 +94,9 @@ func NewHTTPReverseProxy(option HTTPReverseProxyOptions, vhostRouter *Routers) *
Transport: &http.Transport{
ResponseHeaderTimeout: rp.responseHeaderTimeout,
IdleConnTimeout: 60 * time.Second,
MaxIdleConnsPerHost: 5,
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
url := ctx.Value(RouteInfoURL).(string)
host, _ := util.CanonicalHost(ctx.Value(RouteInfoHost).(string))
routerByHTTPUser := ctx.Value(RouteInfoHTTPUser).(string)
remote := ctx.Value(RouteInfoRemote).(string)
return rp.CreateConnection(host, url, routerByHTTPUser, remote)
return rp.CreateConnection(ctx.Value(RouteInfoKey).(*RequestRouteInfo), true)
},
Proxy: func(req *http.Request) (*url.URL, error) {
// Use proxy mode if there is host in HTTP first request line.
Expand All @@ -100,7 +106,7 @@ func NewHTTPReverseProxy(option HTTPReverseProxyOptions, vhostRouter *Routers) *
// Normal:
// GET / HTTP/1.1
// Host: example.com
urlHost := req.Context().Value(RouteInfoURLHost).(string)
urlHost := req.Context().Value(RouteInfoKey).(*RequestRouteInfo).URLHost
if urlHost != "" {
return req.URL, nil
}
Expand Down Expand Up @@ -143,14 +149,6 @@ func (rp *HTTPReverseProxy) GetRouteConfig(domain, location, routeByHTTPUser str
return nil
}

func (rp *HTTPReverseProxy) GetRealHost(domain, location, routeByHTTPUser string) (host string) {
vr, ok := rp.getVhost(domain, location, routeByHTTPUser)
if ok {
host = vr.payload.(*RouteConfig).RewriteHost
}
return
}

func (rp *HTTPReverseProxy) GetHeaders(domain, location, routeByHTTPUser string) (headers map[string]string) {
vr, ok := rp.getVhost(domain, location, routeByHTTPUser)
if ok {
Expand All @@ -160,15 +158,22 @@ func (rp *HTTPReverseProxy) GetHeaders(domain, location, routeByHTTPUser string)
}

// CreateConnection create a new connection by route config
func (rp *HTTPReverseProxy) CreateConnection(domain, location, routeByHTTPUser string, remoteAddr string) (net.Conn, error) {
vr, ok := rp.getVhost(domain, location, routeByHTTPUser)
func (rp *HTTPReverseProxy) CreateConnection(reqRouteInfo *RequestRouteInfo, byEndpoint bool) (net.Conn, error) {
host, _ := util.CanonicalHost(reqRouteInfo.Host)
vr, ok := rp.getVhost(host, reqRouteInfo.URL, reqRouteInfo.HTTPUser)
if ok {
if byEndpoint {
fn := vr.payload.(*RouteConfig).CreateConnByEndpointFn
if fn != nil {
return fn(reqRouteInfo.Endpoint, reqRouteInfo.RemoteAddr)
}
}
fn := vr.payload.(*RouteConfig).CreateConnFn
if fn != nil {
return fn(remoteAddr)
return fn(reqRouteInfo.RemoteAddr)
}
}
return nil, fmt.Errorf("%v: %s %s %s", ErrNoRouteFound, domain, location, routeByHTTPUser)
return nil, fmt.Errorf("%v: %s %s %s", ErrNoRouteFound, host, reqRouteInfo.URL, reqRouteInfo.HTTPUser)
}

func (rp *HTTPReverseProxy) CheckAuth(domain, location, routeByHTTPUser, user, passwd string) bool {
Expand Down Expand Up @@ -244,12 +249,7 @@ func (rp *HTTPReverseProxy) connectHandler(rw http.ResponseWriter, req *http.Req
return
}

url := req.Context().Value(RouteInfoURL).(string)
routeByHTTPUser := req.Context().Value(RouteInfoHTTPUser).(string)
domain, _ := util.CanonicalHost(req.Context().Value(RouteInfoHost).(string))
remoteAddr := req.Context().Value(RouteInfoRemote).(string)

remote, err := rp.CreateConnection(domain, url, routeByHTTPUser, remoteAddr)
remote, err := rp.CreateConnection(req.Context().Value(RouteInfoKey).(*RequestRouteInfo), false)
if err != nil {
_ = notFoundResponse().Write(client)
client.Close()
Expand Down Expand Up @@ -278,11 +278,6 @@ func parseBasicAuth(auth string) (username, password string, ok bool) {
}

func (rp *HTTPReverseProxy) injectRequestInfoToCtx(req *http.Request) *http.Request {
newctx := req.Context()
newctx = context.WithValue(newctx, RouteInfoURL, req.URL.Path)
newctx = context.WithValue(newctx, RouteInfoHost, req.Host)
newctx = context.WithValue(newctx, RouteInfoURLHost, req.URL.Host)

user := ""
// If url host isn't empty, it's a proxy request. Get http user from Proxy-Authorization header.
if req.URL.Host != "" {
Expand All @@ -294,8 +289,16 @@ func (rp *HTTPReverseProxy) injectRequestInfoToCtx(req *http.Request) *http.Requ
if user == "" {
user, _, _ = req.BasicAuth()
}
newctx = context.WithValue(newctx, RouteInfoHTTPUser, user)
newctx = context.WithValue(newctx, RouteInfoRemote, req.RemoteAddr)

reqRouteInfo := &RequestRouteInfo{
URL: req.URL.Path,
Host: req.Host,
HTTPUser: user,
RemoteAddr: req.RemoteAddr,
URLHost: req.URL.Host,
}
newctx := req.Context()
newctx = context.WithValue(newctx, RouteInfoKey, reqRouteInfo)
return req.Clone(newctx)
}

Expand Down
23 changes: 17 additions & 6 deletions pkg/util/vhost/vhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@ import (
type RouteInfo string

const (
RouteInfoURL RouteInfo = "url"
RouteInfoHost RouteInfo = "host"
RouteInfoHTTPUser RouteInfo = "httpUser"
RouteInfoRemote RouteInfo = "remote"
RouteInfoURLHost RouteInfo = "urlHost"
RouteInfoKey RouteInfo = "routeInfo"
)

type RequestRouteInfo struct {
URL string
Host string
HTTPUser string
RemoteAddr string
URLHost string
Endpoint string
}

type (
muxFunc func(net.Conn) (net.Conn, map[string]string, error)
httpAuthFunc func(net.Conn, string, string, string) (bool, error)
Expand Down Expand Up @@ -75,8 +80,12 @@ func NewMuxer(
return mux, nil
}

type ChooseEndpointFunc func() (string, error)

type CreateConnFunc func(remoteAddr string) (net.Conn, error)

type CreateConnByEndpointFunc func(endpoint, remoteAddr string) (net.Conn, error)

// RouteConfig is the params used to match HTTP requests
type RouteConfig struct {
Domain string
Expand All @@ -87,7 +96,9 @@ type RouteConfig struct {
Headers map[string]string
RouteByHTTPUser string

CreateConnFn CreateConnFunc
CreateConnFn CreateConnFunc
ChooseEndpointFn ChooseEndpointFunc
CreateConnByEndpointFn CreateConnByEndpointFunc
}

// listen for a new domain name, if rewriteHost is not empty and rewriteFunc is not nil
Expand Down
39 changes: 37 additions & 2 deletions server/group/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

type HTTPGroupController struct {
// groups by indexKey
// groups indexed by group name
groups map[string]*HTTPGroup

// register createConn for each group to vhostRouter.
Expand Down Expand Up @@ -65,7 +65,7 @@ type HTTPGroup struct {
location string
routeByHTTPUser string

// CreateConnFuncs indexed by echo proxy name
// CreateConnFuncs indexed by proxy name
createFuncs map[string]vhost.CreateConnFunc
pxyNames []string
index uint64
Expand All @@ -91,6 +91,8 @@ func (g *HTTPGroup) Register(
// the first proxy in this group
tmp := routeConfig // copy object
tmp.CreateConnFn = g.createConn
tmp.ChooseEndpointFn = g.chooseEndpoint
tmp.CreateConnByEndpointFn = g.createConnByEndpoint
err = g.ctl.vhostRouter.Add(routeConfig.Domain, routeConfig.Location, routeConfig.RouteByHTTPUser, &tmp)
if err != nil {
return
Expand Down Expand Up @@ -161,3 +163,36 @@ func (g *HTTPGroup) createConn(remoteAddr string) (net.Conn, error) {

return f(remoteAddr)
}

func (g *HTTPGroup) chooseEndpoint() (string, error) {
newIndex := atomic.AddUint64(&g.index, 1)
name := ""

g.mu.RLock()
group := g.group
domain := g.domain
location := g.location
routeByHTTPUser := g.routeByHTTPUser
if len(g.pxyNames) > 0 {
name = g.pxyNames[int(newIndex)%len(g.pxyNames)]
}
g.mu.RUnlock()

if name == "" {
return "", fmt.Errorf("no healthy endpoint for http group [%s], domain [%s], location [%s], routeByHTTPUser [%s]",
group, domain, location, routeByHTTPUser)
}
return name, nil
}

func (g *HTTPGroup) createConnByEndpoint(endpoint, remoteAddr string) (net.Conn, error) {
var f vhost.CreateConnFunc
g.mu.RLock()
f = g.createFuncs[endpoint]
g.mu.RUnlock()

if f == nil {
return nil, fmt.Errorf("no CreateConnFunc for endpoint [%s] in group [%s]", endpoint, g.group)
}
return f(remoteAddr)
}
23 changes: 23 additions & 0 deletions test/e2e/features/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,29 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {

f.RunProcesses([]string{serverConf}, []string{clientConf})

// send first HTTP request
var contents []string
framework.NewRequestExpect(f).Port(vhostPort).
RequestModify(func(r *request.Request) {
r.HTTP().HTTPHost("example.com")
}).
Ensure(func(resp *request.Response) bool {
contents = append(contents, string(resp.Content))
return true
})

// send second HTTP request, should be forwarded to another service
framework.NewRequestExpect(f).Port(vhostPort).
RequestModify(func(r *request.Request) {
r.HTTP().HTTPHost("example.com")
}).
Ensure(func(resp *request.Response) bool {
contents = append(contents, string(resp.Content))
return true
})

framework.ExpectContainElements(contents, []string{"foo", "bar"})

// check foo and bar is ok
results := doFooBarHTTPRequest(vhostPort, "example.com")
framework.ExpectContainElements(results, []string{"foo", "bar"})
Expand Down

0 comments on commit cf66ca1

Please sign in to comment.