Skip to content

Commit

Permalink
cleaning up code with a pre-PR review
Browse files Browse the repository at this point in the history
  • Loading branch information
justnoise committed May 15, 2020
1 parent 01e8923 commit 6049d25
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 50 deletions.
16 changes: 16 additions & 0 deletions cmd/virtual-kubelet/cert.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2020 Elotl Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
Expand Down
5 changes: 3 additions & 2 deletions pkg/server/cloud/gce/firewall.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/elotl/kip/pkg/server/cloud"
"github.com/elotl/kip/pkg/util"
"google.golang.org/api/compute/v1"
"k8s.io/klog"
)

func (c *gceClient) SetBootSecurityGroupIDs(ids []string) {
Expand Down Expand Up @@ -59,7 +60,7 @@ func (c *gceClient) EnsureSecurityGroup(sgName string, ports []cloud.InstancePor
}
// We have seen eventual consistency errors here, retry it if we
// can't find the group
for i := 0; i < 10; i++ {
for i := 0; i < apiRetries; i++ {
sg, err = c.FindSecurityGroup(sgName)
if sg != nil || err != nil {
break
Expand Down Expand Up @@ -131,7 +132,7 @@ func allowedRulesToPorts(fwa []*compute.FirewallAllowed) []cloud.InstancePort {
}
proto := gceProtocolToKipProtocol(fw.IPProtocol)
if proto == "" {
// Todo: output a warning
klog.Warningln("Unknown protocol in firewall rule", fw.IPProtocol)
continue
}
for _, port := range fw.Ports {
Expand Down
41 changes: 18 additions & 23 deletions pkg/server/cloud/gce/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"context"
"fmt"
"os"
"runtime"
"strings"
"time"

"cloud.google.com/go/compute/metadata"
Expand All @@ -41,22 +41,9 @@ const (
podNameLabelKey = "kip-pod-name"
statusOperationDone = "DONE"
statusInstanceRunning = "RUNNING"
apiRetries = 10
)

// TODO returns an error reporting that the calling function is not
// implemented yet. When runtime information about the caller is available the
// message includes the caller's source file, line number and function name;
// otherwise a generic message is returned.
func TODO() error {
	msg := "TODO: Not implemented yet!"
	// Skip one stack frame so the report describes TODO's caller rather than
	// TODO itself.
	if pc, file, line, ok := runtime.Caller(1); ok {
		fName := "unknown"
		if details := runtime.FuncForPC(pc); details != nil {
			fName = details.Name()
		}
		msg = fmt.Sprintf("TODO: [%s.%d] %s not implemented yet", file, line, fName)
	}
	// msg embeds a file path, so it must not be used as the format string:
	// a literal '%' in it would be interpreted as a verb and garble the text.
	return fmt.Errorf("%s", msg)
}

type gceClient struct {
service *compute.Service
controllerID string
Expand Down Expand Up @@ -107,7 +94,6 @@ func NewGCEClient(controllerID, nametag, projectID string, opts ...ClientOption)
}
}

// Setup VPC Parameters
if client.vpcName == "" {
client.vpcName, err = client.detectCurrentVPC()
if err != nil {
Expand All @@ -119,7 +105,6 @@ func NewGCEClient(controllerID, nametag, projectID string, opts ...ClientOption)
return nil, err
}

// Setup subnet parameters
if client.subnetName == "" {
client.subnetName, client.subnetCIDR, err = client.autodetectSubnet()
if err != nil {
Expand Down Expand Up @@ -178,7 +163,7 @@ func (c *gceClient) CloudStatusKeeper() cloud.StatusKeeper {
}

// GetRegistryAuth is not implemented for GCE: it returns two empty strings
// together with a non-nil error.
// NOTE(review): the second return statement below is unreachable — this looks
// like the pre- and post-change lines of a diff shown together; confirm which
// of the two is the intended one.
func (c *gceClient) GetRegistryAuth() (string, string, error) {
return "", "", TODO()
return "", "", fmt.Errorf("Not implemented in gce")
}

func nilResponseError(call string) error {
Expand Down Expand Up @@ -224,29 +209,39 @@ func (c *gceClient) getZoneOperation(opName string) (*compute.Operation, error)
return resp, nil
}

// waitBackoff returns how long to wait before the i-th (zero-based) retry.
// The delays follow a fixed schedule of 1, 1, 2, 3 and 5 seconds; every
// attempt past the end of the schedule waits the final 5 seconds. A negative
// i is treated as the first attempt instead of panicking on an out-of-range
// index.
//
// The result is already a time.Duration expressed in seconds, so it can be
// handed to time.Sleep as-is and must not be multiplied by time.Second again.
func waitBackoff(i int) time.Duration {
	waitSeconds := [...]time.Duration{1, 1, 2, 3, 5}
	// Clamp the attempt number into the bounds of the schedule.
	switch {
	case i < 0:
		i = 0
	case i >= len(waitSeconds):
		i = len(waitSeconds) - 1
	}
	return waitSeconds[i] * time.Second
}

// In GCE, an API call that starts an operation returns successfully right
// away; that does not mean the operation itself has completed without errors.
// Here we wait for the operation to finish so we can check for and handle errors.
func (c *gceClient) waitOnOperation(opName string, getOperation func(string) (*compute.Operation, error)) error {
i := -1
for {
i += 1
op, err := getOperation(opName)
if err != nil {
return err
}

if op.Status != statusOperationDone {
time.Sleep(1 * time.Second)
time.Sleep(waitBackoff(i) * time.Second)
continue
}
// check if the operation is not nil
// if not nil it will have a *compute.OperationError
if op.Error != nil {
klog.Errorf("Operation %s was not successful", opName)
var errors []string
for _, e := range op.Error.Errors {
klog.Errorf(
"Error running operation, status: %d error_code: %s message: %s",
op.HttpErrorStatusCode, e.Code, e.Message)
errors = append(errors, e.Message)
}
// TODO figure out how we should handle operation errors
return fmt.Errorf("Operation failed with error(s): %s", strings.Join(errors, ", "))
}
break
}
Expand Down
28 changes: 18 additions & 10 deletions pkg/server/cloud/gce/gce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gce
import (
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -15,13 +16,20 @@ func getGCE(t *testing.T, controllerID string) *gceClient {
return c
}

// func TestGCECloud(t *testing.T) {
// fmt.Printf("Running Cloud Test\n")
// controllerID := "bcoxtestcontroller"
// cloudClient := getGCE(t, controllerID)
// err := cloudClient.EnsureMilpaSecurityGroups(
// []string{},
// []string{},
// )
// assert.NoError(t, err)
// }
// TestWaitForBackoff checks that waitBackoff walks the 1, 1, 2, 3, 5 second
// schedule and keeps returning the final 5 second delay for later attempts.
func TestWaitForBackoff(t *testing.T) {
	tests := []struct {
		i   int
		exp time.Duration
	}{
		{i: 0, exp: 1},
		{i: 1, exp: 1},
		// i == 2 is the only slot with a two second delay, so cover it too.
		{i: 2, exp: 2},
		{i: 3, exp: 3},
		{i: 4, exp: 5},
		{i: 5, exp: 5},
		{i: 6, exp: 5},
	}
	for _, tc := range tests {
		res := waitBackoff(tc.i)
		// Name the input in the failure message so a broken slot is easy to spot.
		assert.Equal(t, tc.exp*time.Second, res, "waitBackoff(%d)", tc.i)
	}
}
17 changes: 7 additions & 10 deletions pkg/server/cloud/gce/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ func (c *gceClient) getInstanceSpec(instanceID string) (*compute.Instance, error
defer cancel()
instance, err := c.service.Instances.Get(c.projectID, c.zone, instanceID).Context(ctx).Do()
if err != nil {
// TODO error handling for googleapi errors
klog.Errorf("error retrieving instance specification: %v", err)
return nil, err
}
Expand All @@ -68,7 +67,6 @@ func (c *gceClient) getInstanceSpec(instanceID string) (*compute.Instance, error
func (c *gceClient) getInstanceStatus(instanceID string) (string, error) {
instance, err := c.getInstanceSpec(instanceID)
if err != nil {
// TODO error handling for googleapi errors
klog.Errorf("error retrieving instance status: %v", err)
return "", err
}
Expand Down Expand Up @@ -189,12 +187,14 @@ func (c *gceClient) StartNode(node *api.Node, metadata string) (*cloud.StartNode
c.bootSecurityGroupIDs, c.subnetName)
op, err := c.service.Instances.Insert(c.projectID, c.zone, spec).Do()
if err != nil {
// TODO add error checking for googleapi using helpers in util
return nil, util.WrapError(err, "startup error")
}
if err := c.waitOnOperation(op.Name, c.getZoneOperation); err != nil {
return nil, err
}
// Todo: catch and convert errors to notify us of
// out of capacity errors or invalid machine types
// see pkg/server/cloud/aws/instances.StartNode()
startResult := &cloud.StartNodeResult{
InstanceID: spec.Name,
AvailabilityZone: c.zone,
Expand All @@ -212,9 +212,11 @@ func (c *gceClient) StartSpotNode(node *api.Node, metadata string) (*cloud.Start
c.bootSecurityGroupIDs, c.subnetName)
op, err := c.service.Instances.Insert(c.projectID, c.zone, spec).Do()
if err != nil {
// TODO add error checking for googleapi using helpers in util
return nil, util.WrapError(err, "startup error")
}
// Todo: catch and convert errors to notify us of
// out of capacity errors or invalid machine types
// see pkg/server/cloud/aws/instances.StartNode()
if err := c.waitOnOperation(op.Name, c.getZoneOperation); err != nil {
return nil, err
}
Expand All @@ -230,7 +232,6 @@ func (c *gceClient) WaitForRunning(node *api.Node) ([]api.NetworkAddress, error)
status, err := c.getInstanceStatus(node.Status.InstanceID)
if err != nil {
klog.Errorf("Error waiting for instance to start: %v", err)
// TODO add error checking for googleapi using helpers in util
return nil, err
}

Expand All @@ -242,7 +243,6 @@ func (c *gceClient) WaitForRunning(node *api.Node) ([]api.NetworkAddress, error)
}
instance, err := c.getInstanceSpec(node.Status.InstanceID)
if err != nil {
// TODO add error checking for googleapi using helpers in util
return nil, err
}

Expand Down Expand Up @@ -299,7 +299,6 @@ func (c *gceClient) StopInstance(instanceID string) error {
func (c *gceClient) getFirstVolume(instanceID string) *compute.AttachedDisk {
instance, err := c.getInstanceSpec(instanceID)
if err != nil {
// TODO error handling for googleapi errors
klog.Errorf("error retrieving instance volume: %v", err)
return nil
}
Expand All @@ -315,7 +314,7 @@ func (c *gceClient) getFirstVolume(instanceID string) *compute.AttachedDisk {
func (c *gceClient) ResizeVolume(node *api.Node, size int64) (error, bool) {
vol := c.getFirstVolume(node.Status.InstanceID)
// in GCE zonal standard persistent disks cannot be smaller than 10GiB
if vol == nil || vol.InitializeParams.DiskSizeGb < 10 {
if vol == nil {
return fmt.Errorf("Error retrieving volume info for node %s: %v",
node.Name, vol), false
}
Expand Down Expand Up @@ -350,7 +349,6 @@ func (c *gceClient) SetSustainedCPU(node *api.Node, enabled bool) error {
func (c *gceClient) ListInstancesFilterID(ids []string) ([]cloud.CloudInstance, error) {
instances, err := c.ListInstances()
if err != nil {
// TODO add error checking for googleapi using helpers in util
return nil, err
}
var filteredList []cloud.CloudInstance
Expand Down Expand Up @@ -384,7 +382,6 @@ func (c *gceClient) ListInstances() ([]cloud.CloudInstance, error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
if err := listCall.Pages(ctx, f); err != nil {
// TODO add error checking for googleapi using helpers in util
return nil, err
}
return instances, nil
Expand Down
9 changes: 4 additions & 5 deletions pkg/server/cloud/gce/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/elotl/kip/pkg/server/cloud"
"github.com/elotl/kip/pkg/util"
"google.golang.org/api/compute/v1"
"k8s.io/klog"
)

func zoneToRegion(zone string) (string, error) {
Expand Down Expand Up @@ -215,14 +214,14 @@ func (c *gceClient) ModifySourceDestinationCheck(instanceID string, isEnabled bo
}

func (c *gceClient) GetDNSInfo() ([]string, []string, error) {
// resolv.conf contents:
// instance resolv.conf contents:
//
// domain c.milpa-207719.internal
// search c.milpa-207719.internal. google.internal.
// nameserver 169.254.169.254

klog.Warningln("Need to improve GETDNSInfo()")

//
// This is fairly simplistic; we might need to look into determining
// if the user's environment has other settings
zoneLetter := c.zone[len(c.zone)-1]
s := fmt.Sprintf("%s.%s.internal.", string(zoneLetter), c.projectID)
searches := []string{s, "google.internal."}
Expand Down

0 comments on commit 6049d25

Please sign in to comment.