Skip to content

Commit

Permalink
Fixes to service deletion and errors
Browse files Browse the repository at this point in the history
  • Loading branch information
thebsdbox committed Dec 6, 2020
1 parent a009f8a commit 6e7a7a6
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ TARGET := kube-vip
.DEFAULT_GOAL: $(TARGET)

# These will be provided to the target
VERSION := 0.2.2
VERSION := 0.2.3
BUILD := `git rev-parse HEAD`

# Operating System Default (LINUX)
Expand Down
4 changes: 4 additions & 0 deletions pkg/kubevip/config_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,10 @@ func generatePodSpec(c *Config, imageVersion string) *corev1.Pod {
Name: vipInterface,
Value: c.Interface,
},
{
Name: port,
Value: fmt.Sprintf("%d", c.Port),
},
}

// If a CIDR is used add it to the manifest
Expand Down
5 changes: 4 additions & 1 deletion pkg/manager/manager_arp.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ func (sm *Manager) startARP() error {
RetryPeriod: 1 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
sm.servicesWatcher(ctx)
err = sm.servicesWatcher(ctx)
if err != nil {
log.Error(err)
}
},
OnStoppedLeading: func() {
// we can do cleanup here
Expand Down
5 changes: 4 additions & 1 deletion pkg/manager/manager_bgp.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ func (sm *Manager) startBGP() error {
// }
}

sm.servicesWatcher(ctx)
err = sm.servicesWatcher(ctx)
if err != nil {
return err
}

log.Infof("Shutting down Kube-Vip")

Expand Down
30 changes: 16 additions & 14 deletions pkg/manager/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,25 +183,27 @@ func (sm *Manager) syncServices(service *v1.Service) error {
sm.serviceInstances = append(sm.serviceInstances, newService)

// Update the service
ns, err := returnNameSpace()
if err != nil {
log.Errorf("Error finding Namespace")
return
}
dhcpService, err := sm.clientSet.CoreV1().Services(ns).Get(context.TODO(), newService.ServiceName, metav1.GetOptions{})
if err != nil {
log.Errorf("Error finding Service [%s] : %v", newService.ServiceName, err)
return
}
dhcpService.Spec.LoadBalancerIP = newVip.VIP
updatedService, err := sm.clientSet.CoreV1().Services(ns).Update(context.TODO(), dhcpService, metav1.UpdateOptions{})
// ns, err := returnNameSpace()
// if err != nil {
// log.Errorf("Error finding Namespace")
// return
// }
// dhcpService, err := sm.clientSet.CoreV1().Services(ns).Get(context.TODO(), newService.ServiceName, metav1.GetOptions{})
// if err != nil {
// log.Errorf("Error finding Service [%s] : %v", newService.ServiceName, err)
// return
// }

// Update the service with DHCP information
service.Spec.LoadBalancerIP = newVip.VIP
updatedService, err := sm.clientSet.CoreV1().Services(service.Namespace).Update(context.TODO(), service, metav1.UpdateOptions{})
log.Infof("Updating service [%s], with load balancer address [%s]", updatedService.Name, updatedService.Spec.LoadBalancerIP)
if err != nil {
log.Errorf("Error updating Service Spec [%s] : %v", newService.ServiceName, err)
return
}
updatedService.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{IP: newVip.VIP}}
_, err = sm.clientSet.CoreV1().Services(ns).UpdateStatus(context.TODO(), updatedService, metav1.UpdateOptions{})
_, err = sm.clientSet.CoreV1().Services(updatedService.Namespace).UpdateStatus(context.TODO(), updatedService, metav1.UpdateOptions{})
if err != nil {
log.Errorf("Error updating Service [%s] Status: %v", newService.ServiceName, err)
return
Expand Down Expand Up @@ -264,7 +266,7 @@ func (sm *Manager) syncServices(service *v1.Service) error {

// Update the "Status" of the LoadBalancer (one or many may do this), as long as one does it
service.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{IP: newVip.VIP}}
_, err = sm.clientSet.CoreV1().Services("").UpdateStatus(context.TODO(), service, metav1.UpdateOptions{})
_, err = sm.clientSet.CoreV1().Services(service.Namespace).UpdateStatus(context.TODO(), service, metav1.UpdateOptions{})
if err != nil {
log.Errorf("Error updating Service [%s] Status: %v", newService.ServiceName, err)
}
Expand Down
23 changes: 17 additions & 6 deletions pkg/manager/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,29 @@ func (sm *Manager) servicesWatcher(ctx context.Context) error {
if !ok {
return fmt.Errorf("Unable to parse Kubernetes services from API watcher")
}
log.Infof("Found Service [%s], it has an assigned external addresses [%s]", svc.Name, len(svc.Spec.LoadBalancerIP))
err = sm.syncServices(svc)
if err != nil {
log.Error(err)
if svc.Spec.LoadBalancerIP == "" {
log.Infof("Service [%s] has been addded/modified, it has no assigned external addresses", svc.Name)
} else {
log.Infof("Service [%s] has been addded/modified, it has an assigned external addresses [%s]", svc.Name, svc.Spec.LoadBalancerIP)
err = sm.syncServices(svc)
if err != nil {
log.Error(err)
}
}

case watch.Deleted:
svc, ok := event.Object.(*v1.Service)
if !ok {
return fmt.Errorf("Unable to parse Kubernetes services from API watcher")
}
sm.deleteService(string(svc.UID))
err = sm.stopService(string(svc.UID))
if err != nil {
log.Error(err)
}
err = sm.deleteService(string(svc.UID))
if err != nil {
log.Error(err)
}
log.Infof("Service [%s] has been deleted", svc.Name)

case watch.Bookmark:
// Un-used
Expand Down

0 comments on commit 6e7a7a6

Please sign in to comment.