Skip to content

Commit

Permalink
kubelet post node status to master
Browse files Browse the repository at this point in the history
  • Loading branch information
Deyuan Deng authored and ddysher committed Mar 11, 2015
1 parent 6d465c4 commit 9982aaa
Show file tree
Hide file tree
Showing 6 changed files with 414 additions and 66 deletions.
2 changes: 1 addition & 1 deletion cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
nodeResources := &api.NodeResources{}

nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute)
nodeController.Run(5*time.Second, true, true)
nodeController.Run(5*time.Second, true, false)

// Kubelet (localhost)
testRootDir := makeTempDirOrDie("kubelet_integ_1.")
Expand Down
2 changes: 1 addition & 1 deletion cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewCMServer() *CMServer {
NodeMilliCPU: 1000,
NodeMemory: resource.MustParse("3Gi"),
SyncNodeList: true,
SyncNodeStatus: true,
SyncNodeStatus: false,
KubeletConfig: client.KubeletConfig{
Port: ports.KubeletPort,
EnableHttps: false,
Expand Down
21 changes: 14 additions & 7 deletions cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type KubeletServer struct {
SyncFrequency time.Duration
FileCheckFrequency time.Duration
HTTPCheckFrequency time.Duration
StatusUpdateFrequency time.Duration
ManifestURL string
EnableServer bool
Address util.IP
Expand Down Expand Up @@ -82,12 +83,13 @@ type KubeletServer struct {
// NewKubeletServer will create a new KubeletServer with default values.
func NewKubeletServer() *KubeletServer {
return &KubeletServer{
SyncFrequency: 10 * time.Second,
FileCheckFrequency: 20 * time.Second,
HTTPCheckFrequency: 20 * time.Second,
EnableServer: true,
Address: util.IP(net.ParseIP("127.0.0.1")),
Port: ports.KubeletPort,
SyncFrequency: 10 * time.Second,
FileCheckFrequency: 20 * time.Second,
HTTPCheckFrequency: 20 * time.Second,
StatusUpdateFrequency: 20 * time.Second,
EnableServer: true,
Address: util.IP(net.ParseIP("127.0.0.1")),
Port: ports.KubeletPort,
PodInfraContainerImage: kubelet.PodInfraContainerImage,
RootDirectory: defaultRootDir,
RegistryBurst: 10,
Expand All @@ -104,6 +106,7 @@ func NewKubeletServer() *KubeletServer {
func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.Config, "config", s.Config, "Path to the config file or directory of files")
fs.DurationVar(&s.SyncFrequency, "sync_frequency", s.SyncFrequency, "Max period between synchronizing running containers and config")
fs.DurationVar(&s.StatusUpdateFrequency, "status_update_frequency", s.StatusUpdateFrequency, "Duration between posting node status to master")
fs.DurationVar(&s.FileCheckFrequency, "file_check_frequency", s.FileCheckFrequency, "Duration between checking config files for new data")
fs.DurationVar(&s.HTTPCheckFrequency, "http_check_frequency", s.HTTPCheckFrequency, "Duration between checking http for new data")
fs.StringVar(&s.ManifestURL, "manifest_url", s.ManifestURL, "URL for accessing the container manifest")
Expand Down Expand Up @@ -157,6 +160,7 @@ func (s *KubeletServer) Run(_ []string) error {
RootDirectory: s.RootDirectory,
ConfigFile: s.Config,
ManifestURL: s.ManifestURL,
StatusUpdateFrequency: s.StatusUpdateFrequency,
FileCheckFrequency: s.FileCheckFrequency,
HTTPCheckFrequency: s.HTTPCheckFrequency,
PodInfraContainerImage: s.PodInfraContainerImage,
Expand Down Expand Up @@ -250,6 +254,7 @@ func SimpleRunKubelet(client *client.Client,
Address: util.IP(net.ParseIP(address)),
EnableServer: true,
EnableDebuggingHandlers: true,
StatusUpdateFrequency: 3 * time.Second,
SyncFrequency: 3 * time.Second,
MinimumGCAge: 10 * time.Second,
MaxContainerCount: 5,
Expand Down Expand Up @@ -345,6 +350,7 @@ type KubeletConfig struct {
RootDirectory string
ConfigFile string
ManifestURL string
StatusUpdateFrequency time.Duration
FileCheckFrequency time.Duration
HTTPCheckFrequency time.Duration
Hostname string
Expand Down Expand Up @@ -408,7 +414,8 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
kc.VolumePlugins,
kc.StreamingConnectionIdleTimeout,
kc.Recorder,
cadvisorInterface)
cadvisorInterface,
kc.StatusUpdateFrequency)

if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ type Node struct {
Status NodeStatus `json:"status,omitempty"`
}

// NodeList is a list of minions.
// NodeList is a list of nodes.
type NodeList struct {
TypeMeta `json:",inline"`
ListMeta `json:"metadata,omitempty"`
Expand Down
108 changes: 105 additions & 3 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
Expand All @@ -56,7 +57,7 @@ import (
)

const (
// taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc
// Taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc
minShares = 2
sharesPerCPU = 1024
milliCPUToCPU = 1000
Expand All @@ -67,6 +68,14 @@ const (

// Max amount of time to wait for the Docker daemon to come up.
maxWaitForDocker = 5 * time.Minute

// Initial node status update frequency and incremental frequency, for faster cluster startup.
// The update frequency will be increameted linearly, until it reaches status_update_frequency.
initialNodeStatusUpdateFrequency = 100 * time.Millisecond
nodeStatusUpdateFrequencyInc = 500 * time.Millisecond

// The retry count for updating node status at each sync period.
nodeStatusUpdateRetry = 5
)

var (
Expand Down Expand Up @@ -109,7 +118,8 @@ func NewMainKubelet(
volumePlugins []volume.Plugin,
streamingConnectionIdleTimeout time.Duration,
recorder record.EventRecorder,
cadvisorInterface cadvisor.Interface) (*Kubelet, error) {
cadvisorInterface cadvisor.Interface,
statusUpdateFrequency time.Duration) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
Expand Down Expand Up @@ -159,6 +169,7 @@ func NewMainKubelet(
etcdClient: etcdClient,
kubeClient: kubeClient,
rootDirectory: rootDirectory,
statusUpdateFrequency: statusUpdateFrequency,
resyncInterval: resyncInterval,
podInfraContainerImage: podInfraContainerImage,
dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{},
Expand Down Expand Up @@ -218,6 +229,7 @@ type Kubelet struct {
rootDirectory string
podInfraContainerImage string
podWorkers *podWorkers
statusUpdateFrequency time.Duration
resyncInterval time.Duration
sourcesReady SourcesReadyFn

Expand Down Expand Up @@ -520,9 +532,36 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
if kl.dockerPuller == nil {
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
}
if kl.kubeClient == nil {
glog.Warning("No api server defined - no node status update will be sent.")
}
go kl.syncNodeStatus()
kl.syncLoop(updates, kl)
}

// syncNodeStatus periodically synchronizes node status to master.
func (kl *Kubelet) syncNodeStatus() {
if kl.kubeClient == nil {
return
}
for feq := initialNodeStatusUpdateFrequency; feq < kl.statusUpdateFrequency; feq += nodeStatusUpdateFrequencyInc {
select {
case <-time.After(feq):
if err := kl.updateNodeStatus(); err != nil {
glog.Errorf("Unable to update node status: %v", err)
}
}
}
for {
select {
case <-time.After(kl.statusUpdateFrequency):
if err := kl.updateNodeStatus(); err != nil {
glog.Errorf("Unable to update node status: %v", err)
}
}
}
}

func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string {
binds := []string{}
for _, mount := range container.VolumeMounts {
Expand All @@ -538,6 +577,7 @@ func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap
}
return binds
}

func makePortsAndBindings(container *api.Container) (map[docker.Port]struct{}, map[docker.Port][]docker.PortBinding) {
exposedPorts := map[docker.Port]struct{}{}
portBindings := map[docker.Port][]docker.PortBinding{}
Expand Down Expand Up @@ -1679,7 +1719,7 @@ func (kl *Kubelet) GetHostname() string {
return kl.hostname
}

// GetBoundPods returns all pods bound to the kubelet and their spec
// GetBoundPods returns all pods bound to the kubelet and their spec.
func (kl *Kubelet) GetBoundPods() ([]api.BoundPod, error) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
Expand All @@ -1699,6 +1739,68 @@ func (kl *Kubelet) GetPodByName(namespace, name string) (*api.BoundPod, bool) {
return nil, false
}

// updateNodeStatus updates node status to master with retries.
func (kl *Kubelet) updateNodeStatus() error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
err := kl.tryUpdateNodeStatus()
if err != nil {
glog.Errorf("error updating node status, will retry: %v", err)
} else {
return nil
}
}
return fmt.Errorf("Update node status exceeds retry count")
}

// tryUpdateNodeStatus tries to update node status to master.
func (kl *Kubelet) tryUpdateNodeStatus() error {
node, err := kl.kubeClient.Nodes().Get(kl.hostname)
if err != nil {
return fmt.Errorf("error getting node %s: %v", kl.hostname, err)
}
if node == nil {
return fmt.Errorf("no node instance returned for %v", kl.hostname)
}

// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := kl.GetMachineInfo()
if err != nil {
glog.Error("error getting machine info: %v", err)
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
node.Spec.Capacity = api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(
int64(info.NumCores*1000),
resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(
info.MemoryCapacity,
resource.BinarySI),
}
}

newCondition := api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionFull,
Reason: fmt.Sprintf("kubelet is posting ready status"),
LastProbeTime: util.Now(),
}
updated := false
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == api.NodeReady {
node.Status.Conditions[i] = newCondition
updated = true
}
}
if !updated {
node.Status.Conditions = append(node.Status.Conditions, newCondition)
}

_, err = kl.kubeClient.Nodes().Update(node)
return err
}

// getPhase returns the phase of a pod given its container info.
func getPhase(spec *api.PodSpec, info api.PodInfo) api.PodPhase {
running := 0
Expand Down
Loading

0 comments on commit 9982aaa

Please sign in to comment.