Skip to content

Commit

Permalink
Refactor DNS and network-agent setup
Browse files Browse the repository at this point in the history
  • Loading branch information
ldx committed Apr 24, 2020
1 parent 5e774de commit 70fd702
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 146 deletions.
2 changes: 2 additions & 0 deletions cmd/virtual-kubelet/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import "github.com/spf13/pflag"
type ServerConfig struct {
DebugServer bool
NetworkAgentSecret string
ClusterDNS string
}

func (c *ServerConfig) FlagSet() *pflag.FlagSet {
flags := pflag.NewFlagSet("serverconfig", pflag.ContinueOnError)
flags.BoolVar(&c.DebugServer, "debug-server", c.DebugServer, "Enable a listener in the server for inspecting internal kip structures.")
flags.StringVar(&c.NetworkAgentSecret, "network-agent-secret", c.NetworkAgentSecret, "Service account secret for the cell network agent, in the form of <namespace>/<name>")
flags.StringVar(&c.ClusterDNS, "cluster-dns", c.ClusterDNS, "Default cluster DNS server to use")
return flags
}
2 changes: 2 additions & 0 deletions cmd/virtual-kubelet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func main() {
internalIP,
serverURL,
serverConfig.NetworkAgentSecret,
serverConfig.ClusterDNS,
cfg.KubeClusterDomain,
cfg.DaemonPort,
serverConfig.DebugServer,
cfg.ResourceManager,
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/deploy_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/elotl/kip/pkg/util"
"github.com/kubernetes/kubernetes/pkg/kubelet/network/dns"
"github.com/virtual-kubelet/node-cli/manager"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/clientcmd"
Expand Down
87 changes: 87 additions & 0 deletions pkg/server/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package server

import (
"fmt"
"io/ioutil"
"net"
"strings"

"github.com/elotl/kip/pkg/server/cloud"
"github.com/elotl/kip/pkg/util"
"github.com/elotl/kip/pkg/util/k8s"
"github.com/elotl/kip/pkg/util/k8s/eventrecorder"
"github.com/kubernetes/kubernetes/pkg/kubelet/network/dns"
"github.com/virtual-kubelet/node-cli/manager"
v1 "k8s.io/api/core/v1"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/klog"
)

func createResolverFile(nameservers, searches []string) (string, error) {
tmpf, err := ioutil.TempFile("", "resolv-conf")
if err != nil {
klog.Warningf("creating resolver tempfile: %v", err)
return "", err
}
defer tmpf.Close()
for _, ns := range nameservers {
tmpf.Write([]byte(fmt.Sprintf("nameserver %s\n", ns)))
}
if len(searches) > 0 {
searchList := strings.Join(searches, " ")
tmpf.Write([]byte(fmt.Sprintf("search %s\n", searchList)))
}
resolverConfig := tmpf.Name()
return resolverConfig, nil
}

func createDNSConfigurer(kubernetesNodeName, clusterDNS, clusterDomain string, cloudClient cloud.CloudClient, rm *manager.ResourceManager) (*dns.Configurer, error) {
loggingEventRecorder := eventrecorder.NewLoggingEventRecorder(4)
nodeRef := &v1.ObjectReference{
Kind: "Node",
APIVersion: "v1",
Name: kubernetesNodeName,
}
nameservers, searches, err := cloudClient.GetDNSInfo()
if err != nil {
return nil, util.WrapError(err, "getting cloud DNS info")
}
klog.V(2).Infof("host nameservers %v searches %v", nameservers, searches)
resolverConfig, err := createResolverFile(nameservers, searches)
ip := net.ParseIP(clusterDNS)
if ip == nil || ip.IsUnspecified() {
services, err := rm.ListServices()
if err != nil {
return nil, util.WrapError(err, "looking up kube-dns service")
}
for _, svc := range services {
if svc.Name != "kube-dns" || svc.Namespace != "kube-system" {
continue
}
ip = net.ParseIP(svc.Spec.ClusterIP)
}
}
if ip == nil || ip.IsUnspecified() {
return nil, fmt.Errorf("missing or misconfigured kube-dns service")
}
return dns.NewConfigurer(
loggingEventRecorder,
nodeRef,
nil,
[]net.IP{ip},
clusterDomain,
resolverConfig,
), nil
}

func createNetworkAgentKubeconfig(kubernetesNodeName, networkAgentSecret, serverURL string, rm *manager.ResourceManager) (*clientcmdapi.Config, error) {
kc, err := k8s.CreateNetworkAgentKubeconfig(
rm, serverURL, networkAgentSecret)
if err != nil {
return nil, util.WrapError(err, "creating network-agent kubeconfig")
}
if err := k8s.ValidateKubeconfig(kc); err != nil {
return nil, util.WrapError(err, "validating network-agent kubeconfig")
}
return kc, err
}
129 changes: 0 additions & 129 deletions pkg/server/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ package server
import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"os"
"strings"
"sync"
"time"
Expand All @@ -35,12 +33,9 @@ import (
"github.com/elotl/kip/pkg/server/registry"
"github.com/elotl/kip/pkg/util"
"github.com/elotl/kip/pkg/util/conmap"
"github.com/elotl/kip/pkg/util/k8s"
"github.com/elotl/kip/pkg/util/k8s/eventrecorder"
"github.com/elotl/kip/pkg/util/stats"
"github.com/kubernetes/kubernetes/pkg/kubelet/network/dns"
"github.com/virtual-kubelet/node-cli/manager"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/klog"
Expand Down Expand Up @@ -76,8 +71,6 @@ type PodController struct {
cleanTimer stats.LoopTimer
lastStatusReply *conmap.StringTimeTime
kubernetesNodeName string
serverURL string
networkAgentSecret string
networkAgentKubeconfig *clientcmdapi.Config
dnsConfigurer *dns.Configurer
}
Expand All @@ -93,55 +86,7 @@ type FullPodStatus struct {
Error error
}

func (c *PodController) createNetworkAgentKubeconfig() {
defer func() {
if c.networkAgentKubeconfig == nil {
klog.Fatal("cell network agent won't run")
}
}()
klog.V(2).Infof("checking kubernetes node name")
c.kubernetesNodeName = os.Getenv("NODE_NAME")
if c.kubernetesNodeName == "" {
klog.Warningf("failed to get NODE_NAME")
return
}
klog.V(2).Infof("creating cell network agent kubeconfig")
var (
kc *clientcmdapi.Config
err error
)
// TODO: get rid of this retry loop once VK is fixed.
err = util.Retry(
// Timeout.
15*time.Second,
func() error {
// Get kubeconfig. This will fail if the informers have not started
// up yet.
kc, err = k8s.CreateNetworkAgentKubeconfig(
c.resourceManager, c.serverURL, c.networkAgentSecret)
return err
},
func(err error) bool {
// Always retry.
return true
},
)
if err != nil {
klog.Warningf("creating kubeconfig: %v", err)
return
}
err = k8s.ValidateKubeconfig(kc)
if err != nil {
klog.Warningf("validating kubeconfig: %v", err)
return
}
c.networkAgentKubeconfig = kc
klog.V(2).Infof("created cell network agent kubeconfig")
}

func (c *PodController) Start(quit <-chan struct{}, wg *sync.WaitGroup) {
c.createNetworkAgentKubeconfig()
c.createDNSConfigurer()
c.registerEventHandlers()
c.failDispatchingPods()
go c.ControlLoop(quit, wg)
Expand Down Expand Up @@ -1091,77 +1036,3 @@ func (c *PodController) ControlPods() {
}
}
}

func (c *PodController) createDNSConfigurer() {
if err := c.doCreateDNSConfigurer(); err != nil {
klog.Fatalf("creating DNS configurer: %v", err)
}
}

func createResolverFile(nameservers, searches []string) (string, error) {
tmpf, err := ioutil.TempFile("", "resolv-conf")
if err != nil {
klog.Warningf("creating resolver tempfile: %v", err)
return "", err
}
defer tmpf.Close()
for _, ns := range nameservers {
tmpf.Write([]byte(fmt.Sprintf("nameserver %s\n", ns)))
}
if len(searches) > 0 {
tmpf.Write(
[]byte(fmt.Sprintf("search %s\n", strings.Join(searches, " "))))
}
resolverConfig := tmpf.Name()
return resolverConfig, nil
}

func (c *PodController) doCreateDNSConfigurer() error {
loggingEventRecorder := eventrecorder.NewLoggingEventRecorder(4)
nodeRef := &v1.ObjectReference{
Kind: "Node",
APIVersion: "v1",
Name: c.kubernetesNodeName,
}
// ClusterDNS, clusterDomain and resolverConfig can be overridden in the
// kubelet via the config file or command line parameters. For clusterDNS,
// we can look up the VIP of kube-dns assuming this is a standard setup
// where the name of the service is "kube-dns" and it resides in the
// "kube-system" namespace. The other two are tricky, though. We might
// want to optionally accept a kubelet config file to be able to better
// match kubelet behavior.
clusterDomain := "cluster.local"
nameservers, searches, err := c.cloudClient.GetDNSInfo()
if err != nil {
klog.Warningf("getting cloud DNS info: %v", err)
return err
}
klog.V(2).Infof("host nameservers %v searches %v", nameservers, searches)
resolverConfig, err := createResolverFile(nameservers, searches)
clusterDNS := net.IP{}
services, err := c.resourceManager.ListServices()
if err != nil {
klog.Warningf("looking up kube-dns service: %v", err)
return err
}
for _, svc := range services {
if svc.Name != "kube-dns" || svc.Namespace != "kube-system" {
continue
}
clusterDNS = net.ParseIP(svc.Spec.ClusterIP)
}
if clusterDNS.IsUnspecified() {
msg := fmt.Sprintf("missing or invalid kube-dns service")
klog.Warningf(msg)
return fmt.Errorf(msg)
}
c.dnsConfigurer = dns.NewConfigurer(
loggingEventRecorder,
nodeRef,
nil,
[]net.IP{clusterDNS},
clusterDomain,
resolverConfig,
)
return nil
}
44 changes: 28 additions & 16 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func ensureRegionUnchanged(etcdClient *etcd.SimpleEtcd, region string) error {
}

// InstanceProvider should implement node.PodLifecycleHandler
func NewInstanceProvider(configFilePath, nodeName, internalIP, serverURL, networkAgentSecret string, daemonEndpointPort int32, debugServer bool, rm *manager.ResourceManager, systemQuit <-chan struct{}) (*InstanceProvider, error) {
func NewInstanceProvider(configFilePath, nodeName, internalIP, serverURL, networkAgentSecret, clusterDNS, clusterDomain string, daemonEndpointPort int32, debugServer bool, rm *manager.ResourceManager, systemQuit <-chan struct{}) (*InstanceProvider, error) {
systemWG := &sync.WaitGroup{}

execer := utilexec.New()
Expand Down Expand Up @@ -271,26 +271,38 @@ func NewInstanceProvider(configFilePath, nodeName, internalIP, serverURL, networ
"Metric": metricsRegistry,
}

dnsConfigurer, err := createDNSConfigurer(
nodeName, clusterDNS, clusterDomain, cloudClient, rm)
if err != nil {
return nil, util.WrapError(err, "creating DNS configurer")
}

networkAgentKubeconfig, err := createNetworkAgentKubeconfig(
nodeName, networkAgentSecret, serverURL, rm)
if err != nil {
return nil, util.WrapError(err, "creating network-agent kubeconfig")
}

connectWithPublicIPs := cloudClient.ConnectWithPublicIPs()
itzoClientFactory := nodeclient.NewItzoFactory(
&certFactory.Root, *clientCert, connectWithPublicIPs)
nodeDispenser := nodemanager.NewNodeDispenser()
podController := &PodController{
podRegistry: podRegistry,
logRegistry: logRegistry,
metricsRegistry: metricsRegistry,
nodeLister: nodeRegistry,
resourceManager: rm,
nodeDispenser: nodeDispenser,
nodeClientFactory: itzoClientFactory,
events: eventSystem,
cloudClient: cloudClient,
controllerID: controllerID,
nametag: nametag,
lastStatusReply: conmap.NewStringTimeTime(),
serverURL: serverURL,
networkAgentSecret: networkAgentSecret,
kubernetesNodeName: nodeName,
podRegistry: podRegistry,
logRegistry: logRegistry,
metricsRegistry: metricsRegistry,
nodeLister: nodeRegistry,
resourceManager: rm,
nodeDispenser: nodeDispenser,
nodeClientFactory: itzoClientFactory,
events: eventSystem,
cloudClient: cloudClient,
controllerID: controllerID,
nametag: nametag,
lastStatusReply: conmap.NewStringTimeTime(),
kubernetesNodeName: nodeName,
dnsConfigurer: dnsConfigurer,
networkAgentKubeconfig: networkAgentKubeconfig,
}
imageIdCache := timeoutmap.New(false, nil)
cloudInitFile, err := cloudinitfile.New(serverConfigFile.Cells.CloudInitFile)
Expand Down

0 comments on commit 70fd702

Please sign in to comment.