Skip to content

Commit

Permalink
refactor exec command and use get ip by node name & by labelselector …
Browse files Browse the repository at this point in the history
…method to avoid for loop

when connect to kubernetes failed ,Exit

use method to get ip

format code
  • Loading branch information
oldthreefeng committed Sep 5, 2020
1 parent aeeca3c commit ec778d1
Show file tree
Hide file tree
Showing 6 changed files with 347 additions and 129 deletions.
8 changes: 4 additions & 4 deletions cmd/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (
// execCmd represents the exec command
var (
exampleExecCmd = `
# exec cmd by label or nodes. when --label and --node is Exist, get Intersection of both.
# exec cmd by label or nodes. when --label and --node is Exist, get Union of both.
sealos exec --cmd "mkdir /data" --label node-role.kubernetes.io/master= --node 192.168.0.2
sealos exec --cmd "mkdir /data" --node 192.168.0.2
sealos exec --cmd "mkdir /data" --node 192.168.0.2 --nodes dev-k8s-mater
# exec copy src file to dst by label or nodes. when --label and --node is Exist, get Intersection of both.
# exec copy src file to dst by label or nodes. when --label and --node is Exist, get Union of both.
sealos exec --src /data/foo --dst /root/foo --label node-role.kubernetes.io/master=""
sealos exec --src /data/foo --dst /root/foo --node 192.168.0.2
`
Expand All @@ -46,7 +46,7 @@ func init() {
execCmd.Flags().StringVar(&install.Dst, "dst", "", "dest file location")
execCmd.Flags().StringVar(&install.ExecCommand, "cmd", "", "exec command string")
execCmd.Flags().StringVar(&install.Label, "label", "", "kubernetes labels like node-role.kubernetes.io/master=")
execCmd.Flags().StringSliceVar(&install.ExecNode, "node", []string{}, "node ip")
execCmd.Flags().StringSliceVar(&install.ExecNode, "node", []string{}, "node ip or hostname in kubernetes")

}

Expand Down
103 changes: 18 additions & 85 deletions install/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ type ExecFlag struct {
Cmd string
Label string
ExecNode []string
// map["hostname"] -> ip
Nodes map[string]string
SealConfig
}

Expand Down Expand Up @@ -43,19 +41,17 @@ func GetExecFlag() *ExecFlag {
// logger.Info("get label", Label)
e.Label = Label
e.Cmd = ExecCommand
e.ExecNode = ExecNode

// if use label, we need to re-init ExecNode which Flag --node is not set,
// we must put all nodes to ExecNode. so we can map the ip and hostname.
if e.IsUseLabeled() && !e.IsUseNode() {
ExecNode = append(ExecNode, MasterIPs...)
ExecNode = append(ExecNode, NodeIPs...)
// change labels ==> to ip
k8sClient, err := k8s.NewClient(k8s.KubeDefaultConfigPath, nil)
if err != nil {
logger.Error("get k8s client err: ", err)
os.Exit(ErrorExitOSCase)
}
// make to create non-nil map, nil map assign will panic
e.Nodes = make(map[string]string, len(ExecNode))
for _, node := range ExecNode {
hostname := SSHConfig.CmdToString(node, "hostname", "")
e.Nodes[hostname] = node
e.ExecNode, err = k8s.TransToIP(k8sClient, Label, ExecNode)
if err != nil {
logger.Error("get ips err: ", err)
os.Exit(ErrorExitOSCase)
}
return e
}
Expand All @@ -70,7 +66,7 @@ func (e *ExecFlag) IsUseCmd() bool {
return e.Cmd != ""
}

// IsUseCmd return true when you want to copy file
// IsUseCopy return true when you want to copy file
func (e *ExecFlag) IsUseCopy() bool {
return FileExist(e.Src) && e.Dst != ""
}
Expand All @@ -82,38 +78,18 @@ func (e *ExecFlag) IsUseNode() bool {

// Copy is cp src file to dst file
func (e *ExecFlag) Copy() {
// this case when use by label . we need a flag to set the --node is not used, in case of running twice By label
if e.IsUseNode() && !e.IsUseLabeled() {
e.copyByNode()
}
if e.IsUseLabeled() {
err := e.copyByLabel()
if err != nil {
logger.Error("copyByLabel err: ", err)
os.Exit(ErrorExitOSCase)
}
}
e.copyByNodeIp()
}

// Exec is cp src file to dst file
func (e *ExecFlag) Exec() {
// this case when use by label . we need a flag to set the --node is not used, in case of running twice By label
if e.IsUseNode() && !e.IsUseLabeled() {
e.execByNode()
}
if e.IsUseLabeled() {
err := e.execByLabel()
if err != nil {
logger.Error("execByLabel err: ", err)
os.Exit(ErrorExitOSCase)
}
}
e.execByNodeIp()
}

// copyByNode is cp src file to dst file
func (e *ExecFlag) copyByNode() {
// copyByNodeIp is cp src file to dst file
func (e *ExecFlag) copyByNodeIp() {
var wg sync.WaitGroup
for _, n := range e.Nodes {
for _, n := range e.ExecNode {
wg.Add(1)
go func(node string) {
defer wg.Done()
Expand All @@ -128,10 +104,10 @@ func (e *ExecFlag) copyByNode() {
wg.Wait()
}

// execByNode is exec cmd in Node
func (e *ExecFlag) execByNode() {
// execByNodeIp is exec cmd in Node
func (e *ExecFlag) execByNodeIp() {
var wg sync.WaitGroup
for _, n := range e.Nodes {
for _, n := range e.ExecNode {
wg.Add(1)
go func(node string) {
defer wg.Done()
Expand All @@ -140,46 +116,3 @@ func (e *ExecFlag) execByNode() {
}
wg.Wait()
}

func (e *ExecFlag) getNodesByLabel() ([]string, error) {
k8sClient, err := k8s.NewClient(k8s.KubeDefaultConfigPath, nil)
if err != nil {
return nil, err
}
return k8s.GetNodeByLabel(k8sClient, e.Label)
}

// execByNode is exec cmd by Node
func (e *ExecFlag) execByLabel() error {
hosts, err := e.getNodesByLabel()
if err != nil {
return err
}
for _, hostname := range hosts {
// logger.Info(hostname)
if node, ok := e.Nodes[hostname]; ok {
CmdWorkSpace(node, e.Cmd, TMPDIR)
}
}
return nil
}

// copyByNode is copy file by label
func (e *ExecFlag) copyByLabel() error {
hosts, err := e.getNodesByLabel()
if err != nil {
return err
}
for _, hostname := range hosts {
// 说明这个是需要操作的。
if node, ok := e.Nodes[hostname]; ok {
// 存在就直接跳过。 不存在才执行
if SSHConfig.IsFileExist(node, e.Dst) {
logger.Info("[%s] is exist on remote host [%s].skip...", e.Dst, node)
continue
}
SSHConfig.CopyLocalToRemote(node, e.Src, e.Dst)
}
}
return nil
}
124 changes: 84 additions & 40 deletions k8s/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"k8s.io/client-go/transport"
"os"
"path/filepath"
"strings"
"time"
)

Expand All @@ -26,6 +25,7 @@ const (
KubeDefaultConfigPath = "/root/.kube/config"
)

// NewClient is get clientSet by kubeConfig
func NewClient(kubeConfigPath string, k8sWrapTransport transport.WrapperFunc) (*kubernetes.Clientset, error) {
// use the current admin kubeconfig
var config *rest.Config
Expand All @@ -50,59 +50,84 @@ func NewClient(kubeConfigPath string, k8sWrapTransport transport.WrapperFunc) (*
return K8sClientSet, nil
}

// GetNodeList is get all nodes
func GetNodeList(k8sClient *kubernetes.Clientset) (*v1.NodeList, error) {
return k8sClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
}

func GetNode(k8sClient *kubernetes.Clientset, nodeName string) (*v1.Node, error) {
var listErr error
for retries := 0; retries < MaxRetries; retries++ {
nodes, err := GetNodeList(k8sClient)
if err != nil {
listErr = err
time.Sleep(time.Second * RetryInterval)
continue
// GetNodeListByLabel is get node list by label
func GetNodeListByLabel(k8sClient *kubernetes.Clientset, label string) (*v1.NodeList, error) {
listOption := &metav1.ListOptions{LabelSelector: label}
return k8sClient.CoreV1().Nodes().List(context.TODO(), *listOption)
}

// GetNodeIpByName is get node internalIp by nodeName
func GetNodeIpByName(k8sClient *kubernetes.Clientset, nodeName string) (ip string, err error) {
node, err := k8sClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil {
return "", err
}

for _, v := range node.Status.Addresses {
if v.Type == v1.NodeInternalIP {
ip = v.Address
return ip, nil
}
// reset listErr back to nil
listErr = nil
for _, node := range nodes.Items {
if strings.ToLower(node.Labels[HostnameLabel]) == strings.ToLower(nodeName) {
return &node, nil
}
return "", apierrors.NewNotFound(schema.GroupResource{}, nodeName)
}

// GetNodeNameByIp is get node name by node ip
func GetNodeNameByIp(k8sClient *kubernetes.Clientset, ip string) (name string, err error) {
nodes, err := GetNodeList(k8sClient)
if err != nil {
return "", err
}
for _, node := range nodes.Items {
for _, v := range node.Status.Addresses {
if v.Type == v1.NodeInternalIP && ip == v.Address {
return node.Name, nil
}
}
time.Sleep(time.Second * RetryInterval)
}
if listErr != nil {
return nil, listErr
}
return nil, apierrors.NewNotFound(schema.GroupResource{}, nodeName)
return "", fmt.Errorf("ip [%s] is not fount in kubernetes nodes", ip)
}

func GetNodeByLabel(k8sClient *kubernetes.Clientset, label string) ([]string, error) {
var listErr error
// GetNodeNameByLabel is get node name by label
func GetNodeNameByLabel(k8sClient *kubernetes.Clientset, label string) ([]string, error) {
var ns []string
for retries := 0; retries < MaxRetries; retries++ {
nodes, err := GetNodeList(k8sClient)
if err != nil {
listErr = err
time.Sleep(time.Second * RetryInterval)
continue
}
// reset listErr back to nil
listErr = nil
for _, node := range nodes.Items {
for k, v := range node.Labels {
if label == fmt.Sprintf("%s=%s", k, v) {
ns = append(ns, node.Name)
}
nodes, err := GetNodeListByLabel(k8sClient, label)
if err != nil {
return nil, err
}
for _, node := range nodes.Items {
ns = append(ns, node.Name)
}
if len(ns) != 0 {
return ns, nil
}

return nil, fmt.Errorf("label %s is not fount in kubernetes nodes", label)
}

// GetNodeIpByLabel is is get node ip by label
func GetNodeIpByLabel(k8sClient *kubernetes.Clientset, label string) ([]string, error) {
var ips []string
nodes, err := GetNodeListByLabel(k8sClient, label)
if err != nil {
return nil, err
}
for _, node := range nodes.Items {
for _, v := range node.Status.Addresses {
if v.Type == v1.NodeInternalIP {
ips = append(ips, v.Address)
}
}
if ns != nil {
return ns, nil
}
time.Sleep(time.Second * RetryInterval)
}
return nil, listErr
if len(ips) != 0 {
return ips, nil
}
return nil, fmt.Errorf("label %s is not fount in kubernetes nodes", label)
}

func IsNodeReady(node v1.Node) bool {
Expand All @@ -114,3 +139,22 @@ func IsNodeReady(node v1.Node) bool {
}
return false
}

// TransToIP is use kubernetes label or hostname/ip to get ip
func TransToIP(k8sClient *kubernetes.Clientset, label string, hostname []string) ([]string, error) {
var ips []string
ips, err := GetNodeIpByLabel(k8sClient, label)
if err != nil {
return nil, err
}
resHost, resIp := getHostnameAndIp(hostname)
ips = append(ips, resIp...)
for _, node := range resHost {
ip, err := GetNodeIpByName(k8sClient, node)
if err == nil {
ips = append(ips, ip)
}
}
ips = removeRep(ips)
return ips, nil
}
Loading

0 comments on commit ec778d1

Please sign in to comment.