Skip to content

Commit

Permalink
Enable discovery gossiping
Browse files Browse the repository at this point in the history
  • Loading branch information
calmh committed May 11, 2014
1 parent b783169 commit 1d602b9
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 59 deletions.
46 changes: 35 additions & 11 deletions discover/PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,28 @@ The Announcement packet has the following structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Magic Number (0x029E4C77) |
| Magic (0x029E4C77) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of Node ID |
/ /
\ Node Structure \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Number of Extra Nodes |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Node ID (variable length) \
\ Zero or more Node Structures \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Node Structure:

0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of ID |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ ID (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Number of Addresses |
Expand All @@ -62,29 +78,37 @@ The Announcement packet has the following structure:
\ IP (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Port Number | 0x0000 |
| Port | 0x0000 |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

This is the XDR encoding of:

struct Announcement {
unsigned int MagicNumber;
string NodeID<>;
unsigned int Magic;
Node This;
Node Extra<>;
}

struct Node {
string ID<>;
Address Addresses<>;
}

struct Address {
opaque IP<>;
unsigned short PortNumber;
unsigned short Port;
}

NodeID is padded to a multiple of 32 bits and all fields are in sent in
network (big endian) byte order. In the Address structure, the IP field
can be of three differnt kinds;
The first Node structure contains information about the sending node.
The following zero or more Extra nodes contain information about other
nodes known to the sending node.

In the Address structure, the IP field can be of three differnt kinds;

- A zero length indicates that the IP address should be taken from the
source address of the announcement packet, be it IPv4 or IPv6. The
source address must be a valid unicast address.
source address must be a valid unicast address. This is only valid
in the first node structure, not in the list of extras.

- A four byte length indicates that the address is an IPv4 unicast
address.
Expand Down
132 changes: 97 additions & 35 deletions discover/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"io"
"log"
"net"
"sync"
Expand Down Expand Up @@ -108,18 +109,34 @@ func (d *Discoverer) announcementPkt() []byte {
}
}
var pkt = AnnounceV2{
Magic: AnnouncementMagicV2,
NodeID: d.myID,
Addresses: addrs,
Magic: AnnouncementMagicV2,
This: Node{d.myID, addrs},
}
return pkt.MarshalXDR()
}

func (d *Discoverer) sendLocalAnnouncements() {
var buf = d.announcementPkt()
var addrs = resolveAddrs(d.listenAddrs)

var pkt = AnnounceV2{
Magic: AnnouncementMagicV2,
This: Node{d.myID, addrs},
}

for {
d.beacon.Send(buf)
pkt.Extra = nil
d.registryLock.RLock()
for node, addrs := range d.registry {
if len(pkt.Extra) == 16 {
break
}

anode := Node{node, resolveAddrs(addrs)}
pkt.Extra = append(pkt.Extra, anode)
}
d.registryLock.RUnlock()

d.beacon.Send(pkt.MarshalXDR())

select {
case <-d.localBcastTick:
Expand All @@ -144,9 +161,8 @@ func (d *Discoverer) sendExternalAnnouncements() {
var buf []byte
if d.extPort != 0 {
var pkt = AnnounceV2{
Magic: AnnouncementMagicV2,
NodeID: d.myID,
Addresses: []Address{{Port: d.extPort}},
Magic: AnnouncementMagicV2,
This: Node{d.myID, []Address{{Port: d.extPort}}},
}
buf = pkt.MarshalXDR()
} else {
Expand Down Expand Up @@ -203,43 +219,63 @@ func (d *Discoverer) recvAnnouncements() {

var pkt AnnounceV2
err := pkt.UnmarshalXDR(buf)
if err != nil {
if err != nil && err != io.EOF {
continue
}

if debug {
dlog.Printf("parsed announcement: %#v", pkt)
}

if pkt.NodeID != d.myID {
var addrs []string
for _, a := range pkt.Addresses {
var nodeAddr string
if len(a.IP) > 0 {
nodeAddr = fmt.Sprintf("%s:%d", net.IP(a.IP), a.Port)
} else {
ua := addr.(*net.UDPAddr)
ua.Port = int(a.Port)
nodeAddr = ua.String()
}
addrs = append(addrs, nodeAddr)
}
if debug {
dlog.Printf("register: %#v", addrs)
var newNode bool
if pkt.This.ID != d.myID {
n := d.registerNode(addr, pkt.This)
newNode = newNode || n
}
for _, node := range pkt.Extra {
if node.ID != d.myID {
n := d.registerNode(nil, node)
newNode = newNode || n
}
d.registryLock.Lock()
_, seen := d.registry[pkt.NodeID]
if !seen {
select {
case d.forcedBcastTick <- time.Now():
}
}

if newNode {
select {
case d.forcedBcastTick <- time.Now():
}
d.registry[pkt.NodeID] = addrs
d.registryLock.Unlock()
}
}
}

func (d *Discoverer) registerNode(addr net.Addr, node Node) bool {
var addrs []string
for _, a := range node.Addresses {
var nodeAddr string
if len(a.IP) > 0 {
nodeAddr = fmt.Sprintf("%s:%d", net.IP(a.IP), a.Port)
addrs = append(addrs, nodeAddr)
} else if addr != nil {
ua := addr.(*net.UDPAddr)
ua.Port = int(a.Port)
nodeAddr = ua.String()
addrs = append(addrs, nodeAddr)
}
}
if len(addrs) == 0 {
if debug {
dlog.Println("no valid address for", node.ID)
}
}
if debug {
dlog.Printf("register: %s -> %#v", node.ID, addrs)
}
d.registryLock.Lock()
_, seen := d.registry[node.ID]
d.registry[node.ID] = addrs
d.registryLock.Unlock()
return !seen
}

func (d *Discoverer) externalLookup(node string) []string {
extIP, err := net.ResolveUDPAddr("udp", d.extServer)
if err != nil {
Expand Down Expand Up @@ -268,7 +304,7 @@ func (d *Discoverer) externalLookup(node string) []string {
}
buffers.Put(buf)

buf = buffers.Get(256)
buf = buffers.Get(2048)
defer buffers.Put(buf)

n, err := conn.Read(buf)
Expand All @@ -287,7 +323,7 @@ func (d *Discoverer) externalLookup(node string) []string {

var pkt AnnounceV2
err = pkt.UnmarshalXDR(buf[:n])
if err != nil {
if err != nil && err != io.EOF {
log.Println("discover/external/decode:", err)
return nil
}
Expand All @@ -297,9 +333,35 @@ func (d *Discoverer) externalLookup(node string) []string {
}

var addrs []string
for _, a := range pkt.Addresses {
for _, a := range pkt.This.Addresses {
nodeAddr := fmt.Sprintf("%s:%d", net.IP(a.IP), a.Port)
addrs = append(addrs, nodeAddr)
}
return addrs
}

func addrToAddr(addr *net.TCPAddr) Address {
if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
return Address{Port: uint16(addr.Port)}
} else if bs := addr.IP.To4(); bs != nil {
return Address{IP: bs, Port: uint16(addr.Port)}
} else if bs := addr.IP.To16(); bs != nil {
return Address{IP: bs, Port: uint16(addr.Port)}
}
return Address{}
}

func resolveAddrs(addrs []string) []Address {
var raddrs []Address
for _, addrStr := range addrs {
addrRes, err := net.ResolveTCPAddr("tcp", addrStr)
if err != nil {
continue
}
addr := addrToAddr(addrRes)
if len(addr.IP) > 0 {
raddrs = append(raddrs, addr)
}
}
return raddrs
}
9 changes: 7 additions & 2 deletions discover/packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@ type QueryV2 struct {
}

type AnnounceV2 struct {
Magic uint32
NodeID string // max:64
Magic uint32
This Node
Extra []Node // max:16
}

type Node struct {
ID string // max:64
Addresses []Address // max:16
}

Expand Down
62 changes: 55 additions & 7 deletions discover/packets_xdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,59 @@ func (o AnnounceV2) MarshalXDR() []byte {

func (o AnnounceV2) encodeXDR(xw *xdr.Writer) (int, error) {
xw.WriteUint32(o.Magic)
if len(o.NodeID) > 64 {
o.This.encodeXDR(xw)
if len(o.Extra) > 16 {
return xw.Tot(), xdr.ErrElementSizeExceeded
}
xw.WriteString(o.NodeID)
xw.WriteUint32(uint32(len(o.Extra)))
for i := range o.Extra {
o.Extra[i].encodeXDR(xw)
}
return xw.Tot(), xw.Error()
}

func (o *AnnounceV2) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.decodeXDR(xr)
}

func (o *AnnounceV2) UnmarshalXDR(bs []byte) error {
var buf = bytes.NewBuffer(bs)
var xr = xdr.NewReader(buf)
return o.decodeXDR(xr)
}

func (o *AnnounceV2) decodeXDR(xr *xdr.Reader) error {
o.Magic = xr.ReadUint32()
(&o.This).decodeXDR(xr)
_ExtraSize := int(xr.ReadUint32())
if _ExtraSize > 16 {
return xdr.ErrElementSizeExceeded
}
o.Extra = make([]Node, _ExtraSize)
for i := range o.Extra {
(&o.Extra[i]).decodeXDR(xr)
}
return xr.Error()
}

func (o Node) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.encodeXDR(xw)
}

func (o Node) MarshalXDR() []byte {
var buf bytes.Buffer
var xw = xdr.NewWriter(&buf)
o.encodeXDR(xw)
return buf.Bytes()
}

func (o Node) encodeXDR(xw *xdr.Writer) (int, error) {
if len(o.ID) > 64 {
return xw.Tot(), xdr.ErrElementSizeExceeded
}
xw.WriteString(o.ID)
if len(o.Addresses) > 16 {
return xw.Tot(), xdr.ErrElementSizeExceeded
}
Expand All @@ -73,20 +122,19 @@ func (o AnnounceV2) encodeXDR(xw *xdr.Writer) (int, error) {
return xw.Tot(), xw.Error()
}

func (o *AnnounceV2) DecodeXDR(r io.Reader) error {
func (o *Node) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.decodeXDR(xr)
}

func (o *AnnounceV2) UnmarshalXDR(bs []byte) error {
func (o *Node) UnmarshalXDR(bs []byte) error {
var buf = bytes.NewBuffer(bs)
var xr = xdr.NewReader(buf)
return o.decodeXDR(xr)
}

func (o *AnnounceV2) decodeXDR(xr *xdr.Reader) error {
o.Magic = xr.ReadUint32()
o.NodeID = xr.ReadStringMax(64)
func (o *Node) decodeXDR(xr *xdr.Reader) error {
o.ID = xr.ReadStringMax(64)
_AddressesSize := int(xr.ReadUint32())
if _AddressesSize > 16 {
return xdr.ErrElementSizeExceeded
Expand Down
Loading

0 comments on commit 1d602b9

Please sign in to comment.