Skip to content

Commit

Permalink
remove broadcaster methods from gossip- don't use sendAsync anywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
jaffee committed Jun 18, 2018
1 parent 6ef9b0b commit 56ed9bf
Show file tree
Hide file tree
Showing 5 changed files with 4 additions and 53 deletions.
2 changes: 1 addition & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ func (c *Cluster) open() error {
Event: uint32(NodeJoin),
Node: EncodeNode(c.Node),
}
if err := c.Broadcaster.SendAsync(msg); err != nil {
if err := c.Broadcaster.SendSync(msg); err != nil {
return fmt.Errorf("sending restart NodeJoin: %v", err)
}

Expand Down
46 changes: 0 additions & 46 deletions gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"sync"
"time"

"golang.org/x/sync/errgroup"

"github.com/gogo/protobuf/proto"
"github.com/hashicorp/memberlist"
"github.com/pilosa/pilosa"
Expand All @@ -36,7 +34,6 @@ import (

// Ensure GossipMemberSet implements interfaces.
var _ pilosa.BroadcastReceiver = &GossipMemberSet{}
var _ pilosa.Gossiper = &GossipMemberSet{}
var _ memberlist.Delegate = &GossipMemberSet{}

// GossipMemberSet represents a gossip implementation of MemberSet using memberlist.
Expand Down Expand Up @@ -240,49 +237,6 @@ func NewGossipMemberSet(name string, host string, cfg Config, ger *GossipEventRe
return g, nil
}

// SendSync implementation of the Broadcaster interface.
func (g *GossipMemberSet) SendSync(pb proto.Message) error {
msg, err := pilosa.MarshalMessage(pb)
if err != nil {
return fmt.Errorf("marshal message: %s", err)
}

mlist := g.memberlist

// Direct sends the message directly to every node.
// An error from any node raises an error on the entire operation.
//
// Gossip uses the gossip protocol to eventually deliver the message
// to every node.
var eg errgroup.Group
for _, n := range mlist.Members() {
// Don't send the message to the local node.
if n == mlist.LocalNode() {
continue
}
node := n
eg.Go(func() error {
return mlist.SendToTCP(node, msg)
})
}
return eg.Wait()
}

// SendAsync implementation of the Gossiper interface.
func (g *GossipMemberSet) SendAsync(pb proto.Message) error {
msg, err := pilosa.MarshalMessage(pb)
if err != nil {
return fmt.Errorf("marshal message: %s", err)
}

b := &broadcast{
msg: msg,
notify: nil,
}
g.broadcasts.QueueBroadcast(b)
return nil
}

// NodeMeta implementation of the memberlist.Delegate interface.
func (g *GossipMemberSet) NodeMeta(limit int) []byte {
buf, err := proto.Marshal(pilosa.EncodeNode(g.node))
Expand Down
3 changes: 1 addition & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ type Server struct {
handler Handler
Broadcaster Broadcaster
BroadcastReceiver BroadcastReceiver
Gossiper Gossiper
systemInfo SystemInfo
gcNotifier GCNotifier
NewAttrStore func(string) AttrStore
Expand Down Expand Up @@ -547,7 +546,7 @@ func (s *Server) SendSync(pb proto.Message) error {

// SendAsync represents an implementation of Broadcaster.
func (s *Server) SendAsync(pb proto.Message) error {
return s.Gossiper.SendAsync(pb)
return ErrNotImplemented
}

// SendTo represents an implementation of Broadcaster.
Expand Down
2 changes: 0 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ func (m *Command) SetupNetworking() error {
m.Server.Broadcaster = pilosa.NopBroadcaster
m.Server.Cluster.MemberSet = pilosa.NewStaticMemberSet(m.Server.Cluster.Nodes)
m.Server.BroadcastReceiver = pilosa.NopBroadcastReceiver
m.Server.Gossiper = pilosa.NopGossiper
return nil
}

Expand Down Expand Up @@ -313,7 +312,6 @@ func (m *Command) SetupNetworking() error {
m.Server.Cluster.MemberSet = gossipMemberSet
m.Server.Broadcaster = m.Server
m.Server.BroadcastReceiver = gossipMemberSet
m.Server.Gossiper = gossipMemberSet
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions view.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,13 @@ func (v *View) createFragmentIfNotExists(slice uint64) (*Fragment, error) {
v.maxSlice = slice

// Send the create slice message to all nodes.
err := v.broadcaster.SendAsync(
err := v.broadcaster.SendSync(
&internal.CreateSliceMessage{
Index: v.index,
Slice: slice,
})
if err != nil {
return nil, errors.Wrap(err, "sending message")
return nil, errors.Wrap(err, "sending createslice message")
}
}

Expand Down

0 comments on commit 56ed9bf

Please sign in to comment.