Skip to content

Commit

Permalink
add etcd for registry
Browse files Browse the repository at this point in the history
  • Loading branch information
g4zhuj committed Apr 16, 2018
1 parent 16ab0af commit c7b4b8a
Show file tree
Hide file tree
Showing 15 changed files with 335 additions and 16 deletions.
49 changes: 49 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -1 +1,50 @@
package client

import (
"sync"

"google.golang.org/grpc"
)

//Client client
type Client struct {
sync.RWMutex
connPool map[string]*grpc.ClientConn
dialOpts []grpc.DialOption
}

//NewClient create a new client
func NewClient(opts []grpc.DialOption) *Client {
return &Client{
connPool: make(map[string]*grpc.ClientConn),
dialOpts: opts,
}
}

//GetConn get grpc conn with service name
func (cli *Client) GetConn(serviceName string) (*grpc.ClientConn, error) {
cli.RLock()
if conn, ok := cli.connPool[serviceName]; ok {
cli.RUnlock()
return conn, nil
}
cli.RUnlock()

cli.Lock()
defer cli.Unlock()

conn, err := grpc.Dial(serviceName, cli.dialOpts...)
if err != nil {
return nil, err
}
cli.connPool[serviceName] = conn
return conn, nil
}

//Close close specific service's client
func (cli *Client) Close(serviceName string) (err error) {
if conn, ok := cli.connPool[serviceName]; ok {
return conn.Close()
}
return nil
}
8 changes: 5 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ type ServConfiguration struct {

//ServiceConfig configures the etcd cluster.
type ServiceConfig struct {
ServiceName string `yaml:"service_name"`
ListenAddress string `yaml:"listene_address"`
AdvertisedAddress string `yaml:"advertised_address"`
ServiceName string `yaml:"service_name"`
ListenAddress string `yaml:"listene_address"`
AdvertisedAddress string `yaml:"advertised_address"`
RegistryRefreshTime int `yaml:"registry_refresh_time"`
RegistryTTL int `yaml:"registry_ttl"`
}

//TokenConfig config of token, default ttl:1 day, default token length 32 bytes.
Expand Down
14 changes: 14 additions & 0 deletions config/options.go
Original file line number Diff line number Diff line change
@@ -1 +1,15 @@
package config

//ServOptions contorl the behavior of server
type ServOptions struct {
}

//CliOptions contorl the behavior of client
type CliOptions struct {
}

//ServOption a function sets options on ServOptions
type ServOption func(c *ServOptions)

//CliOption a function sets options on CliOptions
type CliOption func(c *CliOptions)
1 change: 1 addition & 0 deletions examples/examples.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package examples
1 change: 0 additions & 1 deletion plugin/metrics.go

This file was deleted.

Empty file removed plugin/rate_limit.go
Empty file.
3 changes: 0 additions & 3 deletions plugin/zap_logger.go

This file was deleted.

10 changes: 2 additions & 8 deletions plugin/authentication.go → plugins/authentication.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
package plugin


package authorization
package plugins

import (
"fmt"
Expand All @@ -15,11 +12,9 @@ import (

"time"


"dana-tech.com/wbw/logs"
)


type BasicToken struct {
mutex sync.RWMutex
defaultToken string
Expand Down Expand Up @@ -141,7 +136,6 @@ func InitToken(defaultToken string) {
go BscToken.startTask(timeleft)
}


func getTokenFile() string {
dir, err := filepath.Abs(filepath.Dir(os.Args[0]))
if err != nil {
Expand All @@ -151,4 +145,4 @@ func getTokenFile() string {

path := currentDir + "/" + tokenFileName
return path
}
}
149 changes: 149 additions & 0 deletions plugins/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package plugins

import (
"context"
"encoding/json"
"time"

wrapper "github.com/g4zhuj/grpc-wrapper"

etcd "github.com/coreos/etcd/clientv3"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/naming"
"google.golang.org/grpc/status"
)

const (
resolverTimeOut = 10 * time.Second
)

type etcdRegistry struct {
cancal context.CancelFunc
cli *etcd.Client
}

type etcdWatcher struct {
cli *etcd.Client
target string
cancel context.CancelFunc
ctx context.Context
watchChan etcd.WatchChan
}

//NewEtcdResolver create a resolver for grpc
func NewEtcdResolver(cli *etcd.Client) naming.Resolver {
return &etcdRegistry{
cli: cli,
}
}

//NewEtcdRegisty create a reistry for registering server addr
func NewEtcdRegisty(cli *etcd.Client) wrapper.Registry {
return &etcdRegistry{
cli: cli,
}
}

func (er *etcdRegistry) Resolve(target string) (naming.Watcher, error) {
ctx, cancel := context.WithTimeout(context.TODO(), resolverTimeOut)
w := &etcdWatcher{
cli: er.cli,
target: target + "/",
ctx: ctx,
cancel: cancel,
}
return w, nil
}

func (er *etcdRegistry) Register(ctx context.Context, target string, update naming.Update, opts ...wrapper.RegistryOptions) (err error) {
var upBytes []byte
if upBytes, err = json.Marshal(update); err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}

ctx, cancel := context.WithTimeout(context.TODO(), resolverTimeOut)
er.cancal = cancel
lsCli := etcd.NewLease(er.cli)
var rgOpt wrapper.RegistryOption
for _, opt := range opts {
opt(&rgOpt)
}

switch update.Op {
case naming.Add:
lsRsp, err := lsCli.Grant(ctx, int64(rgOpt.TTL/time.Second))
etcdOpts := []etcd.OpOption{etcd.WithLease(lsRsp.ID)}
_, err = er.cli.KV.Put(ctx, target+"/"+update.Addr, string(upBytes), etcdOpts...)
lsRspChan, err := lsCli.KeepAlive(ctx, lsRsp.ID)
go func() {
for {
if _, ok := <-lsRspChan; !ok {
break
}
}
}()
case naming.Delete:
_, err = er.cli.Delete(ctx, target+"/"+update.Addr)
default:
return status.Error(codes.InvalidArgument, "unsupported op")
}
return nil
}

func (er *etcdRegistry) Close() {
er.cancal()
er.cli.Close()
}

func (ew *etcdWatcher) Next() ([]*naming.Update, error) {
var updates []*naming.Update
if ew.watchChan == nil {
//create new chan
resp, err := ew.cli.Get(ew.ctx, ew.target, etcd.WithPrefix(), etcd.WithSerializable())
if err != nil {
return nil, err
}
for _, kv := range resp.Kvs {
var upt naming.Update
if err := json.Unmarshal(kv.Value, &upt); err != nil {
continue
}
updates = append(updates, &upt)
}
opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
ew.watchChan = ew.cli.Watch(ew.ctx, ew.target, opts...)
return updates, nil
}

wrsp, ok := <-ew.watchChan
if !ok {
err := status.Error(codes.Unavailable, "etcd watch closed")
return nil, err
}
if wrsp.Err() != nil {
return nil, wrsp.Err()
}
for _, e := range wrsp.Events {
var upt naming.Update
var err error
switch e.Type {
case etcd.EventTypePut:
err = json.Unmarshal(e.Kv.Value, &upt)
upt.Op = naming.Add
case etcd.EventTypeDelete:
err = json.Unmarshal(e.PrevKv.Value, &upt)
upt.Op = naming.Delete
}

if err != nil {
continue
}
updates = append(updates, &upt)
}
return updates, nil
}

func (ew *etcdWatcher) Close() {
ew.cancel()
ew.cli.Close()
}
1 change: 1 addition & 0 deletions plugins/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package plugins
1 change: 1 addition & 0 deletions plugins/rate_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package plugins
1 change: 1 addition & 0 deletions plugins/zap_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package plugins
25 changes: 25 additions & 0 deletions registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package wrapper

import (
"context"
"time"

"google.golang.org/grpc/naming"
)

type RegistryOption struct {
TTL time.Duration
}
type RegistryOptions func(o *RegistryOption)

func WithTTL(ttl time.Duration) RegistryOptions {
return func(o *RegistryOption) {
o.TTL = ttl
}
}

//Registry registry
type Registry interface {
Register(ctx context.Context, target string, update naming.Update, opts ...RegistryOptions) (err error)
Close()
}
53 changes: 52 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,56 @@
package server

import (
"context"
"fmt"
"log"
"net"

"github.com/coreos/etcd/clientv3"
"google.golang.org/grpc"
"google.golang.org/grpc/naming"
"google.golang.org/grpc/reflection"
)

//Server wrapper of grpc server
type Server struct {
type ServerWrapper struct {
grpc.Server
opt ServOption
resolver naming.Resolver
}

func NewServerWrapper() *ServerWrapper {

return &ServerWrapper{}
}

//Start start running server
func (s *ServerWrapper) Start() {

lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})

cli, err := clientv3.NewFromURL("http://localhost:2379")
if err != nil {
fmt.Printf("nameing err %v\n", err)
return
}
r := &etcdnaming.GRPCResolver{Client: cli}
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "127.0.0.1:50052", Metadata: "..."})

// Register reflection service on gRPC server.
reflection.Register(s)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

//Stop stop tht server
func (s *ServerWrapper) Stop() {
s.resolver.Resolve
s.Server.Stop()
}
Loading

0 comments on commit c7b4b8a

Please sign in to comment.