From 2f8444c9354aae54a9d3f9c558104a70868efbc4 Mon Sep 17 00:00:00 2001 From: oldthreefeng Date: Fri, 15 Jan 2021 00:47:23 +0800 Subject: [PATCH 1/2] use channel to manage goroutine when upgrade nodes fix to Info not Alert on drain nodes. add ip in logs to upgrade logs Signed-off-by: oldthreefeng --- install/upgrade.go | 17 ++++++++--------- install/upgrade_pool.go | 38 ++++++++++++++++++++++++++++++++++++++ install/utils_test.go | 17 +++++++++++++++++ k8s/drain.go | 2 +- 4 files changed, 64 insertions(+), 10 deletions(-) create mode 100644 install/upgrade_pool.go diff --git a/install/upgrade.go b/install/upgrade.go index 0d77f391ac5..f8024d88b5a 100644 --- a/install/upgrade.go +++ b/install/upgrade.go @@ -2,8 +2,7 @@ package install import ( "fmt" - "os" - "sync" + "os" "time" "k8s.io/client-go/kubernetes" @@ -95,15 +94,16 @@ func (u *SealosUpgrade) UpgradeOtherMaster() { } func (u *SealosUpgrade) upgradeNodes(hostnames []string, isMaster bool) { - var wg sync.WaitGroup + wg := NewPool(2) var err error for _, hostname := range hostnames { wg.Add(1) go func(node string) { defer wg.Done() + ip := u.GetIpByHostname(node) // drain worker node is too danger for prod use; do not drain nodes if worker nodes~ if isMaster { - logger.Info("first: to drain master node %s", node) + logger.Info("[%s] first: to drain master node %s", ip, node) cmdDrain := fmt.Sprintf(`kubectl drain %s --ignore-daemonsets --delete-local-data`, node) err := SSHConfig.CmdAsync(u.Masters[0], cmdDrain) if err != nil { @@ -114,8 +114,7 @@ func (u *SealosUpgrade) upgradeNodes(hostnames []string, isMaster bool) { } // second to exec kubeadm upgrade node - logger.Info("second: to exec kubeadm upgrade node on %s", node) - ip := u.GetIpByHostname(node) + logger.Info("[%s] second: to exec kubeadm upgrade node on %s", ip, node) var cmdUpgrade string if ip == u.Masters[0] { cmdUpgrade = fmt.Sprintf("kubeadm upgrade apply --certificate-renewal=false --yes %s", u.NewVersion) @@ -134,7 +133,7 @@ func (u *SealosUpgrade) upgradeNodes(hostnames []string, isMaster bool) { } // third to restart kubelet - logger.Info("third: to restart kubelet on %s", node) + logger.Info("[%s] third: to restart kubelet on %s", ip, node) err = SSHConfig.CmdAsync(ip, "systemctl daemon-reload && systemctl restart kubelet") if err != nil { logger.Error("systemctl daemon-reload && systemctl restart kubelet err: ", err) @@ -144,14 +143,14 @@ func (u *SealosUpgrade) upgradeNodes(hostnames []string, isMaster bool) { time.Sleep(time.Second * 10) k8sNode, _ := k8s.GetNodeByName(u.Client, node) if k8s.IsNodeReady(*k8sNode) { - logger.Info("fourth: %s nodes is ready", node) + logger.Info("[%s] fourth: %s nodes is ready", ip,node) // fifth to uncordon node err = k8s.CordonUnCordon(u.Client, node, false) if err != nil { logger.Error(`k8s.CordonUnCordon err: %s, \n After upgrade, please run "kubectl uncordon %s" to enable Scheduling`, err, node) } - logger.Info("fifth: to uncordon node, 10 seconds to wait for %s uncordon", node) + logger.Info("[%s] fifth: to uncordon node, 10 seconds to wait for %s uncordon", ip, node) } else { logger.Error("fourth: %s nodes is not ready, please check the nodes logs to find out reason", node) } diff --git a/install/upgrade_pool.go b/install/upgrade_pool.go new file mode 100644 index 00000000000..cc452bc5178 --- /dev/null +++ b/install/upgrade_pool.go @@ -0,0 +1,38 @@ +package install + +import "sync" + + +type uPool struct { + queue chan int + wg *sync.WaitGroup +} + +func NewPool (size int) *uPool { + if size <= 1 { + size = 1 + } + return &uPool{ + queue: make(chan int, size), + wg: &sync.WaitGroup{}, + } +} + +func (p *uPool) Add(delta int) { + for i := 0; i < delta; i++ { + p.queue <- 1 + } + for i := 0; i > delta; i-- { + <-p.queue + } + p.wg.Add(delta) +} + +func (p *uPool) Done() { + <-p.queue + p.wg.Done() +} + +func (p *uPool) Wait() { + p.wg.Wait() +} \ No newline at end of file diff --git a/install/utils_test.go b/install/utils_test.go index e433ab4a59d..c3013018b13 100644 --- a/install/utils_test.go +++ b/install/utils_test.go @@ -3,8 +3,10 @@ package install import ( "fmt" "reflect" + "runtime" "strings" "testing" + "time" ) func TestPath(t *testing.T) { @@ -228,4 +230,19 @@ func TestFor120(t *testing.T) { } }) } +} + +func Test_Example(t *testing.T) { + pool := NewPool(2) + println(runtime.NumGoroutine()) + for i := 0; i < 10; i++ { + pool.Add(1) + go func(n int) { + time.Sleep(time.Second) + println(runtime.NumGoroutine(), n) + pool.Done() + }(i) + } + pool.Wait() + println(runtime.NumGoroutine()) } \ No newline at end of file diff --git a/k8s/drain.go b/k8s/drain.go index e871a9bfb40..5b2ec77cfe7 100644 --- a/k8s/drain.go +++ b/k8s/drain.go @@ -55,7 +55,7 @@ func CordonUnCordon(k8sClient *kubernetes.Clientset, nodeName string, cordoned b return err } if node.Spec.Unschedulable == cordoned { - logger.Alert("Node %s is already cordoned: %v", nodeName, cordoned) + logger.Info("Node %s is already Uncordoned, skip...", nodeName) return nil } node.Spec.Unschedulable = cordoned From 0c218d59400eb4549ae7a8c522ba8c14b5b38d63 Mon Sep 17 00:00:00 2001 From: oldthreefeng Date: Fri, 15 Jan 2021 21:18:24 +0800 Subject: [PATCH 2/2] fix Signed-off-by: oldthreefeng --- install/init.go | 6 +++--- install/send.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/install/init.go b/install/init.go index d10be62f829..8be2e2283d6 100644 --- a/install/init.go +++ b/install/init.go @@ -108,13 +108,13 @@ func (s *SealosInstaller) appendApiServer() error { reader := bufio.NewReader(file) for { str, err := reader.ReadString('\n') - if err == io.EOF { - break - } if strings.Contains(str, ApiServer) { logger.Info("local %s is already exists %s", etcHostPath, ApiServer) return nil } + if err == io.EOF { + break + } } write := bufio.NewWriter(file) write.WriteString(etcHostMap) diff --git a/install/send.go b/install/send.go index 9fba0a4fe0a..f0414bfb1dd 100755 --- a/install/send.go +++ b/install/send.go @@ -11,7 +11,7 @@ func (s *SealosInstaller) SendPackage() { // rm old sealos in package avoid old version problem. if sealos not exist in package then skip rm kubeHook := fmt.Sprintf("cd /root && rm -rf kube && tar zxvf %s && cd /root/kube/shell && rm -f ../bin/sealos && bash init.sh", pkg) deletekubectl := `sed -i '/kubectl/d;/sealos/d' /root/.bashrc ` - completion := "echo 'command -v kubectl &>/dev/null && source <(kubectl completion bash)' >> /root/.bashrc && echo 'command -v sealos &>/dev/null && source <(sealos completion bash)' >> /root/.bashrc && source /root/.bashrc" + completion := "echo 'command -v kubectl &>/dev/null && source <(kubectl completion bash)' >> /root/.bashrc && echo '[ -x /usr/bin/sealos ] && source <(sealos completion bash)' >> /root/.bashrc && source /root/.bashrc" kubeHook = kubeHook + " && " + deletekubectl + " && " + completion PkgUrl = SendPackage(PkgUrl, s.Hosts, "/root", nil, &kubeHook)