Skip to content

Commit

Permalink
gossip: added support to gossip NodeHost details
Browse files Browse the repository at this point in the history
  • Loading branch information
lni committed May 17, 2022
1 parent c3cd205 commit 7b0ed3d
Show file tree
Hide file tree
Showing 11 changed files with 417 additions and 67 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ test-utils:
.PHONY: test-tan
test-tan:
$(GOTEST) $(PKGNAME)/internal/tan
.PHONY: test-registry
test-registry:
$(GOTEST) $(PKGNAME)/internal/registry
.PHONY: test-cov
test-cov:
$(GOTEST) -coverprofile=coverage.txt -covermode=atomic
Expand Down
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@ module github.com/lni/dragonboat/v3

require (
github.com/VictoriaMetrics/metrics v1.18.1
github.com/cespare/xxhash/v2 v2.1.1
github.com/cockroachdb/errors v1.9.0
github.com/cockroachdb/pebble v0.0.0-20220407171941-2120d145e292
github.com/golang/snappy v0.0.4
github.com/hashicorp/memberlist v0.3.1
github.com/juju/ratelimit v1.0.2-0.20191002062651-f60b32039441
github.com/kr/pretty v0.3.0
github.com/lni/goutils v1.3.1-0.20220404072553-ddb2075d2587
github.com/lni/vfs v0.2.1-0.20220408085249-8be85be1c3c1
github.com/pierrec/lz4/v4 v4.1.14
github.com/stretchr/testify v1.7.0
golang.org/x/exp v0.0.0-20200513190911-00229845015e
golang.org/x/sys v0.0.0-20220209214540-3681064d5158
)

require (
github.com/DataDog/zstd v1.4.5 // indirect
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -29,19 +33,17 @@ require (
github.com/hashicorp/go-sockaddr v1.0.0 // indirect
github.com/hashicorp/golang-lru v0.5.0 // indirect
github.com/klauspost/compress v1.11.7 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/miekg/dns v1.1.26 // indirect
github.com/niubaoshu/gotiny v0.0.3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.8.1 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/exp v0.0.0-20200513190911-00229845015e // indirect
golang.org/x/net v0.0.0-20211008194852-3b03d305991f // indirect
golang.org/x/sys v0.0.0-20220209214540-3681064d5158 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)

Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE
github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niubaoshu/gotiny v0.0.3 h1:aUt+fvr8nQmitT6XqwuBH8JUQz7QyS4A+KyCNXSesGc=
github.com/niubaoshu/gotiny v0.0.3/go.mod h1:QdEauSzqdF5tbLIVtGYO6sqOhUKVPSZGd5x7xK5oeS4=
github.com/niubaoshu/goutils v0.0.0-20180828035119-e8e576f66c2b/go.mod h1:aDwH4aWrEBXw/uvtSvwNwxdtnsx++aP8c8ad4AmlRCg=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand All @@ -233,6 +236,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c h1:Lgl0gzECD8GnQ5QCWA8o6BtfL6mDH5rQgM4/fX3avOs=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE=
github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down
95 changes: 73 additions & 22 deletions internal/registry/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,62 +32,69 @@ import (
var firstError = utils.FirstError
var plog = logger.GetLogger("registry")

// NodeHostIDRegistry is a node registry backed by gossip. It is capable of
type getClusterInfo func() []ClusterInfo

// GossipRegistry is a node registry backed by gossip. It is capable of
// supporting NodeHosts with dynamic RaftAddress values.
type NodeHostIDRegistry struct {
type GossipRegistry struct {
nodes *Registry
gossip *gossipManager
}

// NewNodeHostIDRegistry creates a new NodeHostIDRegistry instance.
func NewNodeHostIDRegistry(nhid string,
// NewGossipRegistry creates a new GossipRegistry instance.
func NewGossipRegistry(nhid string, f getClusterInfo,
nhConfig config.NodeHostConfig, streamConnections uint64,
v config.TargetValidator) (INodeRegistry, error) {
gossip, err := newGossipManager(nhid, nhConfig)
v config.TargetValidator) (*GossipRegistry, error) {
gossip, err := newGossipManager(nhid, f, nhConfig)
if err != nil {
return nil, err
}
r := &NodeHostIDRegistry{
r := &GossipRegistry{
nodes: NewNodeRegistry(streamConnections, v),
gossip: gossip,
}
return r, nil
}

// Close closes the NodeHostIDRegistry instance.
func (n *NodeHostIDRegistry) Close() error {
// GetNodeHostRegistry returns the NodeHostRegistry backed by gossip.
func (n *GossipRegistry) GetNodeHostRegistry() *NodeHostRegistry {
return n.gossip.GetNodeHostRegistry()
}

// Close closes the GossipRegistry instance.
func (n *GossipRegistry) Close() error {
return n.gossip.Close()
}

// AdvertiseAddress returns the advertise address of the gossip service.
func (n *NodeHostIDRegistry) AdvertiseAddress() string {
func (n *GossipRegistry) AdvertiseAddress() string {
return n.gossip.advertiseAddress()
}

// NumMembers returns the number of live nodes known by the gossip service.
func (n *NodeHostIDRegistry) NumMembers() int {
func (n *GossipRegistry) NumMembers() int {
return n.gossip.numMembers()
}

// Add adds a new node with its known NodeHostID to the registry.
func (n *NodeHostIDRegistry) Add(clusterID uint64,
func (n *GossipRegistry) Add(clusterID uint64,
nodeID uint64, target string) {
n.nodes.Add(clusterID, nodeID, target)
}

// Remove removes the specified node from the registry.
func (n *NodeHostIDRegistry) Remove(clusterID uint64, nodeID uint64) {
func (n *GossipRegistry) Remove(clusterID uint64, nodeID uint64) {
n.nodes.Remove(clusterID, nodeID)
}

// RemoveCluster removes the specified node from the registry.
func (n *NodeHostIDRegistry) RemoveCluster(clusterID uint64) {
func (n *GossipRegistry) RemoveCluster(clusterID uint64) {
n.nodes.RemoveCluster(clusterID)
}

// Resolve returns the current RaftAddress and connection key of the specified
// node. It returns ErrUnknownTarget when the RaftAddress is unknown.
func (n *NodeHostIDRegistry) Resolve(clusterID uint64,
func (n *GossipRegistry) Resolve(clusterID uint64,
nodeID uint64) (string, string, error) {
target, key, err := n.nodes.Resolve(clusterID, nodeID)
if err != nil {
Expand Down Expand Up @@ -137,16 +144,42 @@ func (d *eventDelegate) start() {
}

type delegate struct {
raftAddress string
raftAddress string
getClusterInfo getClusterInfo
view *view
}

func (d *delegate) NodeMeta(limit int) []byte {
return []byte(d.raftAddress)
}
func (d *delegate) NotifyMsg([]byte) {}
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { return nil }
func (d *delegate) LocalState(join bool) []byte { return nil }
func (d *delegate) MergeRemoteState(buf []byte, join bool) {}
func (d *delegate) NotifyMsg(buf []byte) {
d.view.updateFrom(buf)
}

func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
if d.getClusterInfo != nil {
d.view.update(d.getClusterInfo())
}
data := d.view.getGossipData(limit - overhead)
if data == nil {
return nil
}

result := make([][]byte, 1)
result[0] = data
return result
}

func (d *delegate) MergeRemoteState(buf []byte, join bool) {
d.view.updateFrom(buf)
}

func (d *delegate) LocalState(join bool) []byte {
if d.getClusterInfo != nil {
d.view.update(d.getClusterInfo())
}
return d.view.getFullSyncData()
}

func parseAddress(addr string) (string, int, error) {
host, sp, err := net.SplitHostPort(addr)
Expand All @@ -165,16 +198,21 @@ type gossipManager struct {
cfg *memberlist.Config
list *memberlist.Memberlist
ed *eventDelegate
view *view
stopper *syncutil.Stopper
}

func newGossipManager(nhid string,
func newGossipManager(nhid string, f getClusterInfo,
nhConfig config.NodeHostConfig) (*gossipManager, error) {
stopper := syncutil.NewStopper()
ed := newEventDelegate(stopper)
cfg := memberlist.DefaultWANConfig()
cfg.Logger = newGossipLogWrapper()
cfg.Name = nhid
cfg.PushPullInterval = 500 * time.Millisecond
cfg.GossipInterval = 250 * time.Millisecond
cfg.GossipNodes = 6
cfg.UDPBufferSize = 32 * 1024
if nhConfig.Expert.TestGossipProbeInterval > 0 {
plog.Infof("gossip probe interval set to %s",
nhConfig.Expert.TestGossipProbeInterval)
Expand All @@ -194,8 +232,14 @@ func newGossipManager(nhid string,
cfg.AdvertiseAddr = aAddr
cfg.AdvertisePort = aPort
}
cfg.Delegate = &delegate{raftAddress: nhConfig.RaftAddress}
view := newView(nhConfig.GetDeploymentID())
cfg.Delegate = &delegate{
raftAddress: nhConfig.RaftAddress,
getClusterInfo: f,
view: view,
}
cfg.Events = ed

list, err := memberlist.Create(cfg)
if err != nil {
plog.Errorf("failed to create memberlist, %v", err)
Expand All @@ -208,6 +252,7 @@ func newGossipManager(nhid string,
cfg: cfg,
list: list,
ed: ed,
view: view,
stopper: stopper,
}
g.join(seed)
Expand Down Expand Up @@ -251,6 +296,12 @@ func (g *gossipManager) Close() error {
return firstError(err, cerr)
}

func (g *gossipManager) GetNodeHostRegistry() *NodeHostRegistry {
return &NodeHostRegistry{
view: g.view,
}
}

func (g *gossipManager) GetRaftAddress(nhid string) (string, bool) {
if g.cfg.Name == nhid {
return g.nhConfig.RaftAddress, true
Expand Down
12 changes: 6 additions & 6 deletions internal/registry/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/lni/dragonboat/v3/internal/id"
)

func TestNodeHostIDRegistry(t *testing.T) {
func TestGossipRegistry(t *testing.T) {
defer leaktest.AfterTest(t)()
nhid := "nhid-12345"
nhConfig := config.NodeHostConfig{
Expand All @@ -35,7 +35,7 @@ func TestNodeHostIDRegistry(t *testing.T) {
Seed: []string{"127.0.0.1:26002"},
},
}
r, err := NewNodeHostIDRegistry(nhid, nhConfig, 1, id.IsNodeHostID)
r, err := NewGossipRegistry(nhid, nil, nhConfig, 1, id.IsNodeHostID)
if err != nil {
t.Fatalf("failed to create the registry, %v", err)
}
Expand All @@ -44,7 +44,7 @@ func TestNodeHostIDRegistry(t *testing.T) {
t.Fatalf("failed to close registry %v", err)
}
}()
if r.(*NodeHostIDRegistry).NumMembers() != 1 {
if r.NumMembers() != 1 {
t.Errorf("num member result unexpected")
}
r.Add(123, 456, nhid)
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestGossipManagerCanBeCreatedAndStopped(t *testing.T) {
Seed: []string{"127.0.0.1:26002"},
},
}
m, err := newGossipManager(nhid, nhConfig)
m, err := newGossipManager(nhid, nil, nhConfig)
if err != nil {
t.Fatalf("gossip manager failed to start, %v", err)
}
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestGossipManagerCanGossip(t *testing.T) {
Seed: []string{"127.0.0.1:26001"},
},
}
m1, err := newGossipManager(nhid1, nhConfig1)
m1, err := newGossipManager(nhid1, nil, nhConfig1)
if err != nil {
t.Fatalf("gossip manager failed to start, %v", err)
}
Expand All @@ -146,7 +146,7 @@ func TestGossipManagerCanGossip(t *testing.T) {
t.Fatalf("failed to close gossip manager %v", err)
}
}()
m2, err := newGossipManager(nhid2, nhConfig2)
m2, err := newGossipManager(nhid2, nil, nhConfig2)
if err != nil {
t.Fatalf("gossip manager failed to start, %v", err)
}
Expand Down
26 changes: 26 additions & 0 deletions internal/registry/nodehost.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2018-2022 Lei Ni ([email protected]) and other contributors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package registry

// NodeHostRegistry is a NodeHost info registry backed by gossip.
type NodeHostRegistry struct {
view *view
}

// NumOfClusters returns the number of clusters known to the current NodeHost
// instance.
func (r *NodeHostRegistry) NumOfClusters() int {
return r.view.nodeHostCount()
}
Loading

0 comments on commit 7b0ed3d

Please sign in to comment.