Skip to content

Commit

Permalink
Run cAdvisor inside the Kubelet.
Browse files Browse the repository at this point in the history
cAdvisor is started as a Kubelet dependency during startup of the
Kubelet before the sync loops start.
  • Loading branch information
vmarmol committed Mar 13, 2015
1 parent b00e82e commit dc96ea6
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 31 deletions.
6 changes: 4 additions & 2 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/empty_dir"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
Expand Down Expand Up @@ -211,17 +212,18 @@ func startComponents(manifestURL string) (apiServerURL string) {

nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute)
nodeController.Run(5*time.Second, true, false)
cadvisorInterface := new(cadvisor.Fake)

// Kubelet (localhost)
testRootDir := makeTempDirOrDie("kubelet_integ_1.")
glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
kubeletapp.SimpleRunKubelet(cl, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil)
kubeletapp.SimpleRunKubelet(cl, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface)
// Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule.
testRootDir = makeTempDirOrDie("kubelet_integ_2.")
glog.Infof("Using %s as root dir for kubelet #2", testRootDir)
kubeletapp.SimpleRunKubelet(cl, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil)
kubeletapp.SimpleRunKubelet(cl, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface)

return apiServer.URL
}
Expand Down
32 changes: 14 additions & 18 deletions cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"math/rand"
"net"
"strconv"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
Expand All @@ -39,7 +38,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"

"github.com/golang/glog"
cadvisorClient "github.com/google/cadvisor/client"
"github.com/spf13/pflag"
)

Expand Down Expand Up @@ -69,7 +67,7 @@ type KubeletServer struct {
MinimumGCAge time.Duration
MaxContainerCount int
AuthPath string
CAdvisorPort uint
CadvisorPort uint
OOMScoreAdj int
APIServerList util.StringList
ClusterDomain string
Expand All @@ -95,7 +93,7 @@ func NewKubeletServer() *KubeletServer {
EnableDebuggingHandlers: true,
MinimumGCAge: 1 * time.Minute,
MaxContainerCount: 5,
CAdvisorPort: 4194,
CadvisorPort: 4194,
OOMScoreAdj: -900,
MasterServiceNamespace: api.NamespaceDefault,
}
Expand Down Expand Up @@ -124,7 +122,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&s.MinimumGCAge, "minimum_container_ttl_duration", s.MinimumGCAge, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'")
fs.IntVar(&s.MaxContainerCount, "maximum_dead_containers_per_container", s.MaxContainerCount, "Maximum number of old instances of a container to retain per container. Each container takes up some disk space. Default: 5.")
fs.StringVar(&s.AuthPath, "auth_path", s.AuthPath, "Path to .kubernetes_auth file, specifying how to authenticate to API server.")
fs.UintVar(&s.CAdvisorPort, "cadvisor_port", s.CAdvisorPort, "The port of the localhost cAdvisor endpoint")
fs.UintVar(&s.CadvisorPort, "cadvisor_port", s.CadvisorPort, "The port of the localhost cAdvisor endpoint")
fs.IntVar(&s.OOMScoreAdj, "oom_score_adj", s.OOMScoreAdj, "The oom_score_adj value for kubelet process. Values must be within the range [-1000, 1000]")
fs.Var(&s.APIServerList, "api_servers", "List of Kubernetes API servers for publishing events, and reading pods and services. (ip:port), comma separated.")
fs.StringVar(&s.ClusterDomain, "cluster_domain", s.ClusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains")
Expand Down Expand Up @@ -152,6 +150,11 @@ func (s *KubeletServer) Run(_ []string) error {

credentialprovider.SetPreferredDockercfgPath(s.RootDirectory)

cadvisorInterface, err := cadvisor.New(s.CadvisorPort)
if err != nil {
return err
}

kcfg := KubeletConfig{
Address: s.Address,
AllowPrivileged: s.AllowPrivileged,
Expand All @@ -172,7 +175,7 @@ func (s *KubeletServer) Run(_ []string) error {
ClusterDNS: s.ClusterDNS,
Runonce: s.RunOnce,
Port: s.Port,
CAdvisorPort: s.CAdvisorPort,
CadvisorInterface: cadvisorInterface,
EnableServer: s.EnableServer,
EnableDebuggingHandlers: s.EnableDebuggingHandlers,
DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint),
Expand Down Expand Up @@ -240,7 +243,8 @@ func SimpleRunKubelet(client *client.Client,
port uint,
masterServiceNamespace string,
volumePlugins []volume.Plugin,
tlsOptions *kubelet.TLSOptions) {
tlsOptions *kubelet.TLSOptions,
cadvisorInterface cadvisor.Interface) {
kcfg := KubeletConfig{
KubeClient: client,
DockerClient: dockerClient,
Expand All @@ -259,6 +263,7 @@ func SimpleRunKubelet(client *client.Client,
MasterServiceNamespace: masterServiceNamespace,
VolumePlugins: volumePlugins,
TLSOptions: tlsOptions,
CadvisorInterface: cadvisorInterface,
}
RunKubelet(&kcfg)
}
Expand Down Expand Up @@ -336,7 +341,7 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
type KubeletConfig struct {
KubeClient *client.Client
DockerClient dockertools.DockerInterface
CAdvisorPort uint
CadvisorInterface cadvisor.Interface
Address util.IP
AllowPrivileged bool
HostnameOverride string
Expand Down Expand Up @@ -379,15 +384,6 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
kubeClient = kc.KubeClient
}

cc, err := cadvisorClient.NewClient("http://127.0.0.1:" + strconv.Itoa(int(kc.CAdvisorPort)))
if err != nil {
return nil, err
}
cadvisorInterface, err := cadvisor.New(cc)
if err != nil {
return nil, err
}

k, err := kubelet.NewMainKubelet(
kc.Hostname,
kc.DockerClient,
Expand All @@ -406,7 +402,7 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
kc.VolumePlugins,
kc.StreamingConnectionIdleTimeout,
kc.Recorder,
cadvisorInterface,
kc.CadvisorInterface,
kc.StatusUpdateFrequency)

if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion cmd/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
Expand Down Expand Up @@ -146,7 +147,11 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr net.IP
runControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory)

dockerClient := dockertools.ConnectToDockerOrDie(*dockerEndpoint)
kubeletapp.SimpleRunKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil)
cadvisorInterface, err := cadvisor.New(0)
if err != nil {
glog.Fatalf("Failed to create cAdvisor: %v", err)
}
kubeletapp.SimpleRunKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface)
}

func newApiClient(addr net.IP, port int) *client.Client {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,23 @@ limitations under the License.
package cadvisor

import (
"github.com/google/cadvisor/client"
cadvisorApi "github.com/google/cadvisor/info/v1"
)

type cadvisorClient struct {
*client.Client
// Fake cAdvisor implementation.
type Fake struct {
}

func New(cc *client.Client) (Interface, error) {
return &cadvisorClient{
Client: cc,
}, nil
var _ Interface = new(Fake)

func (c *Fake) ContainerInfo(name string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
return new(cadvisorApi.ContainerInfo), nil
}

func (c *Fake) DockerContainer(name string, req *cadvisorApi.ContainerInfoRequest) (cadvisorApi.ContainerInfo, error) {
return cadvisorApi.ContainerInfo{}, nil
}

func (c *Fake) MachineInfo() (*cadvisorApi.MachineInfo, error) {
return new(cadvisorApi.MachineInfo), nil
}
112 changes: 112 additions & 0 deletions pkg/kubelet/cadvisor/cadvisor_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// +build cgo,linux

/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cadvisor

import (
"fmt"
"net/http"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
cadvisorHttp "github.com/google/cadvisor/http"
cadvisorApi "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/manager"
"github.com/google/cadvisor/storage/memory"
"github.com/google/cadvisor/utils/sysfs"
)

type cadvisorClient struct {
manager.Manager
}

var _ Interface = new(cadvisorClient)

// TODO(vmarmol): Make configurable.
// The number of stats to keep in memory.
const statsToCache = 60

// Creates a cAdvisor and exports its API on the specified port if port > 0.
func New(port uint) (Interface, error) {
sysFs, err := sysfs.NewRealSysFs()
if err != nil {
return nil, err
}

// Create and start the cAdvisor container manager.
m, err := manager.New(memory.New(statsToCache, nil), sysFs)
if err != nil {
return nil, err
}
err = m.Start()
if err != nil {
return nil, err
}

cadvisorClient := &cadvisorClient{
Manager: m,
}

// Export the HTTP endpoint if a port was specified.
if port > 0 {
err = cadvisorClient.exportHTTP(port)
if err != nil {
return nil, err
}
}

return cadvisorClient, nil
}

func (self *cadvisorClient) exportHTTP(port uint) error {
mux := http.NewServeMux()
err := cadvisorHttp.RegisterHandlers(mux, self, "", "", "", "", "/metrics")
if err != nil {
return err
}

serv := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: mux,
}

// TODO(vmarmol): Remove this when the cAdvisor port is once again free.
// If export failed, retry in the background until we are able to bind.
// This allows an existing cAdvisor to be killed before this one registers.
go func() {
defer util.HandleCrash()

err := serv.ListenAndServe()
for err != nil {
glog.Infof("Failed to register cAdvisor on port %d, retrying. Error: %v", port, err)
time.Sleep(time.Minute)
err = serv.ListenAndServe()
}
}()

return nil
}

func (self *cadvisorClient) ContainerInfo(name string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
return self.GetContainerInfo(name, req)
}

func (self *cadvisorClient) MachineInfo() (*cadvisorApi.MachineInfo, error) {
return self.GetMachineInfo()
}
8 changes: 5 additions & 3 deletions pkg/kubelet/cadvisor/cadvisor_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,21 @@ type Mock struct {
mock.Mock
}

// ContainerInfo is a mock implementation of CadvisorInterface.ContainerInfo.
var _ Interface = new(Mock)

// ContainerInfo is a mock implementation of Interface.ContainerInfo.
func (c *Mock) ContainerInfo(name string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
args := c.Called(name, req)
return args.Get(0).(*cadvisorApi.ContainerInfo), args.Error(1)
}

// DockerContainer is a mock implementation of CadvisorInterface.DockerContainer.
// DockerContainer is a mock implementation of Interface.DockerContainer.
func (c *Mock) DockerContainer(name string, req *cadvisorApi.ContainerInfoRequest) (cadvisorApi.ContainerInfo, error) {
args := c.Called(name, req)
return args.Get(0).(cadvisorApi.ContainerInfo), args.Error(1)
}

// MachineInfo is a mock implementation of CadvisorInterface.MachineInfo.
// MachineInfo is a mock implementation of Interface.MachineInfo.
func (c *Mock) MachineInfo() (*cadvisorApi.MachineInfo, error) {
args := c.Called()
return args.Get(0).(*cadvisorApi.MachineInfo), args.Error(1)
Expand Down
48 changes: 48 additions & 0 deletions pkg/kubelet/cadvisor/cadvisor_unsupported.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// +build !cgo !linux

/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cadvisor

import (
"errors"

cadvisorApi "github.com/google/cadvisor/info/v1"
)

type cadvisorUnsupported struct {
}

var _ Interface = new(cadvisorUnsupported)

func New(port uint) (Interface, error) {
return &cadvisorUnsupported{}, nil
}

var unsupportedErr = errors.New("cAdvisor is unsupported in this build")

func (self *cadvisorUnsupported) DockerContainer(name string, req *cadvisorApi.ContainerInfoRequest) (cadvisorApi.ContainerInfo, error) {
return cadvisorApi.ContainerInfo{}, unsupportedErr
}

func (self *cadvisorUnsupported) ContainerInfo(name string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
return nil, unsupportedErr
}

func (self *cadvisorUnsupported) MachineInfo() (*cadvisorApi.MachineInfo, error) {
return nil, unsupportedErr
}

0 comments on commit dc96ea6

Please sign in to comment.