forked from projectcalico/calico
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathiface_monitor.go
439 lines (395 loc) · 12.6 KB
/
iface_monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
// Copyright (c) 2020-2021 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ifacemonitor
import (
"context"
"errors"
"fmt"
"regexp"
"syscall"
"time"
log "github.com/sirupsen/logrus"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
"github.com/projectcalico/calico/felix/environment"
"github.com/projectcalico/calico/libcalico-go/lib/set"
)
// netlinkStub abstracts the netlink operations the monitor relies on. New wires
// in the real implementation (newRealNetlink); NewWithStubs accepts any
// implementation, presumably so tests can inject a fake.
type netlinkStub interface {
	// Subscribe starts delivering link and route updates on the given channels.
	// MonitorInterfaces closes the returned cancel channel to tear the
	// subscription down.
	Subscribe(
		linkUpdates chan netlink.LinkUpdate,
		routeUpdates chan netlink.RouteUpdate,
	) (cancel chan struct{}, err error)
	// LinkList returns the current links; resync treats it as the authoritative
	// list of interfaces that exist.
	LinkList() ([]netlink.Link, error)
	// ListLocalRoutes returns routes for link in the given family
	// (netlink.FAMILY_V4 or netlink.FAMILY_V6). The caller further filters the
	// result with routeIsLocalUnicast.
	ListLocalRoutes(link netlink.Link, family int) ([]netlink.Route, error)
}
// State is the state of an interface as reported to InterfaceStateCallback.
type State string

const (
	// StateNotPresent means the interface does not exist (never seen or removed).
	StateNotPresent State = ""
	// StateUp means the interface exists and LinkIsOperUp reports it as up.
	StateUp State = "up"
	// StateDown means the interface exists but is not operationally up.
	StateDown State = "down"
)
// InterfaceStateCallback is invoked when an interface's State changes. ifIndex
// is the interface index (netlink link attribute Index) the state applies to.
type InterfaceStateCallback func(ifaceName string, ifaceState State, ifIndex int)

// AddrStateCallback is invoked with the full set of tracked local addresses
// (IP strings) for an interface. A nil set signals that the interface is gone.
type AddrStateCallback func(ifaceName string, addrs set.Set[string])

// InSyncCallback is invoked by MonitorInterfaces after each successful resync
// that follows a (re)subscription to netlink.
type InSyncCallback func()
// Config holds the tunable parameters of an InterfaceMonitor.
type Config struct {
	// InterfaceExcludes is a list of interface name patterns. Interfaces whose
	// names match any of them still get state callbacks, but their addresses
	// are not tracked and they get no address callbacks.
	InterfaceExcludes []*regexp.Regexp
	// ResyncInterval is the interval at which New schedules a rescan of all the
	// interfaces. Values <= 0 disable periodic rescans. NewWithStubs does not
	// read this field; it uses its resyncC argument instead.
	ResyncInterval time.Duration
	// NetlinkTimeout is passed by New to newRealNetlink; presumably a
	// per-operation netlink timeout — TODO confirm against newRealNetlink.
	NetlinkTimeout time.Duration
}
// InterfaceMonitor tracks interfaces and their local addresses via netlink and
// reports changes through its exported callbacks. StateCallback, AddrCallback
// and InSyncCallback are invoked without nil checks, so they must be set before
// MonitorInterfaces is called.
type InterfaceMonitor struct {
	Config
	netlinkStub netlinkStub
	// resyncC triggers a full resync on every receive; a nil channel never
	// fires in MonitorInterfaces' select, disabling periodic resyncs.
	resyncC <-chan time.Time
	// ifaceNameToIdx maps interface name to interface index.
	ifaceNameToIdx map[string]int
	// ifaceIdxToInfo holds the cached view of each known interface by index.
	ifaceIdxToInfo map[int]*ifaceInfo
	StateCallback  InterfaceStateCallback
	AddrCallback   AddrStateCallback
	InSyncCallback InSyncCallback
	// fatalErrCallback is called when monitoring cannot continue;
	// MonitorInterfaces returns right after calling it.
	fatalErrCallback func(error)
}
// ifaceInfo is the monitor's cached view of a single interface.
type ifaceInfo struct {
	Idx  int
	Name string
	// State is the most recently computed state of the interface.
	State State
	// TrackAddrs is false for interfaces matching Config.InterfaceExcludes.
	TrackAddrs bool
	// Addrs is the current set of local addresses, as IP strings.
	Addrs set.Set[string]
}
// New returns an InterfaceMonitor that talks to the real netlink
// implementation. When config.ResyncInterval is positive a ticker drives
// periodic full rescans; a zero or negative interval disables them.
func New(config Config,
	featureDetector environment.FeatureDetectorIface,
	fatalErrCallback func(error),
) *InterfaceMonitor {
	var resyncC <-chan time.Time
	if config.ResyncInterval > 0 {
		log.WithField("interval", config.ResyncInterval).Info(
			"configured to periodically rescan interfaces.")
		// NOTE(review): the ticker is never stopped; presumably the monitor
		// lives for the lifetime of the process — confirm.
		resyncC = time.NewTicker(config.ResyncInterval).C
	}

	realNetlink := newRealNetlink(featureDetector, config.NetlinkTimeout)
	return NewWithStubs(config, realNetlink, resyncC, fatalErrCallback)
}
// NewWithStubs builds an InterfaceMonitor around the supplied netlink
// implementation and resync trigger channel (nil disables periodic resyncs).
// The interface caches start out empty.
func NewWithStubs(config Config, netlinkStub netlinkStub, resyncC <-chan time.Time, fatalErrCallback func(error)) *InterfaceMonitor {
	mon := &InterfaceMonitor{Config: config}
	mon.netlinkStub = netlinkStub
	mon.resyncC = resyncC
	mon.ifaceNameToIdx = make(map[string]int)
	mon.ifaceIdxToInfo = make(map[int]*ifaceInfo)
	mon.fatalErrCallback = fatalErrCallback
	return mon
}
// IsInterfacePresent reports whether a link named name can currently be looked
// up via netlink.
func IsInterfacePresent(name string) bool {
	// The lookup error is deliberately ignored: any failure (including "not
	// found") is treated as absence; only a returned link counts as present.
	if link, _ := netlink.LinkByName(name); link != nil {
		return true
	}
	return false
}
// MonitorInterfaces subscribes to netlink link and route updates and feeds them
// into the monitor's cache, invoking StateCallback and AddrCallback as
// interfaces and their addresses change. After every (re)subscription it does
// a full resync and then calls InSyncCallback. It also resyncs whenever
// resyncC fires.
//
// If either filtered update channel is closed, the subscription is torn down
// and re-established. On an unrecoverable error (subscribe or resync failure)
// it calls fatalErrCallback, releases the netlink subscription and the
// filtering goroutine, and returns. It does not return otherwise.
func (m *InterfaceMonitor) MonitorInterfaces() {
	log.Info("Interface monitoring thread started.")

	// Reconnection loop.
	for {
		var nlCancelC chan struct{}
		filterUpdatesCtx, filterUpdatesCancel := context.WithCancel(context.Background())
		filteredUpdates := make(chan netlink.LinkUpdate, 10)
		filteredRouteUpdates := make(chan netlink.RouteUpdate, 10)
		{
			updates := make(chan netlink.LinkUpdate, 10)
			routeUpdates := make(chan netlink.RouteUpdate, 10)
			var err error
			if nlCancelC, err = m.netlinkStub.Subscribe(updates, routeUpdates); err != nil {
				// If we can't even subscribe, something must have gone very wrong. Bail.
				m.fatalErrCallback(fmt.Errorf("failed to subscribe to netlink: %w", err))
				filterUpdatesCancel()
				return
			}
			go FilterUpdates(filterUpdatesCtx, filteredRouteUpdates, routeUpdates, filteredUpdates, updates)
		}
		log.Info("Subscribed to netlink updates.")

		// teardown releases everything this iteration set up after subscribing:
		// the netlink subscription (by closing its cancel channel) and the
		// FilterUpdates goroutine (by cancelling its context). Each exit path
		// below calls it exactly once; closing nlCancelC twice would panic.
		teardown := func() {
			close(nlCancelC)
			filterUpdatesCancel()
		}

		// Do a resync to notify all our existing interfaces. We also do periodic
		// resyncs because it's not clear what the ordering guarantees are for our netlink
		// subscription vs a list operation as used by resync().
		if err := m.resync(); err != nil {
			m.fatalErrCallback(fmt.Errorf("failed to read from netlink (initial resync): %w", err))
			// Release the subscription as well as the filter goroutine.
			teardown()
			return
		}

		// Let the main goroutine know that we're in sync in order to unblock dataplane programming.
		m.InSyncCallback()

	readLoop:
		for {
			log.WithFields(log.Fields{
				"updates":      filteredUpdates,
				"routeUpdates": filteredRouteUpdates,
				"resyncC":      m.resyncC,
			}).Debug("About to select on possible triggers")
			select {
			case update, ok := <-filteredUpdates:
				log.WithField("update", update).Debug("Link update")
				if !ok {
					log.Warn("Failed to read a link update")
					break readLoop
				}
				m.handleNetlinkUpdate(update)
			case routeUpdate, ok := <-filteredRouteUpdates:
				log.WithField("addrUpdate", routeUpdate).Debug("Address update")
				if !ok {
					log.Warn("Failed to read an address update")
					break readLoop
				}
				m.handleNetlinkRouteUpdate(routeUpdate)
			case <-m.resyncC:
				log.Debug("Resync trigger")
				if err := m.resync(); err != nil {
					m.fatalErrCallback(fmt.Errorf("failed to read from netlink (resync): %w", err))
					teardown()
					return
				}
			}
		}

		// One of the update channels closed; drop this subscription and retry.
		teardown()
		log.Warn("Reconnecting to netlink after a failure...")
	}
}
// isExcludedInterface reports whether ifName matches any of the configured
// InterfaceExcludes patterns.
func (m *InterfaceMonitor) isExcludedInterface(ifName string) bool {
	for _, pattern := range m.InterfaceExcludes {
		if pattern.MatchString(ifName) {
			return true
		}
	}
	return false
}
// handleNetlinkUpdate applies a single netlink link update to the cache.
// RTM_NEWLINK is treated as "exists"; any other message type is treated as
// RTM_DELLINK. Updates without a link or without link attributes are logged
// and dropped.
func (m *InterfaceMonitor) handleNetlinkUpdate(update netlink.LinkUpdate) {
	// LinkUpdate embeds the netlink.Link interface, so update.Attrs() and
	// update.Link.Attrs() are the same call; both would panic on a nil Link.
	// That is why Link is checked first.
	if update.Link == nil || update.Link.Attrs() == nil {
		// Defensive, some sort of interface that the netlink lib doesn't understand?
		log.WithField("update", update).Warn("Missing attributes on netlink update.")
		return
	}

	ifaceExists := update.Header.Type == syscall.RTM_NEWLINK // Alternative is an RTM_DELLINK
	m.storeAndNotifyLink(ifaceExists, update.Link)
}
// handleNetlinkRouteUpdate applies a single netlink route update to the cached
// address set of the interface it refers to. RTM_NEWROUTE adds the
// destination IP; any other type removes it. AddrCallback is only triggered
// when the cached set actually changes. Updates for excluded interfaces,
// updates without a destination, and updates for not-yet-known interfaces are
// ignored.
func (m *InterfaceMonitor) handleNetlinkRouteUpdate(update netlink.RouteUpdate) {
	ifIndex := update.LinkIndex
	info := m.ifaceIdxToInfo[ifIndex]

	// Early check: avoid logging anything for excluded interfaces.
	if info != nil && !info.TrackAddrs {
		return
	}
	if update.Dst == nil {
		return
	}

	addr := update.Dst.IP.String()
	added := update.Type == unix.RTM_NEWROUTE
	logCtx := log.WithFields(log.Fields{
		"addr":    addr,
		"ifIndex": ifIndex,
		"exists":  added,
	})
	if info == nil {
		logCtx.Info("Netlink address update but interface isn't yet known. Will handle when interface is signalled.")
		return
	}
	logCtx.Info("Netlink address update for known interface. ")

	alreadyKnown := info.Addrs.Contains(addr)
	switch {
	case added && !alreadyKnown:
		info.Addrs.Add(addr)
	case !added && alreadyKnown:
		info.Addrs.Discard(addr)
	default:
		// No change to the cached set; nothing to tell consumers.
		return
	}
	m.notifyIfaceAddrs(info)
}
// notifyIfaceAddrs passes a copy of info's address set to AddrCallback, unless
// the interface is not being tracked for addresses.
func (m *InterfaceMonitor) notifyIfaceAddrs(info *ifaceInfo) {
	ifaceLog := log.WithFields(log.Fields{
		"ifIndex": info.Idx,
		"name":    info.Name,
	})
	if info.TrackAddrs {
		ifaceLog.Debug("Notifying addresses for interface")
		// Hand over a copy rather than the cached set itself.
		m.AddrCallback(info.Name, info.Addrs.Copy())
		return
	}
	ifaceLog.Debug("Skipping notifying addresses for ignored interface")
}
// storeAndNotifyLink records the existence/state of link and fires any
// resulting callbacks. A known index arriving under a different name is
// treated as a rename. The old name is first reported as gone, then the new
// name is processed normally.
func (m *InterfaceMonitor) storeAndNotifyLink(ifaceExists bool, link netlink.Link) {
	linkAttrs := link.Attrs()
	idx, currentName := linkAttrs.Index, linkAttrs.Name
	log.WithFields(log.Fields{
		"ifaceExists": ifaceExists,
		"ifIndex":     idx,
		"name":        currentName,
	}).Debug("storeAndNotifyLink called")

	previous := m.ifaceIdxToInfo[idx]
	if previous != nil && previous.Name != currentName {
		log.WithFields(log.Fields{
			"oldName": previous.Name,
			"newName": currentName,
		}).Info("Interface renamed, simulating deletion of old copy.")
		m.storeAndNotifyLinkInner(false, previous.Name, link)
	}
	m.storeAndNotifyLinkInner(ifaceExists, currentName, link)
}
// LinkIsOperUp reports whether link is operationally up, i.e. whether the
// IFF_RUNNING flag is set in its raw flags. A link without attributes is
// reported as not up.
func LinkIsOperUp(link netlink.Link) bool {
	// Operstate is carried in IFF_RUNNING; IFF_UP only reflects the admin
	// state, which doesn't tell us whether we can program routes etc.
	if attrs := link.Attrs(); attrs != nil {
		return attrs.RawFlags&syscall.IFF_RUNNING != 0
	}
	return false
}
// storeAndNotifyLinkInner records the existence and state of link under
// ifaceName in the caches and fires the resulting callbacks.
//
// ifaceName is passed separately from link because storeAndNotifyLink uses
// this function with ifaceExists=false and the OLD name to retire an interface
// that was renamed.
//
// Callbacks fired:
//   - StateCallback, only when the newly computed State differs from the cached one.
//   - AddrCallback(name, nil) when a tracked interface that was present goes away.
//   - AddrCallback via notifyIfaceAddrs when a tracked interface is newly seen
//     or its set of local unicast addresses has changed.
//
// Interfaces matching InterfaceExcludes get state callbacks only.
func (m *InterfaceMonitor) storeAndNotifyLinkInner(ifaceExists bool, ifaceName string, link netlink.Link) {
	attrs := link.Attrs()
	ifIndex := attrs.Index
	log.WithFields(log.Fields{
		"ifaceExists": ifaceExists,
		"ifaceName":   ifaceName,
		"link":        link,
		"ifIndex":     ifIndex,
	}).Debug("storeAndNotifyLinkInner called")

	// Calculate the old and new states of the interface.
	oldState := StateNotPresent
	if info := m.ifaceIdxToInfo[ifIndex]; info != nil {
		oldState = info.State
	}
	newState := StateNotPresent
	if ifaceExists {
		if LinkIsOperUp(link) {
			newState = StateUp
		} else {
			newState = StateDown
		}
	}

	// Store or remove the information. The cache is updated before any
	// callback fires.
	trackAddrs := !m.isExcludedInterface(ifaceName)
	if ifaceExists {
		if m.ifaceIdxToInfo[ifIndex] == nil {
			// First sighting of this index: start with an empty address set,
			// filled in further down.
			m.ifaceIdxToInfo[ifIndex] = &ifaceInfo{
				Idx:        ifIndex,
				Name:       ifaceName,
				TrackAddrs: trackAddrs,
				Addrs:      set.New[string](),
			}
		}
		m.ifaceNameToIdx[ifaceName] = ifIndex
		m.ifaceIdxToInfo[ifIndex].State = newState
	} else {
		delete(m.ifaceIdxToInfo, ifIndex)
		delete(m.ifaceNameToIdx, ifaceName)
	}

	logCxt := log.WithFields(log.Fields{
		"ifaceName": ifaceName,
		"ifIndex":   ifIndex,
		"oldState":  oldState,
		"newState":  newState,
	})
	if oldState != newState {
		logCxt.Debug("Interface changed state")
		m.StateCallback(ifaceName, newState, ifIndex)
	} else {
		logCxt.Debug("Interface state hasn't changed, nothing to notify.")
	}

	// Excluded interfaces stop here: no address tracking or address callbacks.
	if !trackAddrs {
		return
	}

	if newState == StateNotPresent {
		if oldState != StateNotPresent {
			// We were tracking addresses for this interface before but now it's gone. Signal that.
			log.Debug("Notify link non-existence to address callback consumers")
			m.AddrCallback(ifaceName, nil)
		}
		return
	}

	// The link now exists; get addresses for the link and store and notify those too; then
	// we don't have to worry about a possible race between the link and address update
	// channels. We deliberately do this regardless of the link state, as in some cases this
	// will allow us to secure a Host Endpoint interface _before_ it comes up, and so eliminate
	// a small window of insecurity.
	newAddrs := set.New[string]()
	for _, family := range [2]int{netlink.FAMILY_V4, netlink.FAMILY_V6} {
		routes, err := m.netlinkStub.ListLocalRoutes(link, family)
		if err != nil {
			if errors.Is(err, unix.ENODEV) {
				log.Debug("Tried to list routes for interface but it is gone, ignoring...")
				continue
			}
			// NOTE(review): on any other error we log and fall through to
			// iterate whatever `routes` holds (presumably nil). A transient
			// failure could then look like this family's addresses were
			// removed until the next resync — confirm this is intended.
			log.WithError(err).Warn("Netlink route list operation failed.")
		}
		for _, route := range routes {
			if !routeIsLocalUnicast(route) {
				log.WithField("route", route).Debug("Ignoring non-local route.")
				continue
			}
			newAddrs.Add(route.Dst.IP.String())
		}
	}

	// Non-nil: newState != StateNotPresent implies ifaceExists, and the store
	// step above populated this entry.
	info := m.ifaceIdxToInfo[ifIndex]
	// Always notify for a newly seen interface, even with an empty address set;
	// otherwise only when the set changed.
	if oldState == StateNotPresent || !info.Addrs.Equals(newAddrs) {
		log.WithFields(log.Fields{
			"old": info.Addrs,
			"new": newAddrs,
		}).Debug("Detected interface address change while notifying link")
		info.Addrs = newAddrs
		m.notifyIfaceAddrs(info)
	}
}
// resync lists every link from netlink, feeds each one through
// storeAndNotifyLink, and then retires every cached interface whose index is
// no longer present. It returns the LinkList error, if any, without touching
// the cache.
//
// Stale entries are detected by interface index rather than by name. If an
// interface was deleted and recreated with the same name (new index) while
// updates were missed, a name-based check would keep the old index's entry
// forever. That entry would later produce duplicate removal callbacks, and its
// deletion would clobber the live name->index mapping.
func (m *InterfaceMonitor) resync() error {
	log.Debug("Resyncing interface state.")
	links, err := m.netlinkStub.LinkList()
	if err != nil {
		log.WithError(err).Warn("Netlink list operation failed.")
		return err
	}

	currentIfaces := set.New[string]()
	currentIdxs := set.New[int]()
	for _, link := range links {
		attrs := link.Attrs()
		if attrs == nil {
			// Defensive, some sort of interface that the netlink lib doesn't
			// understand?
			log.WithField("link", link).Warn("Missing attributes on netlink update.")
			continue
		}
		currentIfaces.Add(attrs.Name)
		currentIdxs.Add(attrs.Index)
		m.storeAndNotifyLink(true, link)
	}

	// Every index seen above now has an up-to-date entry (renames were handled
	// by storeAndNotifyLink), so anything else in the cache is stale.
	for ifIndex, info := range m.ifaceIdxToInfo {
		if currentIdxs.Contains(ifIndex) {
			continue
		}
		name := info.Name
		if currentIfaces.Contains(name) {
			// The name now belongs to a different, live interface that was
			// reported present above. A by-name removal would contradict that,
			// so just drop the stale entry.
			log.WithFields(log.Fields{
				"ifaceName": name,
				"ifIndex":   ifIndex,
			}).Info("Dropping stale entry for interface whose name was reused on resync.")
		} else {
			log.WithField("ifaceName", name).Info("Spotted interface removal on resync.")
			m.StateCallback(name, StateNotPresent, ifIndex)
			if info.TrackAddrs {
				// We were tracking addresses for this interface before but now it's gone. Signal that.
				m.AddrCallback(name, nil)
			}
		}
		// Only remove the name mapping if it still points at this stale index;
		// a live interface may own the name now.
		if idx, ok := m.ifaceNameToIdx[name]; ok && idx == ifIndex {
			delete(m.ifaceNameToIdx, name)
		}
		delete(m.ifaceIdxToInfo, ifIndex)
	}
	log.Debug("Resync complete")
	return nil
}