Skip to content

Commit

Permalink
ticlient: import ticlient (pingcap#1168)
Browse files Browse the repository at this point in the history
ticlient: import ticlient to store/tikv
  • Loading branch information
coocood committed Apr 27, 2016
1 parent 0031d3c commit 7072eda
Show file tree
Hide file tree
Showing 25 changed files with 3,697 additions and 6 deletions.
11 changes: 7 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ GO := $(GODEP) go
ARCH := "`uname -s`"
LINUX := "Linux"
MAC := "Darwin"
PACKAGES := $$(go list ./...| grep -vE 'tikv')

LDFLAGS += -X "github.com/pingcap/tidb/util/printer.TiDBBuildTS=$(shell date -u '+%Y-%m-%d %I:%M:%S')"
LDFLAGS += -X "github.com/pingcap/tidb/util/printer.TiDBGitHash=$(shell git rev-parse HEAD)"
Expand All @@ -48,7 +49,7 @@ godep:
go get -d github.com/pingcap/go-themis
go get -d github.com/pingcap/tso/client
go get -d github.com/pingcap/pd/pd-client
go get -d github.com/pingcap/ticlient
go get -d gopkg.in/fatih/pool.v2
go get -d github.com/pingcap/tipb/go-tipb

build:
Expand All @@ -61,7 +62,6 @@ update:
go get -u -d github.com/pingcap/go-hbase
go get -u -d github.com/pingcap/go-themis
go get -u -d github.com/pingcap/tso/client
go get -u -d github.com/pingcap/ticlient
go get -u -d github.com/pingcap/tipb/go-tipb

TEMP_FILE = temp_parser_file
Expand Down Expand Up @@ -121,17 +121,20 @@ todo:
test: gotest

gotest:
$(GO) test -cover ./...
$(GO) test -cover $(PACKAGES)

race:
$(GO) test --race -cover ./...
$(GO) test --race -cover $(PACKAGES)

ddl_test:
$(GO) test ./ddl/... -skip_ddl=false

ddl_race_test:
$(GO) test --race ./ddl/... -skip_ddl=false

tikv_test:
$(GO) test ./store/tikv/...

interpreter:
@cd interpreter && $(GO) build -ldflags '$(LDFLAGS)'

Expand Down
70 changes: 70 additions & 0 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv

import (
"math"
"math/rand"
"time"

"github.com/juju/errors"
)

const (
// NoJitter makes the backoff sequence strict exponential.
NoJitter = 1 + iota
// FullJitter applies random factors to strict exponential.
FullJitter
// EqualJitter is also randomized, but prevents very short sleeps.
EqualJitter
// DecorrJitter increases the maximum jitter based on the last random value.
DecorrJitter
)

// NewBackoff creates a backoff func which implements exponential backoff with
// optional jitters.
// See: http://www.awsarchitectureblog.com/2015/03/backoff.html
func NewBackoff(retry, base, cap, jitter int) func() error {
attempts := 0
totalSleep := 0
lastSleep := base
return func() error {
if attempts >= retry {
return errors.Errorf("still fail after %d retries, total sleep %dms", attempts, totalSleep)
}
var sleep int
switch jitter {
case NoJitter:
sleep = expo(base, cap, attempts)
case FullJitter:
v := expo(base, cap, attempts)
sleep = rand.Intn(v)
case EqualJitter:
v := expo(base, cap, attempts)
sleep = v/2 + rand.Intn(v/2)
case DecorrJitter:
sleep = int(math.Min(float64(cap), float64(base+rand.Intn(lastSleep*3-base))))
}
time.Sleep(time.Duration(sleep) * time.Millisecond)

attempts++
totalSleep += sleep
lastSleep = sleep
return nil
}
}

func expo(base, cap, n int) int {
return int(math.Min(float64(cap), float64(base)*math.Pow(2.0, float64(n))))
}
169 changes: 169 additions & 0 deletions store/tikv/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// Package tikv provides tcp connection to kvserver.
package tikv

import (
"net"
"sync/atomic"
"time"

"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/msgpb"
"github.com/pingcap/kvproto/pkg/util"
"gopkg.in/fatih/pool.v2"
)

// Client is a client that sends RPC.
// It should not be used after calling Close().
type Client interface {
// Close should release all data.
Close() error
// SendKVReq sends kv request.
SendKVReq(req *kvrpcpb.Request) (*kvrpcpb.Response, error)
// SendCopReq sends coprocessor request.
SendCopReq(req *coprocessor.Request) (*coprocessor.Response, error)
}

// rpcClient connects kvserver to send Request by TCP.
type rpcClient struct {
dst string
msgID uint64
pool pool.Pool
}

const (
initConnection int = 0
maxConnecion int = 20
readTimeout time.Duration = 5 * time.Second // seconds
writeTimeout time.Duration = 5 * time.Second // seconds
connectTimeout time.Duration = 5 * time.Second // seconds
)

// rpcBackoff is for RPC (with TiKV) retry.
// It is expected to sleep for about 10s(+/-3s) in total before abort.
func rpcBackoff() func() error {
const (
maxRetry = 10
sleepBase = 100
sleepCap = 2000
)
return NewBackoff(maxRetry, sleepBase, sleepCap, EqualJitter)
}

// NewPBClient new client that sends protobuf, e.g.: NewPBClient("192.168.1.2:61234").
func NewPBClient(srvHost string) (Client, error) {
factory := func() (net.Conn, error) {
conn, err := net.DialTimeout("tcp", srvHost, connectTimeout)
if err != nil {
return nil, errors.Trace(err)
}
return conn, nil
}
p, err := pool.NewChannelPool(initConnection, maxConnecion, factory)
if err != nil {
return nil, errors.Errorf("new channel pool failed err[%s]", err)
}
return &rpcClient{
dst: srvHost,
msgID: 0,
pool: p,
}, nil
}

// SendCopReq sends a Request to co-processor and receives Response.
func (c *rpcClient) SendCopReq(req *coprocessor.Request) (*coprocessor.Response, error) {
conn, err := c.pool.Get()
if err != nil {
return nil, errors.Trace(err)
}
defer conn.Close()
msg := msgpb.Message{
MsgType: msgpb.MessageType_CopReq.Enum(),
CopReq: req,
}
err = c.doSend(conn, &msg)
if err == nil {
if msg.GetMsgType() != msgpb.MessageType_CopResp || msg.GetCopResp() == nil {
err = errors.Trace(errInvalidResponse)
}
}
if err != nil {
// This connection is not valid any more, so close its underlying conn.
if poolConn, ok := conn.(*pool.PoolConn); ok {
poolConn.MarkUnusable()
}
return nil, errors.Trace(err)
}
return msg.GetCopResp(), nil
}

// SendKVReq sends a Request to kv server and receives Response.
func (c *rpcClient) SendKVReq(req *kvrpcpb.Request) (*kvrpcpb.Response, error) {
conn, err := c.pool.Get()
if err != nil {
return nil, errors.Trace(err)
}
defer conn.Close()
msg := msgpb.Message{
MsgType: msgpb.MessageType_KvReq.Enum(),
KvReq: req,
}
err = c.doSend(conn, &msg)
if err == nil {
if msg.GetMsgType() != msgpb.MessageType_KvResp || msg.GetKvResp() == nil {
err = errors.Trace(errInvalidResponse)
}
}
if err != nil {
// This connection is not valid any more, so close its underlying conn.
if poolConn, ok := conn.(*pool.PoolConn); ok {
poolConn.MarkUnusable()
}
return nil, errors.Trace(err)
}
return msg.GetKvResp(), nil
}

// Close close client.
func (c *rpcClient) Close() error {
c.pool.Close()
return nil
}

func (c *rpcClient) doSend(conn net.Conn, msg *msgpb.Message) error {
curMsgID := atomic.AddUint64(&c.msgID, 1)
log.Debugf("Send request msgID[%d] type[%v]", curMsgID, msg.GetMsgType())
if err := conn.SetWriteDeadline(time.Now().Add(writeTimeout)); err != nil {
log.Warn("Set write deadline failed, it may be blocked.")
}
if err := util.WriteMessage(conn, curMsgID, msg); err != nil {
return errors.Trace(err)
}
if err := conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
log.Warn("Set read deadline failed, it may be blocked.")
}
msgID, err := util.ReadMessage(conn, msg)
if err != nil {
return errors.Trace(err)
}
if curMsgID != msgID {
log.Errorf("Sent msgID[%d] mismatches recv msgID[%d]", curMsgID, msgID)
}
log.Debugf("Receive response msgID[%d] type[%v]", msgID, msg.GetMsgType())
return nil
}
Loading

0 comments on commit 7072eda

Please sign in to comment.