Skip to content

Commit

Permalink
Make jump map index assignment more robust
Browse files Browse the repository at this point in the history
- Track who owns which index.
- Check assignment when freeing indexes.
- Let object constructor handle pre-populating the
  free set.
- Add ability to forcefully claim a free index for
  start-of-day sync.
- Detect conflicts during the sync and claim new index.
- Add tests for jump map clean up and collision detection.
  • Loading branch information
fasaxc committed Nov 21, 2023
1 parent e673bee commit 2f353e9
Show file tree
Hide file tree
Showing 3 changed files with 524 additions and 81 deletions.
9 changes: 6 additions & 3 deletions felix/bpf/ifstate/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
const (
FlgWEP = uint32(0x1)
FlgReady = uint32(0x2)
FlgMax = uint32(0x3)
)

var flagsToStr = map[uint32]string{
Expand Down Expand Up @@ -151,7 +152,8 @@ func (v Value) String() string {
fstr := ""
f := v.Flags()

for k, v := range flagsToStr {
for k := FlgWEP; k < FlgMax; k++ {
v := flagsToStr[k]
if f&k != 0 {
fstr = fstr + v + ","
}
Expand All @@ -161,8 +163,9 @@ func (v Value) String() string {
fstr = "host,"
}

return fmt.Sprintf("{flags: %s XDPPolicy: %d, IngressPolicy: %d, EgressPolicy: %d, name: %s}",
fstr, v.XDPPolicy(), v.IngressPolicy(), v.EgressPolicy(), v.IfName())
return fmt.Sprintf(
"{flags: %s XDPPolicy: %d, IngressPolicy: %d, EgressPolicy: %d, IngressFilter: %d, EgressFilter: %d, name: %s}",
fstr, v.XDPPolicy(), v.IngressPolicy(), v.EgressPolicy(), v.TcIngressFilter(), v.TcEgressFilter(), v.IfName())
}

func ValueFromBytes(b []byte) Value {
Expand Down
204 changes: 134 additions & 70 deletions felix/dataplane/linux/bpf_ep_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,35 +796,35 @@ func (m *bpfEndpointManager) updateIfaceStateMap(name string, iface *bpfInterfac
if err := m.jumpMapDelete(hook.XDP, iface.dpState.policyIdx[hook.XDP]); err != nil {
log.WithError(err).Warn("Policy program may leak.")
}
if err := m.xdpJumpMapAlloc.Put(iface.dpState.policyIdx[hook.XDP]); err != nil {
if err := m.xdpJumpMapAlloc.Put(iface.dpState.policyIdx[hook.XDP], name); err != nil {
log.WithError(err).Error("XDP")
}

if err := m.jumpMapDelete(hook.Ingress, iface.dpState.policyIdx[hook.Ingress]); err != nil {
log.WithError(err).Warn("Policy program may leak.")
}
if err := m.jumpMapAlloc.Put(iface.dpState.policyIdx[hook.Ingress]); err != nil {
if err := m.jumpMapAlloc.Put(iface.dpState.policyIdx[hook.Ingress], name); err != nil {
log.WithError(err).Error("Ingress")
}

if err := m.jumpMapDelete(hook.Egress, iface.dpState.policyIdx[hook.Egress]); err != nil {
log.WithError(err).Warn("Policy program may leak.")
}
if err := m.jumpMapAlloc.Put(iface.dpState.policyIdx[hook.Egress]); err != nil {
if err := m.jumpMapAlloc.Put(iface.dpState.policyIdx[hook.Egress], name); err != nil {
log.WithError(err).Error("Ingress")
}

if err := m.jumpMapDelete(hook.Egress, iface.dpState.filterIdx[hook.Egress]); err != nil {
log.WithError(err).Warn("Filter program may leak.")
}
if err := m.jumpMapAlloc.Put(iface.dpState.filterIdx[hook.Egress]); err != nil {
if err := m.jumpMapAlloc.Put(iface.dpState.filterIdx[hook.Egress], name); err != nil {
log.WithError(err).Error("Ingress")
}

if err := m.jumpMapDelete(hook.Ingress, iface.dpState.filterIdx[hook.Ingress]); err != nil {
log.WithError(err).Warn("Filter program may leak.")
}
if err := m.jumpMapAlloc.Put(iface.dpState.filterIdx[hook.Ingress]); err != nil {
if err := m.jumpMapAlloc.Put(iface.dpState.filterIdx[hook.Ingress], name); err != nil {
log.WithError(err).Error("Ingress")
}

Expand Down Expand Up @@ -1124,8 +1124,8 @@ func (m *bpfEndpointManager) interfaceByIndex(ifindex int) (*net.Interface, erro
}

func (m *bpfEndpointManager) syncIfStateMap() {
palloc := set.New[int]()
xdpPalloc := set.New[int]()
tcSeenIndexes := set.New[int]()
xdpSeenIndexes := set.New[int]()

m.ifacesLock.Lock()
defer m.ifacesLock.Unlock()
Expand Down Expand Up @@ -1174,33 +1174,39 @@ func (m *bpfEndpointManager) syncIfStateMap() {
iface.dpState.readiness = ifaceIsReadyNotAssured
}
}

var idx int

if idx = v.XDPPolicy(); idx != -1 {
xdpPalloc.Add(idx)
}
iface.dpState.policyIdx[hook.XDP] = idx

if idx = v.IngressPolicy(); idx != -1 {
palloc.Add(idx)
}
iface.dpState.policyIdx[hook.Ingress] = idx

if idx = v.EgressPolicy(); idx != -1 {
palloc.Add(idx)
checkAndReclaimIdx := func(idx int, h hook.Hook, indexMap []int) {
if idx < 0 {
return
}
var alloc *jumpMapAlloc
var seenIndexes set.Set[int]
if h == hook.XDP {
alloc = m.xdpJumpMapAlloc
seenIndexes = xdpSeenIndexes
} else {
alloc = m.jumpMapAlloc
seenIndexes = tcSeenIndexes
}
if err := alloc.Assign(idx, netiface.Name); err != nil {
// Conflict with another program; need to alloc a new index.
log.WithError(err).Error("Start of day resync found invalid jump map index, " +
"allocate a fresh one.")
idx = -1
} else {
seenIndexes.Add(idx)
}
indexMap[h] = idx
}
iface.dpState.policyIdx[hook.Egress] = idx

if idx = v.TcIngressFilter(); idx != -1 {
palloc.Add(idx)
if !m.isWorkloadIface(netiface.Name) {
// We don't use XDP for WEPs so any ID we read back must be a mistake.
checkAndReclaimIdx(v.XDPPolicy(), hook.XDP, iface.dpState.policyIdx[:])
}
iface.dpState.filterIdx[hook.Ingress] = idx
checkAndReclaimIdx(v.IngressPolicy(), hook.Ingress, iface.dpState.policyIdx[:])
checkAndReclaimIdx(v.EgressPolicy(), hook.Egress, iface.dpState.policyIdx[:])

if idx = v.TcEgressFilter(); idx != -1 {
palloc.Add(idx)
}
iface.dpState.filterIdx[hook.Egress] = idx
checkAndReclaimIdx(v.TcIngressFilter(), hook.Ingress, iface.dpState.filterIdx[:])
checkAndReclaimIdx(v.TcEgressFilter(), hook.Egress, iface.dpState.filterIdx[:])

// Mark all interfaces that we knew about, that we still manage and
// that exist as dirty. Since they exist, we either have to deal
Expand All @@ -1217,18 +1223,6 @@ func (m *bpfEndpointManager) syncIfStateMap() {
m.ifStateMap.Desired().Delete(k)
}
})

// Fill unallocated indexes.
for i := 0; i < jump.TCMaxEntryPoints; i++ {
if !palloc.Contains(i) {
_ = m.jumpMapAlloc.Put(i)
}
}
for i := 0; i < jump.XDPMaxEntryPoints; i++ {
if !xdpPalloc.Contains(i) {
_ = m.xdpJumpMapAlloc.Put(i)
}
}
}

func (m *bpfEndpointManager) syncIfaceProperties() error {
Expand Down Expand Up @@ -1443,33 +1437,34 @@ func (m *bpfEndpointManager) applyProgramsToDirtyDataInterfaces() {

m.ifacesLock.Lock()
defer m.ifacesLock.Unlock()
ifaceName := iface
m.withIface(iface, func(iface *bpfInterface) bool {
up = iface.info.ifaceIsUp()

if xdpIdx = iface.dpState.policyIdx[hook.XDP]; xdpIdx == -1 {
if xdpIdx, err = m.xdpJumpMapAlloc.Get(); err != nil {
if xdpIdx, err = m.xdpJumpMapAlloc.Get(ifaceName); err != nil {
return false
}
}
iface.dpState.policyIdx[hook.XDP] = xdpIdx

if ingressIdx = iface.dpState.policyIdx[hook.Ingress]; ingressIdx == -1 {
if ingressIdx, err = m.jumpMapAlloc.Get(); err != nil {
if ingressIdx, err = m.jumpMapAlloc.Get(ifaceName); err != nil {
return false
}
}
iface.dpState.policyIdx[hook.Ingress] = ingressIdx

if egressIdx = iface.dpState.policyIdx[hook.Egress]; egressIdx == -1 {
if egressIdx, err = m.jumpMapAlloc.Get(); err != nil {
if egressIdx, err = m.jumpMapAlloc.Get(ifaceName); err != nil {
return false
}
}
iface.dpState.policyIdx[hook.Egress] = egressIdx

if ingressFilterIdx = iface.dpState.filterIdx[hook.Ingress]; ingressFilterIdx == -1 {
if m.bpfLogLevel == "debug" {
if ingressFilterIdx, err = m.jumpMapAlloc.Get(); err != nil {
if ingressFilterIdx, err = m.jumpMapAlloc.Get(ifaceName); err != nil {
return false
}
}
Expand All @@ -1478,7 +1473,7 @@ func (m *bpfEndpointManager) applyProgramsToDirtyDataInterfaces() {

if egressFilterIdx = iface.dpState.filterIdx[hook.Egress]; egressFilterIdx == -1 {
if m.bpfLogLevel == "debug" {
if egressFilterIdx, err = m.jumpMapAlloc.Get(); err != nil {
if egressFilterIdx, err = m.jumpMapAlloc.Get(ifaceName); err != nil {
return false
}
}
Expand Down Expand Up @@ -1679,33 +1674,33 @@ func (m *bpfEndpointManager) updateWEPsInDataplane() {
}
}

func (m *bpfEndpointManager) wepStateFillJumps(state *bpfInterfaceState) error {
func (m *bpfEndpointManager) wepStateFillJumps(ifaceName string, state *bpfInterfaceState) error {
var err error

if state.policyIdx[hook.Ingress] == -1 {
state.policyIdx[hook.Ingress], err = m.jumpMapAlloc.Get()
state.policyIdx[hook.Ingress], err = m.jumpMapAlloc.Get(ifaceName)
if err != nil {
return err
}
}

if state.policyIdx[hook.Egress] == -1 {
state.policyIdx[hook.Egress], err = m.jumpMapAlloc.Get()
state.policyIdx[hook.Egress], err = m.jumpMapAlloc.Get(ifaceName)
if err != nil {
return err
}
}

if m.bpfLogLevel == "debug" {
if state.filterIdx[hook.Ingress] == -1 {
state.filterIdx[hook.Ingress], err = m.jumpMapAlloc.Get()
state.filterIdx[hook.Ingress], err = m.jumpMapAlloc.Get(ifaceName)
if err != nil {
return err
}
}

if state.filterIdx[hook.Egress] == -1 {
state.filterIdx[hook.Egress], err = m.jumpMapAlloc.Get()
state.filterIdx[hook.Egress], err = m.jumpMapAlloc.Get(ifaceName)
if err != nil {
return err
}
Expand Down Expand Up @@ -1747,7 +1742,7 @@ func (m *bpfEndpointManager) doApplyPolicy(ifaceName string) (bpfInterfaceState,
return state, nil
}

if err := m.wepStateFillJumps(&state); err != nil {
if err := m.wepStateFillJumps(ifaceName, &state); err != nil {
return state, err
}

Expand Down Expand Up @@ -3375,35 +3370,104 @@ func (m *bpfEndpointManager) ruleMatchID(dir, action, owner, name string, idx in
}

func newJumpMapAlloc(entryPoints int) *jumpMapAlloc {
return &jumpMapAlloc{
max: entryPoints,
free: make(chan int, entryPoints),
a := &jumpMapAlloc{
max: entryPoints,
free: set.New[int](),
freeStack: make([]int, entryPoints),
inUse: map[int]string{},
}
for i := 0; i < entryPoints; i++ {
a.free.Add(i)
a.freeStack[entryPoints-1-i] = i
}
return a
}

type jumpMapAlloc struct {
lock sync.Mutex
max int
free chan int

free set.Set[int]
freeStack []int
inUse map[int]string
}

func (pa *jumpMapAlloc) Get() (int, error) {
select {
case i := <-pa.free:
return i, nil
default:
return -1, errors.New("ran out of policy map indexes")
func (pa *jumpMapAlloc) Get(owner string) (int, error) {
pa.lock.Lock()
defer pa.lock.Unlock()

if len(pa.freeStack) == 0 {
return -1, errors.New("jumpMapAlloc: ran out of policy map indexes")
}
idx := pa.freeStack[len(pa.freeStack)-1]
pa.freeStack = pa.freeStack[:len(pa.freeStack)-1]
pa.free.Discard(idx)
pa.inUse[idx] = owner

log.WithFields(log.Fields{"owner": owner, "index": idx}).Debug("jumpMapAlloc: Allocated policy map index")
pa.checkFreeLockHeld(idx)
return idx, nil
}

// Assign explicitly assigns ownership of a specific free index to the given
// owner. Used at start-of-day to re-establish existing ownerships.
func (pa *jumpMapAlloc) Assign(idx int, owner string) error {
if idx < 0 || idx >= pa.max {
return fmt.Errorf("index %d out of jump map range", idx)
}

pa.lock.Lock()
defer pa.lock.Unlock()

if recordedOwner, ok := pa.inUse[idx]; ok {
err := fmt.Errorf("jumpMapAlloc: trying to set owner of %d to %q but it is owned by %q", idx, owner, recordedOwner)
return err
}

pa.free.Discard(idx)
pa.inUse[idx] = owner
// Iterate backwards because it's most likely that the previously-used
// item came from the lower indexes (which start life at the end of
// the stack slice).
for i := len(pa.freeStack) - 1; i >= 0; i-- {
if pa.freeStack[i] == idx {
pa.freeStack[i] = pa.freeStack[len(pa.freeStack)-1]
pa.freeStack = pa.freeStack[:len(pa.freeStack)-1]
break
}
}
pa.checkFreeLockHeld(idx)
return nil
}

func (pa *jumpMapAlloc) Put(i int) error {
if i < 0 || i >= pa.max {
// Put puts an index into the free pool. The recorded owner must match the
// given owner.
func (pa *jumpMapAlloc) Put(idx int, owner string) error {
if idx < 0 || idx >= pa.max {
return nil // ignore, especially if an index is -1 aka unused
}

select {
case pa.free <- i:
return nil
default:
return errors.New("returning more policy indexes than previously allocated!")
pa.lock.Lock()
defer pa.lock.Unlock()

if recordedOwner, ok := pa.inUse[idx]; !ok || recordedOwner != owner {
err := fmt.Errorf("jumpMapAlloc: %q trying to free index %d but it is owned by %q", owner, idx, recordedOwner)
return err
}
log.WithFields(log.Fields{"owner": owner, "index": idx}).Debug("jumpMapAlloc: Released policy map index")
delete(pa.inUse, idx)
pa.free.Add(idx)
pa.freeStack = append(pa.freeStack, idx)
pa.checkFreeLockHeld(idx)
return nil
}

func (pa *jumpMapAlloc) checkFreeLockHeld(idx int) {
if len(pa.freeStack) != pa.free.Len() {
log.WithFields(log.Fields{
"assigning": idx,
"set": pa.free,
"stack": pa.freeStack,
}).Panic("jumpMapAlloc: Free set and free stack got out of sync")
}
}
Loading

0 comments on commit 2f353e9

Please sign in to comment.