diff --git a/go.mod b/go.mod index 3e8d517a..cc8d0006 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ replace github.com/insomniacslk/dhcp => github.com/harvester/dhcp v0.0.0-2022042 require ( github.com/cloudflare/ipvs v0.8.0 github.com/davecgh/go-spew v1.1.1 + github.com/florianl/go-conntrack v0.3.0 github.com/ghodss/yaml v1.0.0 github.com/golang/protobuf v1.5.2 github.com/insomniacslk/dhcp v0.0.0-20220119180841-3c283ff8b7dd @@ -38,6 +39,7 @@ require ( github.com/alessio/shellescape v1.4.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/coreos/go-iptables v0.6.0 github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect github.com/eapache/channels v1.1.0 // indirect github.com/eapache/queue v1.1.0 // indirect diff --git a/go.sum b/go.sum index 2a4c1148..d4fd5242 100644 --- a/go.sum +++ b/go.sum @@ -108,6 +108,8 @@ github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211130200136-a8f946100490/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/coreos/go-iptables v0.6.0 h1:is9qnZMPYjLd8LYqmm/qlE+wwEgJIkTYdhV3rfZo4jk= +github.com/coreos/go-iptables v0.6.0/go.mod h1:Qe8Bv2Xik5FyTXwgIbLAnv2sWSBmvWdFETJConOQ//Q= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= @@ -146,6 +148,8 @@ github.com/fanliao/go-promise v0.0.0-20141029170127-1890db352a72/go.mod h1:Pjfxu github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= +github.com/florianl/go-conntrack v0.3.0 h1:DUY84Mce+/lE9dJi2EWvGYacQtX2X96J9aVWV99l8UE= +github.com/florianl/go-conntrack v0.3.0/go.mod h1:Q+Um4J/nWUXSbnyzQRMOP4eweSeEQ2G8sfCO5gMz6Pw= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= @@ -803,6 +807,7 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/main.go b/main.go index 9e20cb03..9137ea6a 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,73 @@ var Version string var Build string func main() { + + // i, err := vip.CreateIptablesClient() + // if err != nil { + // panic(err) + // } + // if os.Getenv("CREATE") != "" { + + // b, err := i.CheckMangleChain(vip.MangleChainName) + // if err != nil { + // panic(err) + // } + // if b == false { + // err = i.CreateMangleChain(vip.MangleChainName) + // if err != nil { + // panic(err) + // } + // } + // // Add entries to our mangle chaing + // err = i.AppendReturnRulesForDestinationSubnet(vip.MangleChainName, "10.0.0.0/16") + // if err != nil { + // panic(err) + // } + // err = i.AppendReturnRulesForDestinationSubnet(vip.MangleChainName, "10.96.0.0/12") + // if err != nil { + // panic(err) + // } + // err = i.AppendReturnRulesForMarking(vip.MangleChainName, "10.0.77.36/32") + // if err != nil { + // panic(err) + // } + // err = i.InsertMangeTableIntoPrerouting(vip.MangleChainName) + // if err != nil { + // panic(err) + // } + // err = i.InsertSourceNat("192.168.0.221", "10.0.77.36") + // if err != nil { + // panic(err) + // } + + // err = i.DumpChain(vip.MangleChainName) + // vip.ExampleNfct_Dump() + // if err == nil { + // return + // } + // } + // if os.Getenv("DELETE") != "" { + // err = i.DeleteManglePrerouting(vip.MangleChainName) + // if err != nil { + // panic(err) + // } + // err = i.DeleteMangleChain(vip.MangleChainName) + // if err != nil { + // panic(err) + // } + // err = i.DeleteSourceNat("10.0.77.36", "192.168.0.221") + // if err != nil { + // panic(err) + // } + // //err = i.DumpChain(vip.MangleChainName) + // vip.ExampleNfct_Dump() + // if err == nil { + // return + // } + // if err != nil { + // panic(err) + // } + // } cmd.Release.Version = Version cmd.Release.Build = Build cmd.Execute() diff --git a/pkg/egress/egress.go b/pkg/egress/egress.go new file mode 100644 index 00000000..95c52c3d --- /dev/null +++ b/pkg/egress/egress.go @@ -0,0 +1,175 @@ +package egress + +//https://github.com/Trojan295/kube-router/commit/d48fd0a275249eb44e272d7f936ac91610c987cd#diff-3b65e4098d69eede2c4abfedc10116dda8fa05b9e308c18c1cb62b1a3fc8c119 + +import ( + "bufio" + "bytes" + "fmt" + "net" + "os" + "os/exec" + "regexp" + "strconv" + "strings" + + "github.com/coreos/go-iptables/iptables" + log "github.com/sirupsen/logrus" +) + +const ( + preroutingMarkChain = "CHINCHILLA-PREROUTING-MARK" + postroutingSnatChain = "CHINCHILLA-POSTROUTING-SNAT" + + egressIPAnnotation = "egressSNAT.IPAddress" + egressFixedPortsAnnotation = "egressSNAT.FixedPorts" + + routingTableFile = "/opt/rt_tables" +) + +func iptablesChainExists(table string, chain string, it *iptables.IPTables) (bool, error) { + chains, err := it.ListChains(table) + if err != nil { + return false, err + } + + for _, c := range chains { + if c == chain { + return true, nil + } + } + return false, nil +} + +func iptablesEnsureChain(table string, chain string, it *iptables.IPTables) error { + if exists, err := iptablesChainExists(table, chain, it); err != nil { + return nil + } else if !exists { + return it.NewChain(table, chain) + } + return nil +} + +func iptablesEnsureRuleAtPosition(table, chain string, position int, it *iptables.IPTables, rule ...string) error { + if exists, err := it.Exists(table, chain, rule...); err != nil { + return err + } else if exists { + if err2 := it.Delete(table, chain, rule...); err2 != nil { + return err2 + } + } + + return it.Insert(table, chain, position, rule...) +} + +type routeTable struct { + ID int + Name string + Subnet net.IPNet +} + +func FindRouteTableForIP(ip net.IP, rts []routeTable) *routeTable { + for _, rt := range rts { + if rt.Subnet.Contains(ip) { + return &rt + } + } + return nil +} + +func EnsureRouteRule(rt *routeTable) error { + fwmark := fmt.Sprintf("%x/0xff", rt.ID) + + out, err := exec.Command("ip", "rule", "show", "fwmark", fwmark, "table", rt.Name).Output() + if err != nil { + return err + } + + if string(out) != "" { + return nil + } + + _, err = exec.Command("ip", "rule", "add", "fwmark", fwmark, "table", rt.Name).Output() + if err != nil { + return err + } + + return nil +} + +func GetRouteTables() ([]routeTable, error) { + tables := make([]routeTable, 0) + + fp, err := os.Open(routingTableFile) + if err != nil { + return tables, err + } + defer fp.Close() + + r := bufio.NewScanner(fp) + for r.Scan() { + line := strings.Trim(r.Text(), " ") + if strings.HasPrefix(line, "#") { + continue + } + + cols := strings.Fields(line) + name := cols[1] + ID, err := strconv.Atoi(cols[0]) + if err != nil { + log.Error("invalid route table entry in /etc/iproute2/rt_tables") + continue + } + + rt := routeTable{ + ID: ID, + Name: name, + } + + if rt.ID == 0 { + continue + } + + cidr, err := getCIDRForRouteTable(&rt) + if err != nil || cidr == nil { + continue + } + + rt.Subnet = *cidr + + tables = append(tables, rt) + } + + return tables, nil +} + +func getCIDRForRouteTable(rt *routeTable) (*net.IPNet, error) { + tableID := fmt.Sprintf("%d", rt.ID) + + out, err := exec.Command("ip", "rule", "show", "table", tableID).Output() + if err != nil { + return nil, err + } + + r := bufio.NewScanner(bytes.NewBuffer(out)) + + var cidr *net.IPNet = nil + + pattern := fmt.Sprintf(`\d+:.+from (.+) lookup.+`) + re := regexp.MustCompile(pattern) + + for r.Scan() { + line := r.Text() + result := re.FindStringSubmatch(line) + + if len(result) > 0 { + _, cidr, _ = net.ParseCIDR(result[1]) + if cidr != nil { + break + } + } + } + + return cidr, nil +} + diff --git a/pkg/manager/service_egress.go b/pkg/manager/service_egress.go new file mode 100644 index 00000000..3fde353b --- /dev/null +++ b/pkg/manager/service_egress.go @@ -0,0 +1,90 @@ +package manager + +import ( + "context" + "fmt" + "strings" + + "github.com/kube-vip/kube-vip/pkg/vip" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func (sm *Manager) configureEgress(vipIP, podIP string) error { + serviceCIDR, podCIDR, err := sm.AutoDiscoverCIDRs() + if err != nil { + serviceCIDR = "10.96.0.0/12" + podCIDR = "10.0.0.0/16" + } + i, err := vip.CreateIptablesClient() + if err != nil { + return fmt.Errorf("error Creating iptables client [%s]", err) + } + // Check if the kube-vip mangle chain exists, if not create it + exists, err := i.CheckMangleChain(vip.MangleChainName) + if err != nil { + return fmt.Errorf("error checking for existence of mangle chain [%s], error [%s]", vip.MangleChainName, err) + } + if !exists { + err = i.CreateMangleChain(vip.MangleChainName) + if err != nil { + return fmt.Errorf("error creating mangle chain [%s], error [%s]", vip.MangleChainName, err) + } + } + err = i.AppendReturnRulesForDestinationSubnet(vip.MangleChainName, podCIDR) + if err != nil { + panic(err) + } + err = i.AppendReturnRulesForDestinationSubnet(vip.MangleChainName, serviceCIDR) + if err != nil { + panic(err) + } + err = i.AppendReturnRulesForMarking(vip.MangleChainName, podIP+"/32") + if err != nil { + panic(err) + } + + err = i.InsertMangeTableIntoPrerouting(vip.MangleChainName) + if err != nil { + panic(err) + } + err = i.InsertSourceNat(vipIP, podIP) + if err != nil { + panic(err) + } + + _ = i.DumpChain(vip.MangleChainName) + err = vip.DeleteExistingSessions(podIP) + if err != nil { + return err + } + + return nil +} + +func (sm *Manager) AutoDiscoverCIDRs() (serviceCIDR, podCIDR string, err error) { + pod, err := sm.clientSet.CoreV1().Pods("kube-system").Get(context.TODO(), "kube-controller-manager", v1.GetOptions{}) + if err != nil { + return "", "", err + } + for flags := range pod.Spec.Containers[0].Command { + if strings.Contains(pod.Spec.Containers[0].Command[flags], "--cluster-cidr=") { + podCIDR = strings.ReplaceAll(pod.Spec.Containers[0].Command[flags], "--cluster-cidr=", "") + } + if strings.Contains(pod.Spec.Containers[0].Command[flags], "--service-cluster-ip-range=") { + serviceCIDR = strings.ReplaceAll(pod.Spec.Containers[0].Command[flags], "--service-cluster-ip-range=", "") + } + } + if podCIDR == "" || serviceCIDR == "" { + err = fmt.Errorf("unable to fully determine cluster CIDR configurations") + } + + return +} + +func TeardownEgress(podIP, serviceIP string) error { + i, err := vip.CreateIptablesClient() + if err != nil { + return fmt.Errorf("error Creating iptables client [%s]", err) + } + return i.DeleteSourceNat(podIP, serviceIP) +} diff --git a/pkg/manager/watch_services.go b/pkg/manager/watch_services.go index c2608d36..6c0a89d7 100644 --- a/pkg/manager/watch_services.go +++ b/pkg/manager/watch_services.go @@ -44,6 +44,9 @@ func (sm *Manager) servicesWatcher(ctx context.Context, serviceFunc func(context ch := rw.ResultChan() //defer rw.Stop() + // Used for tracking an active endpoint / pod + var podIP string + for event := range ch { sm.countServiceWatchEvent.With(prometheus.Labels{"type": string(event.Type)}).Add(1) @@ -70,7 +73,7 @@ func (sm *Manager) servicesWatcher(ctx context.Context, serviceFunc func(context if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal { ep, err := sm.clientSet.CoreV1().Endpoints(svc.Namespace).Get(ctx, svc.Name, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("unable to parse service endpoints [%v]", err) + log.Errorf("unable to parse service endpoints [%v]", err) } exists := false for subset := range ep.Subsets { @@ -79,6 +82,7 @@ func (sm *Manager) servicesWatcher(ctx context.Context, serviceFunc func(context if ep.Subsets[subset].Addresses[address].NodeName != nil { if id == *ep.Subsets[subset].Addresses[address].NodeName { exists = true + podIP = ep.Subsets[subset].Addresses[address].IP } } } @@ -87,6 +91,14 @@ func (sm *Manager) servicesWatcher(ctx context.Context, serviceFunc func(context log.Warnf("loadBalancer has External Traffic Policy: Local, no local pods found") break } + // Check if we want to rewrite egress + if svc.Annotations["kube-vip.io/egress"] == "true" && exists { + // We will need to modify the iptables rules + err = sm.configureEgress(svc.Spec.LoadBalancerIP, podIP) + if err != nil { + log.Errorf("Error configuring egress for loadbalancer [%s]", err) + } + } } // Check the loadBalancer class @@ -143,6 +155,12 @@ func (sm *Manager) servicesWatcher(ctx context.Context, serviceFunc func(context break } + // We will need to tear down the egress + if svc.Annotations["kube-vip.io/egress"] == "true" { + log.Infof("service [%s] has an egress re-write enabled", svc.Name) + TeardownEgress(podIP, svc.Spec.LoadBalancerIP) + } + err = sm.deleteService(string(svc.UID)) if err != nil { log.Error(err) diff --git a/pkg/vip/egress.go b/pkg/vip/egress.go new file mode 100644 index 00000000..d2d1fc8f --- /dev/null +++ b/pkg/vip/egress.go @@ -0,0 +1,130 @@ +package vip + +import ( + "fmt" + + iptables "github.com/coreos/go-iptables/iptables" + log "github.com/sirupsen/logrus" + + ct "github.com/florianl/go-conntrack" +) + +//Notes: https://github.com/cloudnativelabs/kube-router/issues/434 + +// This file contains all of the functions related to changing SNAT for a +// pod so that it appears to be coming from a VIP. + +// 1. Create a new chain in the mangle table +// 2. Ignore (or RETURN) packets going to a service or other pod address +// 3. Mark packets coming from a pod +// 4. Add a rule in the mangle chain PREROUTING to jump to the new chain created above +// 5. Mark packets going through this host (not originating) (might not be needed) +// 6. Perform source nating on marked packets + +// Create new iptables client +// Test to find out what exists before hand + +const MangleChainName = "KUBE-VIP-EGRESS" +const iptableCommentSuffiv = "KUBE-VIP-" + +// DEBUG +const podSubnet = "10.0.0.0/26" +const serviceSubnet = "10.96.0.0/12" + +// Create new table + +func (e *Egress) DumpChain(name string) error { + log.Infof("Dumping chain [%s]", name) + c, err := e.ipTablesClient.List("mangle", name) + if err != nil { + return err + } + for x := range c { + log.Infof("Rule -> %s", c[x]) + } + return nil +} + +type Egress struct { + ipTablesClient *iptables.IPTables +} + +func CreateIptablesClient() (*Egress, error) { + e := new(Egress) + var err error + e.ipTablesClient, err = iptables.New() + return e, err +} + +func (e *Egress) CheckMangleChain(name string) (bool, error) { + + log.Infof("[Egress] Cheching for Chain [%s]", name) + return e.ipTablesClient.ChainExists("mangle", name) + +} + +func (e *Egress) DeleteMangleChain(name string) error { + return e.ipTablesClient.ClearAndDeleteChain("mangle", name) +} + +func (e *Egress) DeleteManglePrerouting(name string) error { + return e.ipTablesClient.Delete("mangle", "PREROUTING", "-j", name) +} + +func (e *Egress) DeleteSourceNat(podIP, vip string) error { + return e.ipTablesClient.Delete("nat", "POSTROUTING", "-s", podIP+"/32", "-m", "mark", "--mark", "64/64", "-j", "SNAT", "--to-source", vip) +} + +func (e *Egress) CreateMangleChain(name string) error { + + log.Infof("[Egress] Creating Chain [%s]", name) + // Creates a new chain in the mangle table + return e.ipTablesClient.NewChain("mangle", name) + +} +func (e *Egress) AppendReturnRulesForDestinationSubnet(name, subnet string) error { + log.Infof("[Egress] Adding jump for subnet [%s] to RETURN to previous chain/rules", subnet) + return e.ipTablesClient.Append("mangle", name, "-d", subnet, "-j", "RETURN") +} + +func (e *Egress) AppendReturnRulesForMarking(name, subnet string) error { + log.Infof("[Egress] Marking packets on network [%s]", subnet) + return e.ipTablesClient.Append("mangle", name, "-s", subnet, "-j", "MARK", "--set-mark", "64/64") +} + +func (e *Egress) InsertMangeTableIntoPrerouting(name string) error { + log.Infof("[Egress] Adding jump from mangle prerouting to [%s]", name) + return e.ipTablesClient.Insert("mangle", "PREROUTING", 1, "-j", name) +} + +func (e *Egress) InsertSourceNat(vip, podIP string) error { + log.Infof("[Egress] Adding jump from mangle prerouting to [%s]", "name") + return e.ipTablesClient.Insert("nat", "POSTROUTING", 1, "-s", podIP+"/32", "-m", "mark", "--mark", "64/64", "-j", "SNAT", "--to-source", vip) +} + +func DeleteExistingSessions(podIP string) error { + nfct, err := ct.Open(&ct.Config{}) + if err != nil { + fmt.Println("could not create nfct:", err) + return nil + } + defer nfct.Close() + sessions, err := nfct.Dump(ct.Conntrack, ct.IPv4) + if err != nil { + fmt.Println("could not dump sessions:", err) + return nil + } + + for _, session := range sessions { + if session.Origin.Dst.String() == podIP /*&& *session.Origin.Proto.DstPort == uint16(destinationPort)*/ { + fmt.Printf("Source -> %s Destination -> %s:%d\n", session.Origin.Src.String(), session.Origin.Dst.String(), *session.Origin.Proto.DstPort) + + err = nfct.Delete(ct.Conntrack, ct.IPv4, session) + if err != nil { + fmt.Println("could not delete sessions:", err) + } + + } + } + return nil +}