multipool: Support allocating from new pools on demand
This commit rewrites the way we compute the number of requested IPs for
each pool. Previously, we required the user to specify in advance which
pools the agent would allocate from. This, however, is very limiting,
since it is often not known in advance what workload a particular node
will run.

Therefore, this commit adds a mechanism where, upon observing an
allocation attempt for a new pool, we update the local CiliumNode CRD to
request additional IPs for that pool. This allows the allocation request
to succeed upon retry. This retry mechanism is similar to how Cilium IPAM
already works in e.g. ENI mode (where, if the local node is out of IPs,
we fail the CNI ADD request and expect kubelet to retry later).

This PR also implicitly adds support for allocating from pools without a
"pre-allocate" value. This allows users to configure very small pools
for specific workloads and assign those few IPs only if they are
actually used.
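
As a rough sketch of the idea (illustrative only: the type and method
names below are invented and only loosely mirror the new
pendingAllocationsPerPool bookkeeping added by this commit), the
on-demand behavior boils down to remembering failed allocation attempts
per pool and address family:

    package sketch

    import "time"

    type Pool string
    type Family string

    // pendingAllocations remembers owners that tried to allocate from a
    // pool before the pool had capacity on this node.
    type pendingAllocations struct {
        pending map[Pool]map[Family]map[string]time.Time
    }

    // upsert records a failed allocation attempt. The next CiliumNode
    // update then requests one additional IP of that family for the
    // pool, even if the pool has no pre-allocate value configured.
    func (p *pendingAllocations) upsert(pool Pool, owner string, family Family) {
        if p.pending == nil {
            p.pending = map[Pool]map[Family]map[string]time.Time{}
        }
        if p.pending[pool] == nil {
            p.pending[pool] = map[Family]map[string]time.Time{}
        }
        if p.pending[pool][family] == nil {
            p.pending[pool][family] = map[string]time.Time{}
        }
        // the timestamp allows expiring entries that are never fulfilled
        p.pending[pool][family][owner] = time.Now()
    }

    // markAllocated drops the entry once the retried allocation succeeds.
    func (p *pendingAllocations) markAllocated(pool Pool, owner string, family Family) {
        delete(p.pending[pool][family], owner)
    }

The agent then adds the number of pending owners to the per-pool demand
it writes into the CiliumNode spec, so even a pool with no pre-allocate
value ends up requesting exactly the IPs that workloads have asked for.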

Signed-off-by: Sebastian Wicki <[email protected]>
gandro committed Jun 1, 2023
1 parent 01237a8 commit ea63399
Showing 2 changed files with 231 additions and 44 deletions.
147 changes: 106 additions & 41 deletions pkg/ipam/multipool.go
@@ -189,6 +189,7 @@ type multiPoolManager struct {
owner Owner

preallocatedIPsPerPool preAllocatePerPool
pendingIPsPerPool *pendingAllocationsPerPool

pools map[Pool]*poolPair
poolsUpdated chan struct{}
@@ -227,6 +228,7 @@ func newMultiPoolManager(conf Configuration, nodeWatcher nodeWatcher, owner Owne
owner: owner,
conf: conf,
preallocatedIPsPerPool: preallocMap,
pendingIPsPerPool: newPendingAllocationsPerPool(),
pools: map[Pool]*poolPair{},
poolsUpdated: make(chan struct{}, 1),
node: nil,
@@ -347,61 +349,112 @@ func neededIPCeil(numIP int, preAlloc int) int {
return (quotient + 1) * preAlloc
}

func (m *multiPoolManager) updateCiliumNode(ctx context.Context) error {
m.mutex.Lock()
newNode := m.node.DeepCopy()
requested := []types.IPAMPoolRequest{}
allocated := []types.IPAMPoolAllocation{}
// computeNeededIPsPerPoolLocked computes how many IPs we want to request from
// the operator for each pool. The formula we use for each pool is basically
//
// neededIPs = roundUp(inUseIPs + pendingIPs + preAllocIPs, preAllocIPs)
//
// inUseIPs Number of IPs that are currently actively in use
// pendingIPs Number of IPs that have been requested, but not yet assigned
// preAllocIPs Minimum number of IPs that we want to pre-allocate as a buffer
//
// Rounded up to the next multiple of preAllocIPs.
func (m *multiPoolManager) computeNeededIPsPerPoolLocked() map[Pool]types.IPAMPoolDemand {
demand := make(map[Pool]types.IPAMPoolDemand, len(m.pools))

// Only pools present in multi-pool-node-pre-alloc can be requested
for poolName, preAlloc := range m.preallocatedIPsPerPool {
var neededIPv4, neededIPv6 int
pool, ok := m.pools[poolName]
if ok {
if pool.v4 != nil {
neededIPv4 = pool.v4.inUseIPCount()
}
if pool.v6 != nil {
neededIPv6 = pool.v6.inUseIPCount()
}
// inUseIPs
for poolName, pool := range m.pools {
ipv4Addrs := 0
if p := pool.v4; p != nil {
ipv4Addrs = p.inUseIPCount()
}
ipv6Addrs := 0
if p := pool.v6; p != nil {
ipv6Addrs = p.inUseIPCount()
}

demand[poolName] = types.IPAMPoolDemand{
IPv4Addrs: ipv4Addrs,
IPv6Addrs: ipv6Addrs,
}
}

// + pendingIPs
for poolName, pending := range m.pendingIPsPerPool.pools {
ipv4Addrs := demand[poolName].IPv4Addrs + pending.pendingForFamily(IPv4)
ipv6Addrs := demand[poolName].IPv6Addrs + pending.pendingForFamily(IPv6)

demand[poolName] = types.IPAMPoolDemand{
IPv4Addrs: ipv4Addrs,
IPv6Addrs: ipv6Addrs,
}
}

// + preAllocIPs
for poolName, preAlloc := range m.preallocatedIPsPerPool {
ipv4Addrs := demand[poolName].IPv4Addrs
if m.conf.IPv4Enabled() {
neededIPv4 = neededIPCeil(neededIPv4, int(preAlloc))
if ok && pool.v4 != nil {
pool.v4.releaseExcessCIDRsMultiPool(neededIPv4)
}
ipv4Addrs = neededIPCeil(ipv4Addrs, preAlloc)
}
ipv6Addrs := demand[poolName].IPv6Addrs
if m.conf.IPv6Enabled() {
neededIPv6 = neededIPCeil(neededIPv6, int(preAlloc))
if ok && pool.v6 != nil {
pool.v6.releaseExcessCIDRsMultiPool(neededIPv6)
}
ipv6Addrs = neededIPCeil(ipv6Addrs, preAlloc)
}

demand[poolName] = types.IPAMPoolDemand{
IPv4Addrs: ipv4Addrs,
IPv6Addrs: ipv6Addrs,
}
}

return demand
}
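
// Worked example (illustrative; the numbers mirror the test below): with
// preAllocIPs = 16 and one IPv4 address in use in the "default" pool, the
// IPv4 demand becomes roundUp(1 + 0 + 16, 16) = 32, while the IPv6 demand is
// roundUp(0 + 0 + 16, 16) = 16. A pool without a pre-allocate entry (such as
// "jupiter" in the test) only requests its pending IPs, e.g. 1 IPv4 and
// 1 IPv6 address.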

func (m *multiPoolManager) updateCiliumNode(ctx context.Context) error {
m.mutex.Lock()
newNode := m.node.DeepCopy()
requested := []types.IPAMPoolRequest{}
allocated := []types.IPAMPoolAllocation{}

m.pendingIPsPerPool.removeExpiredEntries()
neededIPsPerPool := m.computeNeededIPsPerPoolLocked()
for poolName, needed := range neededIPsPerPool {
if needed.IPv4Addrs == 0 && needed.IPv6Addrs == 0 {
continue // no need to request "0" IPs
}

requested = append(requested, types.IPAMPoolRequest{
Pool: poolName.String(),
Needed: types.IPAMPoolDemand{
IPv4Addrs: neededIPv4,
IPv6Addrs: neededIPv6,
},
Pool: poolName.String(),
Needed: needed,
})
}

// Write in-use pools to podCIDR. This removes any released pod CIDRs
for poolName, pool := range m.pools {
neededIPs := neededIPsPerPool[poolName]

cidrs := []types.IPAMPodCIDR{}
if pool.v4 != nil {
v4CIDRs := pool.v4.inUsePodCIDRs()
if v4Pool := pool.v4; v4Pool != nil {
v4Pool.releaseExcessCIDRsMultiPool(neededIPs.IPv4Addrs)
v4CIDRs := v4Pool.inUsePodCIDRs()

slices.Sort(v4CIDRs)
cidrs = append(cidrs, v4CIDRs...)
}
if pool.v6 != nil {
v6CIDRs := pool.v6.inUsePodCIDRs()
if v6Pool := pool.v6; v6Pool != nil {
v6Pool.releaseExcessCIDRsMultiPool(neededIPs.IPv6Addrs)
v6CIDRs := v6Pool.inUsePodCIDRs()

slices.Sort(v6CIDRs)
cidrs = append(cidrs, v6CIDRs...)
}

// remove pool if we've released all CIDRs
if len(cidrs) == 0 {
delete(m.pools, poolName)
continue
}

allocated = append(allocated, types.IPAMPoolAllocation{
Pool: poolName.String(),
CIDRs: cidrs,
@@ -547,39 +600,51 @@ func (m *multiPoolManager) allocateNext(owner string, poolName Pool, family Fami
m.mutex.Lock()
defer m.mutex.Unlock()

defer func() {
if syncUpstream {
m.k8sUpdater.TriggerWithReason("allocation of next IP")
}
}()

pool := m.poolByFamilyLocked(poolName, family)
if pool == nil {
return nil, fmt.Errorf("unable to allocate from unknown pool %q (family %s)", poolName, family)
m.pendingIPsPerPool.upsertPendingAllocation(poolName, owner, family)
return nil, fmt.Errorf("unable to allocate from pool %q (family %s): pool not (yet) available", poolName, family)
}

ip, err := pool.allocateNext()
if err != nil {
m.pendingIPsPerPool.upsertPendingAllocation(poolName, owner, family)
return nil, err
}

if syncUpstream {
m.k8sUpdater.TriggerWithReason("allocation of next IP")
}
m.pendingIPsPerPool.markAsAllocated(poolName, owner, family)
return &AllocationResult{IP: ip, IPPoolName: poolName}, nil
}

func (m *multiPoolManager) allocateIP(ip net.IP, owner string, poolName Pool, family Family, syncUpstream bool) (*AllocationResult, error) {
m.mutex.Lock()
defer m.mutex.Unlock()

defer func() {
if syncUpstream {
m.k8sUpdater.TriggerWithReason("allocation of specific IP")
}
}()

pool := m.poolByFamilyLocked(poolName, family)
if pool == nil {
return nil, fmt.Errorf("unable to reserve IP %s from unknown pool %q (family %s)", ip, poolName, family)
m.pendingIPsPerPool.upsertPendingAllocation(poolName, owner, family)
return nil, fmt.Errorf("unable to reserve IP %s from pool %q (family %s): pool not (yet) available", ip, poolName, family)
}

err := pool.allocate(ip)
if err != nil {
m.pendingIPsPerPool.upsertPendingAllocation(poolName, owner, family)
return nil, err
}

if syncUpstream {
m.k8sUpdater.TriggerWithReason("allocation of IP")
}
m.pendingIPsPerPool.markAsAllocated(poolName, owner, family)
return &AllocationResult{IP: ip, IPPoolName: poolName}, nil
}
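
// Note on the pending-allocation lifecycle (a summary of the code above, not
// additional behavior): a failed allocateNext or allocateIP call records the
// owner via upsertPendingAllocation, updateCiliumNode folds those pending
// entries into the per-pool demand on the next sync, and once the operator
// has assigned a CIDR the retried allocation succeeds and markAsAllocated
// clears the entry. Entries that are never fulfilled are eventually dropped
// by removeExpiredEntries.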

128 changes: 125 additions & 3 deletions pkg/ipam/multipool_test.go
@@ -4,6 +4,7 @@
package ipam

import (
"context"
"errors"
"fmt"
"net"
@@ -111,14 +112,135 @@ func Test_MultiPoolManager(t *testing.T) {
assert.Error(t, err, ipallocator.ErrAllocated)
assert.Nil(t, faultyAllocation)

// allocation from an unknown pool should return an error
// Allocation from an unknown pool should create a new pending allocation
jupiterIPv4CIDR := cidr.MustParseCIDR("192.168.1.0/16")
juptierIPv6CIDR := cidr.MustParseCIDR("fc00:33::/96")

faultyAllocation, err = c.allocateIP(net.ParseIP("192.168.1.1"), "jupiter-pod-0", "jupiter", IPv4, false)
assert.ErrorContains(t, err, "unknown pool")
assert.ErrorContains(t, err, "pool not (yet) available")
assert.Nil(t, faultyAllocation)
faultyAllocation, err = c.allocateNext("jupiter-pod-1", "jupiter", IPv6, false)
assert.ErrorContains(t, err, "unknown pool")
assert.ErrorContains(t, err, "pool not (yet) available")
assert.Nil(t, faultyAllocation)
// Try again. This should still fail, but it should not request an
// additional (third) IP, since this owner has already attempted to
// allocate. This call, however, passes upstreamSync=true, which should
// populate .Spec.IPAM.Pools.Requested with pending requests for the
// "jupiter" pool
faultyAllocation, err = c.allocateNext("jupiter-pod-1", "jupiter", IPv6, true)
assert.ErrorContains(t, err, "pool not (yet) available")
assert.Nil(t, faultyAllocation)

// Check if the agent now requests one IPv4 and one IPv6 IP for the jupiter pool
assert.Equal(t, <-events, "upsert")
currentNode = fakeK8sCiliumNodeAPI.currentNode()
assert.Equal(t, []types.IPAMPoolRequest{
{
Pool: "default",
Needed: types.IPAMPoolDemand{
IPv4Addrs: 32, // 1 allocated + 16 pre-allocate, rounded up to multiple of 16
IPv6Addrs: 16, // 0 allocated + 16 pre-allocate
},
},
{
Pool: "jupiter",
Needed: types.IPAMPoolDemand{
IPv4Addrs: 1, // 1 pending, no pre-allocate
IPv6Addrs: 1, // 1 pending, no pre-allocate
},
},
{
Pool: "mars",
Needed: types.IPAMPoolDemand{
IPv4Addrs: 8, // 0 allocated + 8 pre-allocate
IPv6Addrs: 8, // 0 allocated + 8 pre-allocate
},
},
}, currentNode.Spec.IPAM.Pools.Requested)

// Assign the jupiter pool
currentNode.Spec.IPAM.Pools.Allocated = []types.IPAMPoolAllocation{
{
Pool: "default",
CIDRs: []types.IPAMPodCIDR{
types.IPAMPodCIDR(defaultIPv6CIDR1.String()),
types.IPAMPodCIDR(defaultIPv4CIDR1.String()),
},
},
{
Pool: "jupiter",
CIDRs: []types.IPAMPodCIDR{
types.IPAMPodCIDR(jupiterIPv4CIDR.String()),
types.IPAMPodCIDR(juptierIPv6CIDR.String()),
},
},
{
Pool: "mars",
CIDRs: []types.IPAMPodCIDR{
types.IPAMPodCIDR(marsIPv6CIDR1.String()),
types.IPAMPodCIDR(marsIPv4CIDR1.String()),
},
},
}
fakeK8sCiliumNodeAPI.updateNode(currentNode)
assert.Equal(t, <-events, "upsert")

c.waitForPool(context.TODO(), IPv4, "jupiter")
c.waitForPool(context.TODO(), IPv6, "jupiter")

// Allocations should now succeed
jupiterIP0 := net.ParseIP("192.168.1.1")
allocatedJupiterIP0, err := c.allocateIP(jupiterIP0, "jupiter-pod-0", "jupiter", IPv4, false)
assert.Nil(t, err)
assert.True(t, jupiterIP0.Equal(allocatedJupiterIP0.IP))
allocatedJupiterIP1, err := c.allocateNext("jupiter-pod-1", "jupiter", IPv6, false)
assert.Nil(t, err)
assert.True(t, juptierIPv6CIDR.Contains(allocatedJupiterIP1.IP))

// Release IPs from jupiter pool. This should fully remove it from both
// "requested" and "allocated"
err = c.releaseIP(allocatedJupiterIP0.IP, "jupiter", IPv4, false)
assert.Nil(t, err)
err = c.releaseIP(allocatedJupiterIP1.IP, "jupiter", IPv6, true) // triggers sync
assert.Nil(t, err)

// Wait for agent to release jupiter CIDRs
assert.Equal(t, <-events, "upsert")
currentNode = fakeK8sCiliumNodeAPI.currentNode()
assert.Equal(t, types.IPAMPoolSpec{
Requested: []types.IPAMPoolRequest{
{
Pool: "default",
Needed: types.IPAMPoolDemand{
IPv4Addrs: 32, // 1 allocated + 16 pre-allocate, rounded up to multiple of 16
IPv6Addrs: 16, // 0 allocated + 16 pre-allocate
},
},
{
Pool: "mars",
Needed: types.IPAMPoolDemand{
IPv4Addrs: 8, // 0 allocated + 8 pre-allocate
IPv6Addrs: 8, // 0 allocated + 8 pre-allocate
},
},
},
Allocated: []types.IPAMPoolAllocation{
{
Pool: "default",
CIDRs: []types.IPAMPodCIDR{
types.IPAMPodCIDR(defaultIPv4CIDR1.String()),
types.IPAMPodCIDR(defaultIPv6CIDR1.String()),
},
},
{
Pool: "mars",
CIDRs: []types.IPAMPodCIDR{
types.IPAMPodCIDR(marsIPv4CIDR1.String()),
types.IPAMPodCIDR(marsIPv6CIDR1.String()),
},
},
},
}, currentNode.Spec.IPAM.Pools)

// exhaust mars ipv4 pool (/27 contains 30 IPs)
allocatedMarsIPs := []net.IP{}
numMarsIPs := 30