Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Support nested BGP peering with calico-nodes running in local kubevirt VM pods #9875

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Review markups
  • Loading branch information
song-jiang committed Feb 26, 2025
commit 3b632021e3404b69ff7088c5a2403a6d1cd378ee
54 changes: 40 additions & 14 deletions felix/dataplane/linux/endpoint_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@ import (
"reflect"
"regexp"
"strings"
"time"

apiv3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3"
log "github.com/sirupsen/logrus"
"github.com/vishvananda/netlink"

"github.com/projectcalico/calico/felix/dataplane/common"
"github.com/projectcalico/calico/felix/environment"
"github.com/projectcalico/calico/felix/generictables"
"github.com/projectcalico/calico/felix/ifacemonitor"
"github.com/projectcalico/calico/felix/ip"
"github.com/projectcalico/calico/felix/iptables"
"github.com/projectcalico/calico/felix/netlinkshim"
"github.com/projectcalico/calico/felix/netlinkshim/handlemgr"
"github.com/projectcalico/calico/felix/nftables"
"github.com/projectcalico/calico/felix/proto"
"github.com/projectcalico/calico/felix/routetable"
Expand Down Expand Up @@ -211,7 +215,9 @@ type endpointManager struct {
needToCheckDispatchChains bool
needToCheckEndpointMarkChains bool

nlHandle netlinkHandle
nl *handlemgr.HandleManager

newNetlinkHandle func() (netlinkshim.Interface, error)

// Callbacks
OnEndpointStatusUpdate EndpointStatusUpdateCallback
Expand Down Expand Up @@ -242,9 +248,9 @@ func newEndpointManager(
callbacks *common.Callbacks,
floatingIPsEnabled bool,
nft bool,
featureDetector environment.FeatureDetectorIface,
netlinkTimeout time.Duration,
) *endpointManager {
nlHandle, _ := netlink.NewHandle()

return newEndpointManagerWithShims(
rawTable,
mangleTable,
Expand All @@ -263,9 +269,10 @@ func newEndpointManager(
bpfEnabled,
bpfEndpointManager,
callbacks,
nlHandle,
floatingIPsEnabled,
nft,
featureDetector,
netlinkTimeout,
)
}

Expand All @@ -287,9 +294,10 @@ func newEndpointManagerWithShims(
bpfEnabled bool,
bpfEndpointManager hepListener,
callbacks *common.Callbacks,
nlHandle netlinkHandle,
floatingIPsEnabled bool,
nft bool,
featureDetector environment.FeatureDetectorIface,
netlinkTimeout time.Duration,
) *endpointManager {
wlIfacesPattern := "^(" + strings.Join(wlInterfacePrefixes, "|") + ").*"
wlIfacesRegexp := regexp.MustCompile(wlIfacesPattern)
Expand All @@ -301,7 +309,7 @@ func newEndpointManagerWithShims(
actions = nftables.Actions()
}

return &endpointManager{
epManager := &endpointManager{
ipVersion: ipVersion,
wlIfacesRegexp: wlIfacesRegexp,
kubeIPVSSupportEnabled: kubeIPVSSupportEnabled,
Expand Down Expand Up @@ -370,8 +378,16 @@ func newEndpointManagerWithShims(

OnEndpointStatusUpdate: onWorkloadEndpointStatusUpdate,
callbacks: newEndpointManagerCallbacks(callbacks, ipVersion),
nlHandle: nlHandle,
newNetlinkHandle: netlinkshim.NewRealNetlink,
}

epManager.nl = handlemgr.NewHandleManager(
featureDetector,
handlemgr.WithNewHandleOverride(epManager.newNetlinkHandle),
handlemgr.WithSocketTimeout(netlinkTimeout),
)

return epManager
}

func (m *endpointManager) OnUpdate(protoBufMsg interface{}) {
Expand Down Expand Up @@ -1708,14 +1724,19 @@ func netlinkAddrsContains(addrs []netlink.Addr, ip string) bool {
}

func (m *endpointManager) removeBGPPeerIPOnInterface(name string, peerIP string) error {
nl, err := m.nl.Handle()
if err != nil {
return fmt.Errorf("failed to connect to netlink")
}

// Remove local BGP peer IP from the interface if it is present.
family := netlink.FAMILY_V4
if m.ipVersion == 6 {
family = netlink.FAMILY_V6
}

// Look up the interface.
link, err, notFound := lookupLink(m.nlHandle, name)
link, err, notFound := lookupLink(nl, name)
if notFound {
// The link has been removed. Address already gone.
return nil
Expand All @@ -1724,7 +1745,7 @@ func (m *endpointManager) removeBGPPeerIPOnInterface(name string, peerIP string)
return err
}

addrs, err := m.nlHandle.AddrList(link, family)
addrs, err := nl.AddrList(link, family)
if err != nil {
// Not sure why this would happen, but pass it up.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Link might be deleted under you by CNI plugin

log.WithError(err).Warning("Failed to list address on the link")
Expand All @@ -1743,9 +1764,9 @@ func (m *endpointManager) removeBGPPeerIPOnInterface(name string, peerIP string)
return err
}

if err = m.nlHandle.AddrDel(link, addr); err != nil {
if err = nl.AddrDel(link, addr); err != nil {
// Only emit the following warning log if the link still exists.
if _, _, notFound = lookupLink(m.nlHandle, name); !notFound {
if _, _, notFound = lookupLink(nl, name); !notFound {
log.WithField("address", addr).WithError(err).Warning("Failed to remove host side address on workload interface")
}
return err
Expand All @@ -1759,6 +1780,11 @@ func (m *endpointManager) ensureLocalBGPPeerIPOnInterface(name string) error {
logCtx := log.WithField("iface", name)
logCtx.Debug("Configure interface for local bpg peer role")

nl, err := m.nl.Handle()
if err != nil {
return fmt.Errorf("failed to connect to netlink")
}

if m.ifaceIsForLocalBGPPeer(name) {
if len(m.localBGPPeerIP) == 0 {
logCtx.Warning("no peer ip is defined trying to configure local BGP peer ip on interface")
Expand All @@ -1770,13 +1796,13 @@ func (m *endpointManager) ensureLocalBGPPeerIPOnInterface(name string) error {
family = netlink.FAMILY_V6
}

link, err := m.nlHandle.LinkByName(name)
link, err := nl.LinkByName(name)
if err != nil {
// Presumably the link is not up yet. We will be called again when it is.
log.WithError(err).Warning("Failed to look up device link")
return err
}
addrs, err := m.nlHandle.AddrList(link, family)
addrs, err := nl.AddrList(link, family)
if err != nil {
// Not sure why this would happen, but pass it up.
logCtx.WithError(err).Warning("Failed to list address on the link")
Expand All @@ -1794,7 +1820,7 @@ func (m *endpointManager) ensureLocalBGPPeerIPOnInterface(name string) error {
return err
}

if err = m.nlHandle.AddrAdd(link, addr); err != nil {
if err = nl.AddrAdd(link, addr); err != nil {
log.WithError(err).Warning("Failed to add peer ip")
return err
}
Expand Down
4 changes: 4 additions & 0 deletions felix/dataplane/linux/int_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,8 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
callbacks,
config.FloatingIPsEnabled,
config.RulesConfig.NFTables,
featureDetector,
config.NetlinkTimeout,
)
dp.RegisterManager(epManager)
dp.endpointsSourceV4 = epManager
Expand Down Expand Up @@ -1168,6 +1170,8 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
callbacks,
config.FloatingIPsEnabled,
config.RulesConfig.NFTables,
featureDetector,
config.NetlinkTimeout,
))
dp.RegisterManager(newFloatingIPManager(natTableV6, ruleRenderer, 6, config.FloatingIPsEnabled))
dp.RegisterManager(newMasqManager(ipSetsV6, natTableV6, ruleRenderer, config.MaxIPSetSize, 6))
Expand Down