Skip to content

Commit

Permalink
support dump workload bpf map
Browse files Browse the repository at this point in the history
  • Loading branch information
Okabe-Rintarou-0 committed Sep 13, 2024
1 parent 089540f commit 1ca871f
Show file tree
Hide file tree
Showing 12 changed files with 211 additions and 55 deletions.
5 changes: 5 additions & 0 deletions pkg/controller/workload/bpfcache/auth_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,8 @@ func (c *Cache) WorkloadPolicyLookup(key *WorkloadPolicy_key, value *WorkloadPol
log.Debugf("workload policy lookup: [%#v]", *key)
return c.bpfMap.MapOfWlPolicy.Lookup(key, value)
}

func (c *Cache) WorkloadPolicyLookupAll() []WorkloadPolicy_value {
log.Debugf("WorkloadPolicyLookupAll")
return LookupAll[WorkloadPolicy_value](c.bpfMap.MapOfWlPolicy)
}
15 changes: 5 additions & 10 deletions pkg/controller/workload/bpfcache/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,10 @@ func (c *Cache) BackendLookup(key *BackendKey, value *BackendValue) error {
// BackendCount returns the length of backend map
// Note only used for testing
func (c *Cache) BackendCount() int {
var (
key = BackendKey{}
value = BackendValue{}
)
return len(c.BackendLookupAll())
}

res := 0
iter := c.bpfMap.KmeshBackend.Iterate()
for iter.Next(&key, &value) {
res++
}
return res
func (c *Cache) BackendLookupAll() []BackendValue {
log.Debugf("BackendLookupAll")
return LookupAll[BackendValue](c.bpfMap.KmeshBackend)
}
19 changes: 19 additions & 0 deletions pkg/controller/workload/bpfcache/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package bpfcache

import (
"github.com/cilium/ebpf"
)

func LookupAll[T any](bpfMap *ebpf.Map) []T {
var (
key T
value T
ret []T
)

iter := bpfMap.Iterate()
for iter.Next(&key, &value) {
ret = append(ret, value)
}
return ret
}
15 changes: 5 additions & 10 deletions pkg/controller/workload/bpfcache/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,10 @@ func (c *Cache) GetAllEndpointsForService(serviceId uint32) []EndpointValue {
// EndpointCount returns the length of endpoint map
// Note only used for testing
func (c *Cache) EndpointCount() int {
var (
key = EndpointKey{}
value = EndpointValue{}
)
return len(c.EndpointLookupAll())
}

res := 0
iter := c.bpfMap.KmeshEndpoint.Iterate()
for iter.Next(&key, &value) {
res++
}
return res
func (c *Cache) EndpointLookupAll() []EndpointValue {
log.Debugf("EndpointLookupAll")
return LookupAll[EndpointValue](c.bpfMap.KmeshEndpoint)
}
15 changes: 5 additions & 10 deletions pkg/controller/workload/bpfcache/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,10 @@ func (c *Cache) FrontendIterFindKey(upstreamId uint32) []FrontendKey {
// FrontendCount returns the length of frontend map
// Note only used for testing
func (c *Cache) FrontendCount() int {
var (
key = FrontendKey{}
value = FrontendValue{}
)
return len(c.FrontendLookupAll())
}

res := 0
iter := c.bpfMap.KmeshFrontend.Iterate()
for iter.Next(&key, &value) {
res++
}
return res
func (c *Cache) FrontendLookupAll() []FrontendValue {
log.Debugf("FrontendLookupAll")
return LookupAll[FrontendValue](c.bpfMap.KmeshFrontend)
}
15 changes: 5 additions & 10 deletions pkg/controller/workload/bpfcache/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,10 @@ func (c *Cache) ServiceLookup(key *ServiceKey, value *ServiceValue) error {
// ServiceCount returns the length of service map
// Note only used for testing
func (c *Cache) ServiceCount() int {
var (
key = ServiceKey{}
value = ServiceValue{}
)
return len(c.ServiceLookupAll())
}

res := 0
iter := c.bpfMap.KmeshService.Iterate()
for iter.Next(&key, &value) {
res++
}
return res
func (c *Cache) ServiceLookupAll() []ServiceValue {
log.Debugf("ServiceLookupAll")
return LookupAll[ServiceValue](c.bpfMap.KmeshService)
}
2 changes: 1 addition & 1 deletion pkg/controller/workload/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Controller struct {

func NewController(bpfWorkload *bpf.BpfKmeshWorkload) *Controller {
c := &Controller{
Processor: newProcessor(bpfWorkload.SockConn.KmeshCgroupSockWorkloadObjects.KmeshCgroupSockWorkloadMaps),
Processor: NewProcessor(bpfWorkload.SockConn.KmeshCgroupSockWorkloadObjects.KmeshCgroupSockWorkloadMaps),
bpfWorkloadObj: bpfWorkload,
}
// do some initialization when restart
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/workload/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestWorkloadStreamCreateAndSend(t *testing.T) {
beforeFunc: func(t *testing.T) {
patches1.ApplyMethodReturn(fakeClient.Client, "DeltaAggregatedResources", fakeClient.DeltaClient, nil)

workloadController.Processor = newProcessor(workloadMap)
workloadController.Processor = NewProcessor(workloadMap)
workload := createFakeWorkload("10.10.10.1", workloadapi.NetworkMode_STANDARD)
workloadController.Processor.WorkloadCache.AddOrUpdateWorkload(workload)
patches2.ApplyMethodFunc(fakeClient.DeltaClient, "Send",
Expand Down Expand Up @@ -152,7 +152,7 @@ func TestWorkloadStreamCreateAndSend(t *testing.T) {
beforeFunc: func(t *testing.T) {
patches0.ApplyFuncReturn(maps_v2.AuthorizationUpdate, nil)
patches1.ApplyMethodReturn(fakeClient.Client, "DeltaAggregatedResources", fakeClient.DeltaClient, nil)
workloadController.Processor = newProcessor(workloadMap)
workloadController.Processor = NewProcessor(workloadMap)
workloadController.Rbac = auth.NewRbac(nil)
workloadController.Rbac.UpdatePolicy(&security.Authorization{
Name: "p1",
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/workload/workload_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Processor struct {
authzOnce sync.Once
}

func newProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor {
func NewProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor {
return &Processor{
hashName: NewHashName(),
bpf: bpf.NewCache(workloadMap),
Expand Down Expand Up @@ -93,6 +93,10 @@ func newAckRequest(rsp *service_discovery_v3.DeltaDiscoveryResponse) *service_di
}
}

func (p *Processor) GetBpfCache() *bpf.Cache {
return p.bpf
}

func (p *Processor) processWorkloadResponse(rsp *service_discovery_v3.DeltaDiscoveryResponse, rbac *auth.Rbac) {
var err error

Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/workload/workload_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func Test_handleWorkload(t *testing.T) {
workloadMap := bpfcache.NewFakeWorkloadMap(t)
defer bpfcache.CleanupFakeWorkloadMap(workloadMap)

p := newProcessor(workloadMap)
p := NewProcessor(workloadMap)

var (
ek bpfcache.EndpointKey
Expand Down Expand Up @@ -175,7 +175,7 @@ func Test_handleServiceWithWaypoint(t *testing.T) {
// Mainly used to test whether processor can correctly handle
// different types of waypoint address without panic.
workloadMap := bpfcache.NewFakeWorkloadMap(t)
p := newProcessor(workloadMap)
p := NewProcessor(workloadMap)

// Waypoint with network address.
svc1 := createFakeService("svc1", "10.240.10.1", "10.240.10.200")
Expand All @@ -188,7 +188,7 @@ func Test_handleServiceWithWaypoint(t *testing.T) {

func Test_hostnameNetworkMode(t *testing.T) {
workloadMap := bpfcache.NewFakeWorkloadMap(t)
p := newProcessor(workloadMap)
p := NewProcessor(workloadMap)
workload := createFakeWorkload("1.2.3.4", workloadapi.NetworkMode_STANDARD)
workloadWithoutService := createFakeWorkload("1.2.3.5", workloadapi.NetworkMode_STANDARD)
workloadWithoutService.Services = nil
Expand Down Expand Up @@ -485,7 +485,7 @@ func TestRestart(t *testing.T) {
workloadMap := bpfcache.NewFakeWorkloadMap(t)
defer bpfcache.CleanupFakeWorkloadMap(workloadMap)

p := newProcessor(workloadMap)
p := NewProcessor(workloadMap)

res := &service_discovery_v3.DeltaDiscoveryResponse{}

Expand Down Expand Up @@ -545,7 +545,7 @@ func TestRestart(t *testing.T) {
// Set a restart label and simulate missing data in the cache
bpf.SetStartType(bpf.Restart)
// reconstruct a new processor
p = newProcessor(workloadMap)
p = NewProcessor(workloadMap)
p.bpf.RestoreEndpointKeys()
// 2.1 simulate workload add/delete during restart
// simulate workload update during restart
Expand Down
60 changes: 54 additions & 6 deletions pkg/status/status_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"kmesh.net/kmesh/pkg/constants"
"kmesh.net/kmesh/pkg/controller"
"kmesh.net/kmesh/pkg/controller/ads"
"kmesh.net/kmesh/pkg/controller/workload/bpfcache"
"kmesh.net/kmesh/pkg/logger"
)

Expand All @@ -46,6 +47,7 @@ const (
patternHelp = "/help"
patternOptions = "/options"
patternBpfAdsMaps = "/debug/bpf/ads"
patternBpfWorkloadMaps = "/debug/bpf/worload"
configDumpPrefix = "/debug/config_dump"
patternConfigDumpAds = configDumpPrefix + "/ads"
patternConfigDumpWorkload = configDumpPrefix + "/workload"
Expand All @@ -55,6 +57,8 @@ const (
bpfLoggerName = "bpf"

httpTimeout = time.Second * 20

invalidModeErrMessage = "\tInvalid Client Mode\n"
)

type Server struct {
Expand Down Expand Up @@ -128,11 +132,56 @@ func (s *Server) httpOptions(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, s.config.String())
}

func (s *Server) checkWorkloadMode(w http.ResponseWriter) bool {
client := s.xdsClient
if client == nil || client.WorkloadController == nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, invalidModeErrMessage)
return false
}
return true
}

type WorkloadBpfDump struct {
AuthPolicies []bpfcache.WorkloadPolicy_value
Backends []bpfcache.BackendValue
Endpoints []bpfcache.EndpointValue
Frontends []bpfcache.FrontendValue
Services []bpfcache.ServiceValue
}

func (s *Server) bpfWorkloadMaps(w http.ResponseWriter, r *http.Request) {
if !s.checkWorkloadMode(w) {
return
}
client := s.xdsClient
bpfMaps := client.WorkloadController.Processor.GetBpfCache()
workloadBpfDump := WorkloadBpfDump{
AuthPolicies: bpfMaps.WorkloadPolicyLookupAll(),
Backends: bpfMaps.BackendLookupAll(),
Endpoints: bpfMaps.EndpointLookupAll(),
Frontends: bpfMaps.FrontendLookupAll(),
Services: bpfMaps.ServiceLookupAll(),
}
printWorkloadBpfDump(w, workloadBpfDump)
}

func printWorkloadBpfDump(w http.ResponseWriter, wbd WorkloadBpfDump) {
data, err := json.MarshalIndent(wbd, "", " ")
if err != nil {
log.Errorf("Failed to marshal WorkloadBpfDump: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write(data)
}

func (s *Server) bpfAdsMaps(w http.ResponseWriter, r *http.Request) {
client := s.xdsClient
if client == nil || client.AdsController == nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "\t%s\n", "invalid ClientMode")
fmt.Fprint(w, invalidModeErrMessage)
return
}

Expand Down Expand Up @@ -257,7 +306,7 @@ func (s *Server) configDumpAds(w http.ResponseWriter, r *http.Request) {
client := s.xdsClient
if client == nil || client.AdsController == nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "\t%s\n", "invalid ClientMode")
fmt.Fprint(w, invalidModeErrMessage)
return
}

Expand All @@ -283,13 +332,12 @@ type WorkloadDump struct {
}

func (s *Server) configDumpWorkload(w http.ResponseWriter, r *http.Request) {
client := s.xdsClient
if client == nil || client.WorkloadController == nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "\t%s\n", "invalid ClientMode")
if !s.checkWorkloadMode(w) {
return
}

client := s.xdsClient

workloads := client.WorkloadController.Processor.WorkloadCache.List()
services := client.WorkloadController.Processor.ServiceCache.List()
workloadDump := WorkloadDump{
Expand Down
Loading

0 comments on commit 1ca871f

Please sign in to comment.