Skip to content

Commit

Permalink
Merge pull request kmesh-net#695 from hzxuzhonghu/filter-out-unhealthy
Browse files Browse the repository at this point in the history
Filter out unhealthy workloads
  • Loading branch information
kmesh-bot authored Sep 11, 2024
2 parents a2853e8 + c5b6f8d commit b238317
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 22 deletions.
45 changes: 31 additions & 14 deletions pkg/controller/workload/workload_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,18 +147,27 @@ func (p *Processor) storePodFrontendData(uid uint32, ip []byte) error {
return nil
}

func (p *Processor) removeWorkloadResource(removedResources []string) error {
func (p *Processor) removeWorkloadResources(removedResources []string) error {
for _, uid := range removedResources {
wl := p.WorkloadCache.GetWorkloadByUid(uid)
p.WorkloadCache.DeleteWorkload(uid)
telemetry.DeleteWorkloadMetric(wl)
if err := p.removeWorkloadFromBpfMap(uid); err != nil {
return err
err := p.removeWorkload(uid)
if err != nil {
log.Warnf("removeWorkload %s failed: %v", uid, err)
continue
}
}
return nil
}

func (p *Processor) removeWorkload(uid string) error {
wl := p.WorkloadCache.GetWorkloadByUid(uid)
if wl == nil {
return nil
}
p.WorkloadCache.DeleteWorkload(uid)
telemetry.DeleteWorkloadMetric(wl)
return p.removeWorkloadFromBpfMap(uid)
}

func (p *Processor) removeWorkloadFromBpfMap(uid string) error {
var (
err error
Expand Down Expand Up @@ -234,7 +243,7 @@ func (p *Processor) deleteServiceFrontendData(service *workloadapi.Service, id u
return nil
}

func (p *Processor) removeServiceResource(resources []string) error {
func (p *Processor) removeServiceResources(resources []string) error {
for _, name := range resources {
telemetry.DeleteServiceMetric(name)
svc := p.ServiceCache.GetService(name)
Expand Down Expand Up @@ -382,11 +391,19 @@ func (p *Processor) updateWorkload(workload *workloadapi.Workload) error {
}

func (p *Processor) handleWorkload(workload *workloadapi.Workload) error {
log.Debugf("handle workload: %s", workload.Uid)
log.Debugf("handle workload: %s", workload.ResourceName())

// Keep track of the workload no matter it is healthy, unhealthy workload is just for debugging
p.WorkloadCache.AddOrUpdateWorkload(workload)
p.storeWorkloadPolicies(workload.GetUid(), workload.GetAuthorizationPolicies())

// Exclude unhealthy workload, which is not ready to serve traffic
if workload.Status == workloadapi.WorkloadStatus_UNHEALTHY {
log.Debugf("workload %s is unhealthy", workload.ResourceName())
// If the workload is updated to unhealthy, we should remove it from the bpf map
return p.removeWorkloadFromBpfMap(workload.Uid)
}

unboundedEndpointKeys, newServices := p.compareWorkloadServices(workload)
if err := p.handleWorkloadUnboundServices(workload, unboundedEndpointKeys); err != nil {
log.Errorf("handleWorkloadUnboundServices %s failed: %v", workload.ResourceName(), err)
Expand Down Expand Up @@ -542,10 +559,10 @@ func (p *Processor) handleRemovedAddresses(removed []string) {
}
}

if err := p.removeWorkloadResource(workloadNames); err != nil {
log.Errorf("RemoveWorkloadResource failed: %v", err)
if err := p.removeWorkloadResources(workloadNames); err != nil {
log.Errorf("removeWorkloadResources failed: %v", err)
}
if err := p.removeServiceResource(serviceNames); err != nil {
if err := p.removeServiceResources(serviceNames); err != nil {
log.Errorf("RemoveServiceResource failed: %v", err)
}
}
Expand All @@ -567,19 +584,19 @@ func (p *Processor) handleAddressTypeResponse(rsp *service_discovery_v3.DeltaDis
case *workloadapi.Address_Service:
services = append(services, address.GetService())
default:
log.Errorf("unknown type")
log.Errorf("unknown type, should not reach here")
}
}

for _, service := range services {
if err = p.handleService(service); err != nil {
log.Errorf("handle service failed, err: %v", err)
log.Errorf("handle service %v failed, err: %v", service.ResourceName(), err)
}
}

for _, workload := range workloads {
if err = p.handleWorkload(workload); err != nil {
log.Errorf("handle workload failed, err: %v", err)
log.Errorf("handle workload %s failed, err: %v", workload.ResourceName(), err)
}
}

Expand Down
37 changes: 29 additions & 8 deletions pkg/controller/workload/workload_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ func Test_handleWorkload(t *testing.T) {
_ = p.handleService(fakeSvc)

// 2. add workload
wl := createTestWorkloadWithService(true)
err := p.handleWorkload(wl)
workload1 := createTestWorkloadWithService(true)
err := p.handleWorkload(workload1)
assert.NoError(t, err)

workloadID := checkFrontEndMap(t, wl.Addresses[0], p)
checkBackendMap(t, p, workloadID, wl)
workloadID := checkFrontEndMap(t, workload1.Addresses[0], p)
checkBackendMap(t, p, workloadID, workload1)

// 2.1 check front end map contains service
svcID := checkFrontEndMap(t, fakeSvc.Addresses[0].Address, p)
Expand Down Expand Up @@ -118,9 +118,9 @@ func Test_handleWorkload(t *testing.T) {
checkBackendMap(t, p, workload2ID, workload2)

// 5 update workload to remove the bound services
wl3 := proto.Clone(wl).(*workloadapi.Workload)
wl3.Services = nil
err = p.handleWorkload(wl3)
workload1Updated := proto.Clone(workload1).(*workloadapi.Workload)
workload1Updated.Services = nil
err = p.handleWorkload(workload1Updated)
assert.NoError(t, err)

// 5.1 check service map
Expand All @@ -142,7 +142,28 @@ func Test_handleWorkload(t *testing.T) {
// 6.2 check service map contains service, but no waypoint address
checkServiceMap(t, p, svcID, wpSvc, 0)

// 7. delete service
// 7. test add unhealthy workload
workload3 := createFakeWorkload("1.2.3.7", workloadapi.NetworkMode_STANDARD)
workload3.Status = workloadapi.WorkloadStatus_UNHEALTHY
_ = p.handleWorkload(workload3)

addr, _ := netip.AddrFromSlice(workload3.Addresses[0])
networkAddress := cache.NetworkAddress{
Network: workload3.Network,
Address: addr,
}
got := p.WorkloadCache.GetWorkloadByAddr(networkAddress)
t.Logf("workload3: %v", got)
assert.NotNil(t, got)
assert.Equal(t, got.Status, workloadapi.WorkloadStatus_UNHEALTHY)
checkNotExistInFrontEndMap(t, workload3.Addresses[0], p)

// 8. update workload from healthy to unhealthy, should remove it from bpf map
workload2.Status = workloadapi.WorkloadStatus_UNHEALTHY
_ = p.handleWorkload(workload2)
checkNotExistInFrontEndMap(t, workload2.Addresses[0], p)

// 9. delete service
p.handleRemovedAddresses([]string{fakeSvc.ResourceName()})
checkNotExistInFrontEndMap(t, fakeSvc.Addresses[0].Address, p)

Expand Down

0 comments on commit b238317

Please sign in to comment.