Skip to content

Commit

Permalink
Track connections by local index id instead of vpn ip (slackhq#807)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbrownus authored Feb 13, 2023
1 parent 5bd8712 commit a06977b
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 100 deletions.
139 changes: 66 additions & 73 deletions connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,23 @@ import (

"github.com/sirupsen/logrus"
"github.com/slackhq/nebula/header"
"github.com/slackhq/nebula/iputil"
)

// TODO: incount and outcount are intended as a shortcut to locking the mutexes for every single packet
// and something like every 10 packets we could lock, send 10, then unlock for a moment

type connectionManager struct {
hostMap *HostMap
in map[iputil.VpnIp]struct{}
in map[uint32]struct{}
inLock *sync.RWMutex
out map[iputil.VpnIp]struct{}
out map[uint32]struct{}
outLock *sync.RWMutex
TrafficTimer *LockingTimerWheel[iputil.VpnIp]
TrafficTimer *LockingTimerWheel[uint32]
intf *Interface

pendingDeletion map[iputil.VpnIp]int
pendingDeletion map[uint32]int
pendingDeletionLock *sync.RWMutex
pendingDeletionTimer *LockingTimerWheel[iputil.VpnIp]
pendingDeletionTimer *LockingTimerWheel[uint32]

checkInterval int
pendingDeletionInterval int
Expand All @@ -36,15 +35,15 @@ type connectionManager struct {
func newConnectionManager(ctx context.Context, l *logrus.Logger, intf *Interface, checkInterval, pendingDeletionInterval int) *connectionManager {
nc := &connectionManager{
hostMap: intf.hostMap,
in: make(map[iputil.VpnIp]struct{}),
in: make(map[uint32]struct{}),
inLock: &sync.RWMutex{},
out: make(map[iputil.VpnIp]struct{}),
out: make(map[uint32]struct{}),
outLock: &sync.RWMutex{},
TrafficTimer: NewLockingTimerWheel[iputil.VpnIp](time.Millisecond*500, time.Second*60),
TrafficTimer: NewLockingTimerWheel[uint32](time.Millisecond*500, time.Second*60),
intf: intf,
pendingDeletion: make(map[iputil.VpnIp]int),
pendingDeletion: make(map[uint32]int),
pendingDeletionLock: &sync.RWMutex{},
pendingDeletionTimer: NewLockingTimerWheel[iputil.VpnIp](time.Millisecond*500, time.Second*60),
pendingDeletionTimer: NewLockingTimerWheel[uint32](time.Millisecond*500, time.Second*60),
checkInterval: checkInterval,
pendingDeletionInterval: pendingDeletionInterval,
l: l,
Expand All @@ -53,77 +52,77 @@ func newConnectionManager(ctx context.Context, l *logrus.Logger, intf *Interface
return nc
}

func (n *connectionManager) In(ip iputil.VpnIp) {
func (n *connectionManager) In(localIndex uint32) {
n.inLock.RLock()
// If this already exists, return
if _, ok := n.in[ip]; ok {
if _, ok := n.in[localIndex]; ok {
n.inLock.RUnlock()
return
}
n.inLock.RUnlock()
n.inLock.Lock()
n.in[ip] = struct{}{}
n.in[localIndex] = struct{}{}
n.inLock.Unlock()
}

func (n *connectionManager) Out(ip iputil.VpnIp) {
func (n *connectionManager) Out(localIndex uint32) {
n.outLock.RLock()
// If this already exists, return
if _, ok := n.out[ip]; ok {
if _, ok := n.out[localIndex]; ok {
n.outLock.RUnlock()
return
}
n.outLock.RUnlock()
n.outLock.Lock()
// double check since we dropped the lock temporarily
if _, ok := n.out[ip]; ok {
if _, ok := n.out[localIndex]; ok {
n.outLock.Unlock()
return
}
n.out[ip] = struct{}{}
n.AddTrafficWatch(ip, n.checkInterval)
n.out[localIndex] = struct{}{}
n.AddTrafficWatch(localIndex, n.checkInterval)
n.outLock.Unlock()
}

func (n *connectionManager) CheckIn(vpnIp iputil.VpnIp) bool {
func (n *connectionManager) CheckIn(localIndex uint32) bool {
n.inLock.RLock()
if _, ok := n.in[vpnIp]; ok {
if _, ok := n.in[localIndex]; ok {
n.inLock.RUnlock()
return true
}
n.inLock.RUnlock()
return false
}

func (n *connectionManager) ClearIP(ip iputil.VpnIp) {
func (n *connectionManager) ClearLocalIndex(localIndex uint32) {
n.inLock.Lock()
n.outLock.Lock()
delete(n.in, ip)
delete(n.out, ip)
delete(n.in, localIndex)
delete(n.out, localIndex)
n.inLock.Unlock()
n.outLock.Unlock()
}

func (n *connectionManager) ClearPendingDeletion(ip iputil.VpnIp) {
func (n *connectionManager) ClearPendingDeletion(localIndex uint32) {
n.pendingDeletionLock.Lock()
delete(n.pendingDeletion, ip)
delete(n.pendingDeletion, localIndex)
n.pendingDeletionLock.Unlock()
}

func (n *connectionManager) AddPendingDeletion(ip iputil.VpnIp) {
func (n *connectionManager) AddPendingDeletion(localIndex uint32) {
n.pendingDeletionLock.Lock()
if _, ok := n.pendingDeletion[ip]; ok {
n.pendingDeletion[ip] += 1
if _, ok := n.pendingDeletion[localIndex]; ok {
n.pendingDeletion[localIndex] += 1
} else {
n.pendingDeletion[ip] = 0
n.pendingDeletion[localIndex] = 0
}
n.pendingDeletionTimer.Add(ip, time.Second*time.Duration(n.pendingDeletionInterval))
n.pendingDeletionTimer.Add(localIndex, time.Second*time.Duration(n.pendingDeletionInterval))
n.pendingDeletionLock.Unlock()
}

func (n *connectionManager) checkPendingDeletion(ip iputil.VpnIp) bool {
func (n *connectionManager) checkPendingDeletion(localIndex uint32) bool {
n.pendingDeletionLock.RLock()
if _, ok := n.pendingDeletion[ip]; ok {
if _, ok := n.pendingDeletion[localIndex]; ok {

n.pendingDeletionLock.RUnlock()
return true
Expand All @@ -132,8 +131,8 @@ func (n *connectionManager) checkPendingDeletion(ip iputil.VpnIp) bool {
return false
}

func (n *connectionManager) AddTrafficWatch(vpnIp iputil.VpnIp, seconds int) {
n.TrafficTimer.Add(vpnIp, time.Second*time.Duration(seconds))
func (n *connectionManager) AddTrafficWatch(localIndex uint32, seconds int) {
n.TrafficTimer.Add(localIndex, time.Second*time.Duration(seconds))
}

func (n *connectionManager) Start(ctx context.Context) {
Expand Down Expand Up @@ -162,36 +161,36 @@ func (n *connectionManager) Run(ctx context.Context) {
func (n *connectionManager) HandleMonitorTick(now time.Time, p, nb, out []byte) {
n.TrafficTimer.Advance(now)
for {
vpnIp, has := n.TrafficTimer.Purge()
localIndex, has := n.TrafficTimer.Purge()
if !has {
break
}

// Check for traffic coming back in from this host.
traf := n.CheckIn(vpnIp)
traf := n.CheckIn(localIndex)

hostinfo, err := n.hostMap.QueryVpnIp(vpnIp)
hostinfo, err := n.hostMap.QueryIndex(localIndex)
if err != nil {
n.l.Debugf("Not found in hostmap: %s", vpnIp)
n.ClearIP(vpnIp)
n.ClearPendingDeletion(vpnIp)
n.l.WithField("localIndex", localIndex).Debugf("Not found in hostmap")
n.ClearLocalIndex(localIndex)
n.ClearPendingDeletion(localIndex)
continue
}

if n.handleInvalidCertificate(now, vpnIp, hostinfo) {
if n.handleInvalidCertificate(now, hostinfo) {
continue
}

// If we saw an incoming packets from this ip and peer's certificate is not
// expired, just ignore.
if traf {
if n.l.Level >= logrus.DebugLevel {
n.l.WithField("vpnIp", vpnIp).
hostinfo.logger(n.l).
WithField("tunnelCheck", m{"state": "alive", "method": "passive"}).
Debug("Tunnel status")
}
n.ClearIP(vpnIp)
n.ClearPendingDeletion(vpnIp)
n.ClearLocalIndex(localIndex)
n.ClearPendingDeletion(localIndex)
continue
}

Expand All @@ -201,76 +200,71 @@ func (n *connectionManager) HandleMonitorTick(now time.Time, p, nb, out []byte)

if hostinfo != nil && hostinfo.ConnectionState != nil {
// Send a test packet to trigger an authenticated tunnel test, this should suss out any lingering tunnel issues
n.intf.SendMessageToVpnIp(header.Test, header.TestRequest, vpnIp, p, nb, out)
n.intf.sendMessageToVpnIp(header.Test, header.TestRequest, hostinfo, p, nb, out)

} else {
hostinfo.logger(n.l).Debugf("Hostinfo sadness: %s", vpnIp)
hostinfo.logger(n.l).Debugf("Hostinfo sadness")
}
n.AddPendingDeletion(vpnIp)
n.AddPendingDeletion(localIndex)
}

}

func (n *connectionManager) HandleDeletionTick(now time.Time) {
n.pendingDeletionTimer.Advance(now)
for {
vpnIp, has := n.pendingDeletionTimer.Purge()
localIndex, has := n.pendingDeletionTimer.Purge()
if !has {
break
}

hostinfo, err := n.hostMap.QueryVpnIp(vpnIp)
hostinfo, err := n.hostMap.QueryIndex(localIndex)
if err != nil {
n.l.Debugf("Not found in hostmap: %s", vpnIp)
n.ClearIP(vpnIp)
n.ClearPendingDeletion(vpnIp)
n.l.WithField("localIndex", localIndex).Debugf("Not found in hostmap")
n.ClearLocalIndex(localIndex)
n.ClearPendingDeletion(localIndex)
continue
}

if n.handleInvalidCertificate(now, vpnIp, hostinfo) {
if n.handleInvalidCertificate(now, hostinfo) {
continue
}

// If we saw an incoming packets from this ip and peer's certificate is not
// expired, just ignore.
traf := n.CheckIn(vpnIp)
traf := n.CheckIn(localIndex)
if traf {
n.l.WithField("vpnIp", vpnIp).
hostinfo.logger(n.l).
WithField("tunnelCheck", m{"state": "alive", "method": "active"}).
Debug("Tunnel status")

n.ClearIP(vpnIp)
n.ClearPendingDeletion(vpnIp)
n.ClearLocalIndex(localIndex)
n.ClearPendingDeletion(localIndex)
continue
}

// If it comes around on deletion wheel and hasn't resolved itself, delete
if n.checkPendingDeletion(vpnIp) {
if n.checkPendingDeletion(localIndex) {
cn := ""
if hostinfo.ConnectionState != nil && hostinfo.ConnectionState.peerCert != nil {
cn = hostinfo.ConnectionState.peerCert.Details.Name
}

hostinfo.logger(n.l).
WithField("tunnelCheck", m{"state": "dead", "method": "active"}).
WithField("certName", cn).
Info("Tunnel status")

n.ClearIP(vpnIp)
n.ClearPendingDeletion(vpnIp)
// TODO: This is only here to let tests work. Should do proper mocking
if n.intf.lightHouse != nil {
n.intf.lightHouse.DeleteVpnIp(vpnIp)
}
n.hostMap.DeleteHostInfo(hostinfo)
} else {
n.ClearIP(vpnIp)
n.ClearPendingDeletion(vpnIp)
}

n.ClearLocalIndex(localIndex)
n.ClearPendingDeletion(localIndex)
}
}

// handleInvalidCertificates will destroy a tunnel if pki.disconnect_invalid is true and the certificate is no longer valid
func (n *connectionManager) handleInvalidCertificate(now time.Time, vpnIp iputil.VpnIp, hostinfo *HostInfo) bool {
func (n *connectionManager) handleInvalidCertificate(now time.Time, hostinfo *HostInfo) bool {
if !n.intf.disconnectInvalid {
return false
}
Expand All @@ -286,16 +280,15 @@ func (n *connectionManager) handleInvalidCertificate(now time.Time, vpnIp iputil
}

fingerprint, _ := remoteCert.Sha256Sum()
n.l.WithField("vpnIp", vpnIp).WithError(err).
WithField("certName", remoteCert.Details.Name).
hostinfo.logger(n.l).WithError(err).
WithField("fingerprint", fingerprint).
Info("Remote certificate is no longer valid, tearing down the tunnel")

// Inform the remote and close the tunnel locally
n.intf.sendCloseTunnel(hostinfo)
n.intf.closeTunnel(hostinfo)

n.ClearIP(vpnIp)
n.ClearPendingDeletion(vpnIp)
n.ClearLocalIndex(hostinfo.localIndexId)
n.ClearPendingDeletion(hostinfo.localIndexId)
return true
}
Loading

0 comments on commit a06977b

Please sign in to comment.