Skip to content

Commit

Permalink
handle FailMode
Browse files Browse the repository at this point in the history
  • Loading branch information
smallnest committed Oct 19, 2017
1 parent 4614e8b commit b86662f
Showing 1 changed file with 126 additions and 3 deletions.
129 changes: 126 additions & 3 deletions client/xclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"reflect"
"strings"
"sync"
"time"

ex "github.com/smallnest/rpcx/errors"
)
Expand Down Expand Up @@ -42,6 +44,7 @@ type ServiceDiscovery interface {
}

type xClient struct {
Retries int
failMode FailMode
selectMode SelectMode
cachedClient map[string]*Client
Expand All @@ -60,6 +63,7 @@ type xClient struct {
// NewXClient creates a XClient that supports service discovery and service governance.
func NewXClient(servicePath, serviceMethod string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient {
client := &xClient{
Retries: 3,
failMode: failMode,
selectMode: selectMode,
discovery: discovery,
Expand Down Expand Up @@ -176,26 +180,145 @@ func (c *xClient) Call(ctx context.Context, args interface{}, reply interface{})
return ErrXClientShutdown
}

var err error
client, err := c.selectClient(ctx, c.servicePath, c.serviceMethod)
if err != nil {
return err
}

return client.Call(ctx, c.servicePath, c.serviceMethod, args, reply)
switch c.failMode {
case Failtry:
retries := c.Retries
for retries > 0 {
retries--
err = client.call(ctx, c.servicePath, c.serviceMethod, args, reply)
}
return err
case Failover:
retries := c.Retries
for retries > 0 {
retries--
err = client.call(ctx, c.servicePath, c.serviceMethod, args, reply)
if err == nil {
return nil
}

//select another server
client, err = c.selectClient(ctx, c.servicePath, c.serviceMethod)
if err != nil {
return err
}
}
return err

default: //Failfast
return client.call(ctx, c.servicePath, c.serviceMethod, args, reply)
}
}

// Broadcast sends requests to all servers and Success only when all servers return OK.
// FailMode and SelectMode are meanless for this method.
// Please set timeout to avoid hanging.
func (c *xClient) Broadcast(ctx context.Context, args interface{}, reply interface{}) error {
var clients []*Client
c.mu.RLock()
for k := range c.servers {
client, err := c.getCachedClient(k)
if err != nil {
c.mu.RUnlock()
return err
}
clients = append(clients, client)
}
c.mu.RUnlock()

return nil
if len(clients) == 0 {
return ErrXClientNoServer
}

var err error
l := len(clients)
done := make(chan bool, l)
for _, client := range clients {
client := client
go func() {
err = client.Call(ctx, c.servicePath, c.serviceMethod, args, reply)
done <- (err == nil)
return
}()
}

timeout := time.After(time.Minute)
check:
for {
select {
case result := <-done:
l--
if l == 0 || !result { // all returns or some one returns an error
break check
}
case <-timeout:
break check
}
}

return err
}

// Fork sends requests to all servers and Success once one server returns OK.
// FailMode and SelectMode are meanless for this method.
func (c *xClient) Fork(ctx context.Context, args interface{}, reply interface{}) error {
var clients []*Client
c.mu.RLock()
for k := range c.servers {
client, err := c.getCachedClient(k)
if err != nil {
c.mu.RUnlock()
return err
}
clients = append(clients, client)
}
c.mu.RUnlock()

return nil
if len(clients) == 0 {
return ErrXClientNoServer
}

var err error
l := len(clients)
done := make(chan bool, l)
for _, client := range clients {
client := client
go func() {
clonedReply := reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()
err = client.Call(ctx, c.servicePath, c.serviceMethod, args, clonedReply)
done <- (err == nil)
if err == nil {
reflect.ValueOf(reply).Set(reflect.ValueOf(reply))
}
return
}()
}

timeout := time.After(time.Minute)
check:
for {
select {
case result := <-done:
l--
if result {
return nil
}
if l == 0 { // all returns or some one returns an error
break check
}

case <-timeout:
break check
}
}

return err
}

// Close closes this client and its underlying connnections to services.
Expand Down

0 comments on commit b86662f

Please sign in to comment.