Skip to content

Commit

Permalink
New load balance strategy: latency.
Browse files Browse the repository at this point in the history
Select proxy with lowest connection latency.
  • Loading branch information
cyfdecyf committed Jul 9, 2014
1 parent a4fa281 commit edea358
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 19 deletions.
3 changes: 3 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type LoadBalanceMode byte
const (
loadBalanceBackup LoadBalanceMode = iota
loadBalanceHash
loadBalanceLatency
)

// allow the same tunnel ports as polipo
Expand Down Expand Up @@ -446,6 +447,8 @@ func (p configParser) ParseLoadBalance(val string) {
config.LoadBalance = loadBalanceBackup
case "hash":
config.LoadBalance = loadBalanceHash
case "latency":
config.LoadBalance = loadBalanceLatency
default:
Fatalf("invalid loadBalance mode: %s\n", val)
}
Expand Down
5 changes: 3 additions & 2 deletions doc/sample-config/rc
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ listen = http://127.0.0.1:7777

# 指定多个二级代理时使用的负载均衡策略,可选策略如下
#
# backup: 默认策略,优先使用第一个指定的二级代理,其他仅作备份使用
# hash: 根据请求的 host name,优先使用 hash 到的某一个二级代理
# backup: 默认策略,优先使用第一个指定的二级代理,其他仅作备份使用
# hash: 根据请求的 host name,优先使用 hash 到的某一个二级代理
# latency: 优先选择连接延迟最低的二级代理
#
# 一个二级代理连接失败后会依次尝试其他二级代理
# 失败的二级代理会以一定的概率再次尝试使用,因此恢复后会重新启用
Expand Down
4 changes: 2 additions & 2 deletions estimate_timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ func estimateTimeout() {
}
return
onErr:
dialTimeout += 2
readTimeout += 2
dialTimeout += 2 * time.Second
readTimeout += 2 * time.Second
}

func runEstimateTimeout() {
Expand Down
193 changes: 187 additions & 6 deletions parent_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ import (
"io"
"math/rand"
"net"
"sort"
"strconv"
"sync"
"time"
)

// Interface that all types of parent proxies should support.
type ParentProxy interface {
connect(*URL) (net.Conn, error)
getServer() string // for use in updating server latency
genConfig() string // for upgrading config
}

Expand Down Expand Up @@ -52,6 +56,10 @@ func initParentPool() {
case loadBalanceHash:
debug.Println("hash parent pool", len(backPool.parent))
parentProxy = &hashParentPool{*backPool}
case loadBalanceLatency:
debug.Println("latency parent pool", len(backPool.parent))
go updateParentProxyLatency()
parentProxy = newLatencyParentPool(backPool.parent)
}
}

Expand Down Expand Up @@ -149,6 +157,157 @@ func connectInOrder(url *URL, pp []ParentWithFail, start int) (srvconn net.Conn,
return nil, err
}

type ParentWithLatency struct {
ParentProxy
latency time.Duration
}

type latencyParentPool struct {
parent []ParentWithLatency
}

func newLatencyParentPool(parent []ParentWithFail) *latencyParentPool {
lp := &latencyParentPool{}
for _, p := range parent {
lp.add(p.ParentProxy)
}
return lp
}

func (pp *latencyParentPool) empty() bool {
return len(pp.parent) == 0
}

func (pp *latencyParentPool) add(parent ParentProxy) {
pp.parent = append(pp.parent, ParentWithLatency{parent, 0})
}

// Sort interface.
func (pp *latencyParentPool) Len() int {
return len(pp.parent)
}

func (pp *latencyParentPool) Swap(i, j int) {
p := pp.parent
p[i], p[j] = p[j], p[i]
}

func (pp *latencyParentPool) Less(i, j int) bool {
p := pp.parent
return p[i].latency < p[j].latency
}

const latencyMax = time.Hour

var latencyMutex sync.RWMutex

func (pp *latencyParentPool) connect(url *URL) (srvconn net.Conn, err error) {
var lp []ParentWithLatency
// Read slice first.
latencyMutex.RLock()
lp = pp.parent
latencyMutex.RUnlock()

var skipped []int
nproxy := len(lp)
if nproxy == 0 {
return nil, errors.New("no parent proxy")
}

for i := 0; i < nproxy; i++ {
parent := lp[i]
if parent.latency >= latencyMax {
skipped = append(skipped, i)
continue
}
if srvconn, err = parent.connect(url); err == nil {
debug.Println("lowest latency proxy", parent.getServer())
return
}
parent.latency = latencyMax
}
// last resort, try skipped one, not likely to succeed
for _, skippedId := range skipped {
if srvconn, err = lp[skippedId].connect(url); err == nil {
return
}
}
return nil, err
}

func (parent *ParentWithLatency) updateLatency(wg *sync.WaitGroup) {
defer wg.Done()
proxy := parent.ParentProxy
server := proxy.getServer()

host, port, err := net.SplitHostPort(server)
if err != nil {
panic("split host port parent server error" + err.Error())
}

// Resolve host name first, so latency does not include resolve time.
ip, err := net.LookupHost(host)
if err != nil {
parent.latency = latencyMax
return
}
ipPort := net.JoinHostPort(ip[0], port)

const N = 3
var total time.Duration
for i := 0; i < N; i++ {
now := time.Now()
cn, err := net.DialTimeout("tcp", ipPort, dialTimeout)
if err != nil {
debug.Println("latency update dial:", err)
total += time.Minute // 1 minute as penalty
continue
}
total += time.Now().Sub(now)
cn.Close()

time.Sleep(5 * time.Millisecond)
}
parent.latency = total / N
debug.Println("latency", server, parent.latency)
}

func (pp *latencyParentPool) updateLatency() {
// Create a copy, update latency for the copy.
var cp latencyParentPool
cp.parent = append(cp.parent, pp.parent...)

// cp.parent is value instead of pointer, if we use `_, p := range cp.parent`,
// the value in cp.parent will not be updated.
var wg sync.WaitGroup
wg.Add(len(cp.parent))
for i, _ := range cp.parent {
cp.parent[i].updateLatency(&wg)
}
wg.Wait()

// Sort according to latency.
sort.Stable(&cp)
debug.Println("lantency lowest proxy", cp.parent[0].getServer())

// Update parent slice.
latencyMutex.Lock()
pp.parent = cp.parent
latencyMutex.Unlock()
}

func updateParentProxyLatency() {
lp, ok := parentProxy.(*latencyParentPool)
if !ok {
return
}

for {
lp.updateLatency()
time.Sleep(60 * time.Second)
}
}

// http parent proxy
type httpParent struct {
server string
Expand All @@ -169,6 +328,10 @@ func newHttpParent(server string) *httpParent {
return &httpParent{server: server}
}

func (hp *httpParent) getServer() string {
return hp.server
}

func (hp *httpParent) genConfig() string {
if hp.userPasswd != "" {
return fmt.Sprintf("proxy = http://%s@%s", hp.userPasswd, hp.server)
Expand Down Expand Up @@ -223,12 +386,16 @@ func newShadowsocksParent(server string) *shadowsocksParent {
return &shadowsocksParent{server: server}
}

func (sp *shadowsocksParent) getServer() string {
return sp.server
}

func (sp *shadowsocksParent) genConfig() string {
if sp.method == "" {
return fmt.Sprintf("proxy = ss://table:%s@%s", sp.passwd, sp.server)
} else {
return fmt.Sprintf("proxy = ss://%s:%s@%s", sp.method, sp.passwd, sp.server)
method := sp.method
if method == "" {
method = "table"
}
return fmt.Sprintf("proxy = ss://%s:%s@%s", method, sp.passwd, sp.server)
}

func (sp *shadowsocksParent) initCipher(method, passwd string) {
Expand All @@ -255,6 +422,8 @@ func (sp *shadowsocksParent) connect(url *URL) (net.Conn, error) {
// cow parent proxy
type cowParent struct {
server string
method string
passwd string
cipher *ss.Cipher
}

Expand All @@ -272,11 +441,19 @@ func newCowParent(srv, method, passwd string) *cowParent {
if err != nil {
Fatal("create cow cipher:", err)
}
return &cowParent{srv, cipher}
return &cowParent{srv, method, passwd, cipher}
}

func (cp *cowParent) getServer() string {
return cp.server
}

func (cp *cowParent) genConfig() string {
return "" // no upgrading need
method := cp.method
if method == "" {
method = "table"
}
return fmt.Sprintf("proxy = cow://%s:%s@%s", method, cp.passwd, cp.server)
}

func (cp *cowParent) connect(url *URL) (net.Conn, error) {
Expand Down Expand Up @@ -332,6 +509,10 @@ func newSocksParent(server string) *socksParent {
return &socksParent{server}
}

func (sp *socksParent) getServer() string {
return sp.server
}

func (sp *socksParent) genConfig() string {
return fmt.Sprintf("proxy = socks5://%s", sp.server)
}
Expand Down
20 changes: 11 additions & 9 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,11 @@ func newCowProxy(method, passwd, addr string) *cowProxy {
}

func (cp *cowProxy) genConfig() string {
if cp.method == "" {
return fmt.Sprintf("listen = cow://table:%s@%s", cp.passwd, cp.addr)
} else {
return fmt.Sprintf("listen = cow://%s:%s@%s", cp.method, cp.passwd, cp.addr)
method := cp.method
if method == "" {
method = "table"
}
return fmt.Sprintf("listen = cow://%s:%s@%s", method, cp.passwd, cp.addr)
}

func (cp *cowProxy) Addr() string {
Expand Down Expand Up @@ -1333,12 +1333,14 @@ func sendBodyChunked(w io.Writer, r *bufio.Reader, rdSize int) (err error) {
errl.Println("chunk size invalid:", err)
return
}
if debug {
// To debug getting malformed response status line with "0\r\n".
if c, ok := w.(*clientConn); ok {
debug.Printf("cli(%s) chunk size %d %#v\n", c.RemoteAddr(), size, string(s))
/*
if debug {
// To debug getting malformed response status line with "0\r\n".
if c, ok := w.(*clientConn); ok {
debug.Printf("cli(%s) chunk size %d %#v\n", c.RemoteAddr(), size, string(s))
}
}
}
*/
if size == 0 {
r.Skip(len(s))
if err = skipCRLF(r); err != nil {
Expand Down

0 comments on commit edea358

Please sign in to comment.