Skip to content

Commit

Permalink
implement all Selectors
Browse files Browse the repository at this point in the history
  • Loading branch information
smallnest committed Oct 19, 2017
1 parent b86662f commit a75b66a
Show file tree
Hide file tree
Showing 11 changed files with 503 additions and 20 deletions.
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ ineffassign:
ineffassign .

gocyclo:
@ gocyclo -over 20 $(shell find . -name "*.go" |egrep -v "pb\.go|_test\.go")
@ gocyclo -over 20 $(shell find . -name "*.go" |egrep -v "_testutils/*|vendor/*|pb\.go|_test\.go")

check: staticcheck gosimple unused ineffassign gocyclo
check: staticcheck gosimple ineffassign gocyclo

doc:
godoc -http=:6060
Expand All @@ -43,10 +43,10 @@ fmt:
go fmt ./...

build:
go build -tags "udp zookeeper etcd consul" ./...
go build -tags "udp zookeeper etcd consul ping" ./...

buildu:
go build -tags "udp zookeeper etcd consul" ./...
go build -tags "udp zookeeper etcd consul ping" ./...

test:
go test -tags "udp zookeeper etcd consul" ./...
go test -tags "udp zookeeper etcd consul ping" ./...
6 changes: 2 additions & 4 deletions client/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ func newDirectHTTPConn(c *Client, network, address string) (net.Conn, error) {
path = share.DefaultRPCPath
}

network = "tcp"

var conn net.Conn
var tlsConn *tls.Conn
var err error
Expand All @@ -90,12 +88,12 @@ func newDirectHTTPConn(c *Client, network, address string) (net.Conn, error) {
dialer := &net.Dialer{
Timeout: c.option.ConnectTimeout,
}
tlsConn, err = tls.DialWithDialer(dialer, network, address, c.option.TLSConfig)
tlsConn, err = tls.DialWithDialer(dialer, "tcp", address, c.option.TLSConfig)
//or conn:= tls.Client(netConn, &config)

conn = net.Conn(tlsConn)
} else {
conn, err = net.DialTimeout(network, address, c.option.ConnectTimeout)
conn, err = net.DialTimeout("tcp", address, c.option.ConnectTimeout)
}
if err != nil {
log.Errorf("failed to dial server: %v", err)
Expand Down
62 changes: 62 additions & 0 deletions client/geo_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package client

import (
"math"
"net/url"
"strconv"
)

func getClosestServer(lat1, lon1 float64, servers map[string]string) []string {
var server []string
min := math.MaxFloat64

for s, metadata := range servers {
if v, err := url.ParseQuery(metadata); err == nil {
lat2Str := v.Get("latitude")
lon2Str := v.Get("longitude")

if lat2Str == "" || lon2Str == "" {
continue
}

lat2, err := strconv.ParseFloat(lat2Str, 64)
if err != nil {
continue
}
lon2, err := strconv.ParseFloat(lon2Str, 64)
if err != nil {
continue
}

d := getDistanceFrom(lat1, lon1, lat2, lon2)
if d < min {
server = []string{s}
min = d
} else if d == min {
server = append(server, s)
}

}
}
return server
}

//https://gist.github.com/cdipaolo/d3f8db3848278b49db68
func getDistanceFrom(lat1, lon1, lat2, lon2 float64) float64 {
var la1, lo1, la2, lo2, r float64
la1 = lat1 * math.Pi / 180
lo1 = lon1 * math.Pi / 180
la2 = lat2 * math.Pi / 180
lo2 = lon2 * math.Pi / 180

r = 6378100 // Earth radius in METERS

// calculate
h := hsin(la2-la1) + math.Cos(la1)*math.Cos(la2)*hsin(lo2-lo1)

return 2 * r * math.Asin(math.Sqrt(h))
}

func hsin(theta float64) float64 {
return math.Pow(math.Sin(theta/2), 2)
}
51 changes: 51 additions & 0 deletions client/hash_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package client

import (
"fmt"
"hash/fnv"
)

// Hash consistently chooses a hash bucket number in the range [0, numBuckets) for the given key. numBuckets must be >= 1.
func Hash(key uint64, buckets int32) int32 {
if buckets <= 0 {
buckets = 1
}

var b, j int64

for j < int64(buckets) {
b = j
key = key*2862933555777941757 + 1
j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
}

return int32(b)
}

// HashString get a hash value of a string
func HashString(s string) uint64 {
h := fnv.New64a()
h.Write([]byte(s))
return h.Sum64()
}

// HashServiceAndArgs define a hash function
type HashServiceAndArgs func(len int, options ...interface{}) int

// ConsistentFunction define a hash function
// Return service address, like "[email protected]:8970"
type ConsistentAddrStrFunction func(options ...interface{}) string

// JumpConsistentHash selects a server by serviceMethod and args
func JumpConsistentHash(len int, options ...interface{}) int {
keyString := ""
for _, opt := range options {
keyString = keyString + "/" + toString(opt)
}
key := HashString(keyString)
return int(Hash(key, int32(len)))
}

func toString(obj interface{}) string {
return fmt.Sprintf("%v", obj)
}
20 changes: 20 additions & 0 deletions client/ping_excluded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// +build !ping

package client

import (
"context"
"errors"
)

func newWeightedICMPSelector(servers map[string]string) Selector {
panic(errors.New("this lib has not been with tag 'ping' "))
}

func (s weightedICMPSelector) Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string {
return ""
}

func (s *weightedICMPSelector) UpdateServer(servers map[string]string) {

}
95 changes: 95 additions & 0 deletions client/ping_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// +build ping

package client

import (
"context"
"net"
"strings"
"time"

fastping "github.com/tatsushid/go-fastping"
)

func newWeightedICMPSelector(servers map[string]string) Selector {
ss := createICMPWeighted(servers)
return &weightedICMPSelector{servers: ss}
}

func (s weightedICMPSelector) Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string {
ss := s.servers
if len(ss) == 0 {
return ""
}
w := nextWeighted(ss)
if w == nil {
return ""
}
return w.Server
}

func (s *weightedICMPSelector) UpdateServer(servers map[string]string) {
ss := createICMPWeighted(servers)
s.servers = ss
}

func createICMPWeighted(servers map[string]string) []*Weighted {
var ss []*Weighted
for k := range servers {
w := &Weighted{Server: k, Weight: 1, EffectiveWeight: 1}
server := strings.Split(k, "@")
host, _, _ := net.SplitHostPort(server[1])
rtt, _ := Ping(host)
rtt = CalculateWeight(rtt)
w.Weight = rtt
w.EffectiveWeight = rtt
ss = append(ss, w)
}

return ss
}

// Ping gets network traffic by ICMP
func Ping(host string) (rtt int, err error) {
rtt = 1000 //default and timeout is 1000 ms

p := fastping.NewPinger()
p.Network("udp")
ra, err := net.ResolveIPAddr("ip4:icmp", host)
if err != nil {
return 0, err
}
p.AddIPAddr(ra)

p.OnRecv = func(addr *net.IPAddr, r time.Duration) {
rtt = int(r.Nanoseconds() / 1000000)
}
// p.OnIdle = func() {

// }
err = p.Run()

return
}

// CalculateWeight converts the rtt to weighted by:
// 1. weight=191 if t <= 10
// 2. weight=201 -t if 10 < t <=200
// 3. weight=1 if 200 < t < 1000
// 4. weight = 0 if t >= 1000
//
// It means servers that ping time t < 10 will be preferred
// and servers won't be selected if t > 1000.
// It is hard coded based on Ops experience.
func CalculateWeight(rtt int) int {
switch {
case rtt > 0 && rtt <= 10:
return 191
case rtt > 10 && rtt <= 200:
return 201 - rtt
case rtt > 100 && rtt < 1000:
return 1
default:
return 0
}
}
Loading

0 comments on commit a75b66a

Please sign in to comment.