From c7b4b8a7d360366a2ad9cd177ca3a8faa24a68d2 Mon Sep 17 00:00:00 2001 From: g4zhuj Date: Mon, 16 Apr 2018 19:05:28 +0800 Subject: [PATCH] add etcd for registry --- client/client.go | 49 +++++++++ config/config.go | 8 +- config/options.go | 14 +++ examples/examples.go | 1 + plugin/metrics.go | 1 - plugin/rate_limit.go | 0 plugin/zap_logger.go | 3 - {plugin => plugins}/authentication.go | 10 +- plugins/etcd.go | 149 ++++++++++++++++++++++++++ plugins/metrics.go | 1 + plugins/rate_limit.go | 1 + plugins/zap_logger.go | 1 + registry.go | 25 +++++ server/server.go | 53 ++++++++- server/server_options.go | 35 ++++++ 15 files changed, 335 insertions(+), 16 deletions(-) create mode 100644 examples/examples.go delete mode 100644 plugin/metrics.go delete mode 100644 plugin/rate_limit.go delete mode 100644 plugin/zap_logger.go rename {plugin => plugins}/authentication.go (98%) create mode 100644 plugins/etcd.go create mode 100644 plugins/metrics.go create mode 100644 plugins/rate_limit.go create mode 100644 plugins/zap_logger.go create mode 100644 registry.go create mode 100644 server/server_options.go diff --git a/client/client.go b/client/client.go index da13c8e..0c86e4b 100644 --- a/client/client.go +++ b/client/client.go @@ -1 +1,50 @@ package client + +import ( + "sync" + + "google.golang.org/grpc" +) + +//Client client +type Client struct { + sync.RWMutex + connPool map[string]*grpc.ClientConn + dialOpts []grpc.DialOption +} + +//NewClient create a new client +func NewClient(opts []grpc.DialOption) *Client { + return &Client{ + connPool: make(map[string]*grpc.ClientConn), + dialOpts: opts, + } +} + +//GetConn get grpc conn with service name +func (cli *Client) GetConn(serviceName string) (*grpc.ClientConn, error) { + cli.RLock() + if conn, ok := cli.connPool[serviceName]; ok { + cli.RUnlock() + return conn, nil + } + cli.RUnlock() + + cli.Lock() + defer cli.Unlock() + + conn, err := grpc.Dial(serviceName, cli.dialOpts...) + if err != nil { + return nil, err + } + cli.connPool[serviceName] = conn + return conn, nil +} + +//Close close specific service's client +func (cli *Client) Close(serviceName string) (err error) { + if conn, ok := cli.connPool[serviceName]; ok { + return conn.Close() + } + return nil +} diff --git a/config/config.go b/config/config.go index eb64e5b..84ff0db 100644 --- a/config/config.go +++ b/config/config.go @@ -18,9 +18,11 @@ type ServConfiguration struct { //ServiceConfig configures the etcd cluster. type ServiceConfig struct { - ServiceName string `yaml:"service_name"` - ListenAddress string `yaml:"listene_address"` - AdvertisedAddress string `yaml:"advertised_address"` + ServiceName string `yaml:"service_name"` + ListenAddress string `yaml:"listene_address"` + AdvertisedAddress string `yaml:"advertised_address"` + RegistryRefreshTime int `yaml:"registry_refresh_time"` + RegistryTTL int `yaml:"registry_ttl"` } //TokenConfig config of token, default ttl:1 day, default token length 32 bytes. diff --git a/config/options.go b/config/options.go index d912156..b4eed98 100644 --- a/config/options.go +++ b/config/options.go @@ -1 +1,15 @@ package config + +//ServOptions contorl the behavior of server +type ServOptions struct { +} + +//CliOptions contorl the behavior of client +type CliOptions struct { +} + +//ServOption a function sets options on ServOptions +type ServOption func(c *ServOptions) + +//CliOption a function sets options on CliOptions +type CliOption func(c *CliOptions) diff --git a/examples/examples.go b/examples/examples.go new file mode 100644 index 0000000..c7842af --- /dev/null +++ b/examples/examples.go @@ -0,0 +1 @@ +package examples diff --git a/plugin/metrics.go b/plugin/metrics.go deleted file mode 100644 index b0736c3..0000000 --- a/plugin/metrics.go +++ /dev/null @@ -1 +0,0 @@ -package plugin diff --git a/plugin/rate_limit.go b/plugin/rate_limit.go deleted file mode 100644 index e69de29..0000000 diff --git a/plugin/zap_logger.go b/plugin/zap_logger.go deleted file mode 100644 index 7be2f1a..0000000 --- a/plugin/zap_logger.go +++ /dev/null @@ -1,3 +0,0 @@ -package plugin - - diff --git a/plugin/authentication.go b/plugins/authentication.go similarity index 98% rename from plugin/authentication.go rename to plugins/authentication.go index 58c641b..e78594b 100644 --- a/plugin/authentication.go +++ b/plugins/authentication.go @@ -1,7 +1,4 @@ -package plugin - - -package authorization +package plugins import ( "fmt" @@ -15,11 +12,9 @@ import ( "time" - "dana-tech.com/wbw/logs" ) - type BasicToken struct { mutex sync.RWMutex defaultToken string @@ -141,7 +136,6 @@ func InitToken(defaultToken string) { go BscToken.startTask(timeleft) } - func getTokenFile() string { dir, err := filepath.Abs(filepath.Dir(os.Args[0])) if err != nil { @@ -151,4 +145,4 @@ func getTokenFile() string { path := currentDir + "/" + tokenFileName return path -} \ No newline at end of file +} diff --git a/plugins/etcd.go b/plugins/etcd.go new file mode 100644 index 0000000..5bf89f8 --- /dev/null +++ b/plugins/etcd.go @@ -0,0 +1,149 @@ +package plugins + +import ( + "context" + "encoding/json" + "time" + + wrapper "github.com/g4zhuj/grpc-wrapper" + + etcd "github.com/coreos/etcd/clientv3" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/naming" + "google.golang.org/grpc/status" +) + +const ( + resolverTimeOut = 10 * time.Second +) + +type etcdRegistry struct { + cancal context.CancelFunc + cli *etcd.Client +} + +type etcdWatcher struct { + cli *etcd.Client + target string + cancel context.CancelFunc + ctx context.Context + watchChan etcd.WatchChan +} + +//NewEtcdResolver create a resolver for grpc +func NewEtcdResolver(cli *etcd.Client) naming.Resolver { + return &etcdRegistry{ + cli: cli, + } +} + +//NewEtcdRegisty create a reistry for registering server addr +func NewEtcdRegisty(cli *etcd.Client) wrapper.Registry { + return &etcdRegistry{ + cli: cli, + } +} + +func (er *etcdRegistry) Resolve(target string) (naming.Watcher, error) { + ctx, cancel := context.WithTimeout(context.TODO(), resolverTimeOut) + w := &etcdWatcher{ + cli: er.cli, + target: target + "/", + ctx: ctx, + cancel: cancel, + } + return w, nil +} + +func (er *etcdRegistry) Register(ctx context.Context, target string, update naming.Update, opts ...wrapper.RegistryOptions) (err error) { + var upBytes []byte + if upBytes, err = json.Marshal(update); err != nil { + return status.Error(codes.InvalidArgument, err.Error()) + } + + ctx, cancel := context.WithTimeout(context.TODO(), resolverTimeOut) + er.cancal = cancel + lsCli := etcd.NewLease(er.cli) + var rgOpt wrapper.RegistryOption + for _, opt := range opts { + opt(&rgOpt) + } + + switch update.Op { + case naming.Add: + lsRsp, err := lsCli.Grant(ctx, int64(rgOpt.TTL/time.Second)) + etcdOpts := []etcd.OpOption{etcd.WithLease(lsRsp.ID)} + _, err = er.cli.KV.Put(ctx, target+"/"+update.Addr, string(upBytes), etcdOpts...) + lsRspChan, err := lsCli.KeepAlive(ctx, lsRsp.ID) + go func() { + for { + if _, ok := <-lsRspChan; !ok { + break + } + } + }() + case naming.Delete: + _, err = er.cli.Delete(ctx, target+"/"+update.Addr) + default: + return status.Error(codes.InvalidArgument, "unsupported op") + } + return nil +} + +func (er *etcdRegistry) Close() { + er.cancal() + er.cli.Close() +} + +func (ew *etcdWatcher) Next() ([]*naming.Update, error) { + var updates []*naming.Update + if ew.watchChan == nil { + //create new chan + resp, err := ew.cli.Get(ew.ctx, ew.target, etcd.WithPrefix(), etcd.WithSerializable()) + if err != nil { + return nil, err + } + for _, kv := range resp.Kvs { + var upt naming.Update + if err := json.Unmarshal(kv.Value, &upt); err != nil { + continue + } + updates = append(updates, &upt) + } + opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()} + ew.watchChan = ew.cli.Watch(ew.ctx, ew.target, opts...) + return updates, nil + } + + wrsp, ok := <-ew.watchChan + if !ok { + err := status.Error(codes.Unavailable, "etcd watch closed") + return nil, err + } + if wrsp.Err() != nil { + return nil, wrsp.Err() + } + for _, e := range wrsp.Events { + var upt naming.Update + var err error + switch e.Type { + case etcd.EventTypePut: + err = json.Unmarshal(e.Kv.Value, &upt) + upt.Op = naming.Add + case etcd.EventTypeDelete: + err = json.Unmarshal(e.PrevKv.Value, &upt) + upt.Op = naming.Delete + } + + if err != nil { + continue + } + updates = append(updates, &upt) + } + return updates, nil +} + +func (ew *etcdWatcher) Close() { + ew.cancel() + ew.cli.Close() +} diff --git a/plugins/metrics.go b/plugins/metrics.go new file mode 100644 index 0000000..d5c343e --- /dev/null +++ b/plugins/metrics.go @@ -0,0 +1 @@ +package plugins diff --git a/plugins/rate_limit.go b/plugins/rate_limit.go new file mode 100644 index 0000000..0b09ec9 --- /dev/null +++ b/plugins/rate_limit.go @@ -0,0 +1 @@ +package plugins \ No newline at end of file diff --git a/plugins/zap_logger.go b/plugins/zap_logger.go new file mode 100644 index 0000000..d5c343e --- /dev/null +++ b/plugins/zap_logger.go @@ -0,0 +1 @@ +package plugins diff --git a/registry.go b/registry.go new file mode 100644 index 0000000..ecfb0cc --- /dev/null +++ b/registry.go @@ -0,0 +1,25 @@ +package wrapper + +import ( + "context" + "time" + + "google.golang.org/grpc/naming" +) + +type RegistryOption struct { + TTL time.Duration +} +type RegistryOptions func(o *RegistryOption) + +func WithTTL(ttl time.Duration) RegistryOptions { + return func(o *RegistryOption) { + o.TTL = ttl + } +} + +//Registry registry +type Registry interface { + Register(ctx context.Context, target string, update naming.Update, opts ...RegistryOptions) (err error) + Close() +} diff --git a/server/server.go b/server/server.go index 9d2af68..fc45825 100644 --- a/server/server.go +++ b/server/server.go @@ -1,5 +1,56 @@ package server +import ( + "context" + "fmt" + "log" + "net" + + "github.com/coreos/etcd/clientv3" + "google.golang.org/grpc" + "google.golang.org/grpc/naming" + "google.golang.org/grpc/reflection" +) + //Server wrapper of grpc server -type Server struct { +type ServerWrapper struct { + grpc.Server + opt ServOption + resolver naming.Resolver +} + +func NewServerWrapper() *ServerWrapper { + + return &ServerWrapper{} +} + +//Start start running server +func (s *ServerWrapper) Start() { + + lis, err := net.Listen("tcp", port) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + s := grpc.NewServer() + pb.RegisterGreeterServer(s, &server{}) + + cli, err := clientv3.NewFromURL("http://localhost:2379") + if err != nil { + fmt.Printf("nameing err %v\n", err) + return + } + r := &etcdnaming.GRPCResolver{Client: cli} + r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "127.0.0.1:50052", Metadata: "..."}) + + // Register reflection service on gRPC server. + reflection.Register(s) + if err := s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } +} + +//Stop stop tht server +func (s *ServerWrapper) Stop() { + s.resolver.Resolve + s.Server.Stop() } diff --git a/server/server_options.go b/server/server_options.go new file mode 100644 index 0000000..b00d2bc --- /dev/null +++ b/server/server_options.go @@ -0,0 +1,35 @@ +package server + +import ( + "google.golang.org/grpc" + "google.golang.org/grpc/naming" +) + +//ServOption +type ServOption struct { + serviceName string + binding string + advertisedAddress string + resolver naming.Resolver + grpcOpts []grpc.ServerOption +} + +type ServOptions func(o *ServOption) + + + +func With + +func WithResolver(r naming.Resolver) ServOptions { + return func(o *ServOption) { + o.resolver = r + } +} + +func WithGRPCServOption(opts []grpc.ServerOption) ServOptions { + return func(o *ServOption) { + o.grpcOpts = opts + } +} + +