From 1ca871f938e64d10e064bf529123e7b8a7f6d1f9 Mon Sep 17 00:00:00 2001
From: "923048992@qq.com" <923048992@qq.com>
Date: Fri, 13 Sep 2024 22:57:37 +0800
Subject: [PATCH] support dump workload bpf map

Signed-off-by: 923048992@qq.com <923048992@qq.com>
---
 .../workload/bpfcache/auth_policy.go          |   6 ++
 pkg/controller/workload/bpfcache/backend.go   |  16 ++-
 pkg/controller/workload/bpfcache/common.go    |  27 +++++
 pkg/controller/workload/bpfcache/endpoint.go  |  16 ++-
 pkg/controller/workload/bpfcache/frontend.go  |  16 ++-
 pkg/controller/workload/bpfcache/service.go   |  16 ++-
 .../workload/workload_controller.go           |   2 +-
 .../workload/workload_controller_test.go      |   4 +-
 pkg/controller/workload/workload_processor.go |   6 +-
 .../workload/workload_processor_test.go       |  10 +-
 pkg/status/status_server.go                   |  60 +++++++++--
 pkg/status/status_server_test.go              | 100 ++++++++++++++++++
 12 files changed, 224 insertions(+), 55 deletions(-)
 create mode 100644 pkg/controller/workload/bpfcache/common.go

diff --git a/pkg/controller/workload/bpfcache/auth_policy.go b/pkg/controller/workload/bpfcache/auth_policy.go
index 847c882a6..23059ad17 100644
--- a/pkg/controller/workload/bpfcache/auth_policy.go
+++ b/pkg/controller/workload/bpfcache/auth_policy.go
@@ -42,3 +42,9 @@ func (c *Cache) WorkloadPolicyLookup(key *WorkloadPolicy_key, value *WorkloadPol
 	log.Debugf("workload policy lookup: [%#v]", *key)
 	return c.bpfMap.MapOfWlPolicy.Lookup(key, value)
 }
+
+// WorkloadPolicyLookupAll returns every value stored in the workload policy map.
+func (c *Cache) WorkloadPolicyLookupAll() []WorkloadPolicy_value {
+	log.Debugf("WorkloadPolicyLookupAll")
+	return LookupAll[WorkloadPolicy_key, WorkloadPolicy_value](c.bpfMap.MapOfWlPolicy)
+}
diff --git a/pkg/controller/workload/bpfcache/backend.go b/pkg/controller/workload/bpfcache/backend.go
index 0e6f9bce2..241c17c88 100644
--- a/pkg/controller/workload/bpfcache/backend.go
+++ b/pkg/controller/workload/bpfcache/backend.go
@@ -56,15 +56,11 @@ func (c *Cache) BackendLookup(key *BackendKey, value *BackendValue) error {
 // BackendCount returns the length of backend map
 // Note only used for testing
 func (c *Cache) BackendCount() int {
-	var (
-		key   = BackendKey{}
-		value = BackendValue{}
-	)
+	return len(c.BackendLookupAll())
+}
 
-	res := 0
-	iter := c.bpfMap.KmeshBackend.Iterate()
-	for iter.Next(&key, &value) {
-		res++
-	}
-	return res
+// BackendLookupAll returns every value stored in the backend map.
+func (c *Cache) BackendLookupAll() []BackendValue {
+	log.Debugf("BackendLookupAll")
+	return LookupAll[BackendKey, BackendValue](c.bpfMap.KmeshBackend)
 }
diff --git a/pkg/controller/workload/bpfcache/common.go b/pkg/controller/workload/bpfcache/common.go
new file mode 100644
index 000000000..b1f77d3d3
--- /dev/null
+++ b/pkg/controller/workload/bpfcache/common.go
@@ -0,0 +1,27 @@
+package bpfcache
+
+import (
+	"github.com/cilium/ebpf"
+)
+
+// LookupAll iterates over bpfMap and returns every value stored in it.
+// K and V must be the Go types matching the map's key and value layout:
+// the iterator decodes each entry into a K and a V, so decoding the key
+// into the value type would stop the iteration with an error.
+func LookupAll[K any, V any](bpfMap *ebpf.Map) []V {
+	var (
+		key   K
+		value V
+		ret   []V
+	)
+
+	iter := bpfMap.Iterate()
+	for iter.Next(&key, &value) {
+		ret = append(ret, value)
+	}
+	// Next returns false both on exhaustion and on failure; Err tells them apart.
+	if err := iter.Err(); err != nil {
+		log.Errorf("iterate bpf map %v failed: %v", bpfMap, err)
+	}
+	return ret
+}
diff --git a/pkg/controller/workload/bpfcache/endpoint.go b/pkg/controller/workload/bpfcache/endpoint.go
index 14cc5b4b9..9489b68f0 100644
--- a/pkg/controller/workload/bpfcache/endpoint.go
+++ b/pkg/controller/workload/bpfcache/endpoint.go
@@ -153,15 +153,11 @@ func (c *Cache) GetAllEndpointsForService(serviceId uint32) []EndpointValue {
 // EndpointCount returns the length of endpoint map
 // Note only used for testing
 func (c *Cache) EndpointCount() int {
-	var (
-		key   = EndpointKey{}
-		value = EndpointValue{}
-	)
+	return len(c.EndpointLookupAll())
+}
 
-	res := 0
-	iter := c.bpfMap.KmeshEndpoint.Iterate()
-	for iter.Next(&key, &value) {
-		res++
-	}
-	return res
+// EndpointLookupAll returns every value stored in the endpoint map.
+func (c *Cache) EndpointLookupAll() []EndpointValue {
+	log.Debugf("EndpointLookupAll")
+	return LookupAll[EndpointKey, EndpointValue](c.bpfMap.KmeshEndpoint)
 }
diff --git a/pkg/controller/workload/bpfcache/frontend.go b/pkg/controller/workload/bpfcache/frontend.go
index f08ec8e48..fa7117a9f 100644
--- a/pkg/controller/workload/bpfcache/frontend.go
+++ b/pkg/controller/workload/bpfcache/frontend.go
@@ -68,15 +68,11 @@ func (c *Cache) FrontendIterFindKey(upstreamId uint32) []FrontendKey {
 // FrontendCount returns the length of frontend map
 // Note only used for testing
 func (c *Cache) FrontendCount() int {
-	var (
-		key   = FrontendKey{}
-		value = FrontendValue{}
-	)
+	return len(c.FrontendLookupAll())
+}
 
-	res := 0
-	iter := c.bpfMap.KmeshFrontend.Iterate()
-	for iter.Next(&key, &value) {
-		res++
-	}
-	return res
+// FrontendLookupAll returns every value stored in the frontend map.
+func (c *Cache) FrontendLookupAll() []FrontendValue {
+	log.Debugf("FrontendLookupAll")
+	return LookupAll[FrontendKey, FrontendValue](c.bpfMap.KmeshFrontend)
 }
diff --git a/pkg/controller/workload/bpfcache/service.go b/pkg/controller/workload/bpfcache/service.go
index fe48f4e81..75866c052 100644
--- a/pkg/controller/workload/bpfcache/service.go
+++ b/pkg/controller/workload/bpfcache/service.go
@@ -58,15 +58,11 @@ func (c *Cache) ServiceLookup(key *ServiceKey, value *ServiceValue) error {
 // ServiceCount returns the length of service map
 // Note only used for testing
 func (c *Cache) ServiceCount() int {
-	var (
-		key   = ServiceKey{}
-		value = ServiceValue{}
-	)
+	return len(c.ServiceLookupAll())
+}
 
-	res := 0
-	iter := c.bpfMap.KmeshService.Iterate()
-	for iter.Next(&key, &value) {
-		res++
-	}
-	return res
+// ServiceLookupAll returns every value stored in the service map.
+func (c *Cache) ServiceLookupAll() []ServiceValue {
+	log.Debugf("ServiceLookupAll")
+	return LookupAll[ServiceKey, ServiceValue](c.bpfMap.KmeshService)
 }
diff --git a/pkg/controller/workload/workload_controller.go b/pkg/controller/workload/workload_controller.go
index 0a896e023..9da1c5411 100644
--- a/pkg/controller/workload/workload_controller.go
+++ b/pkg/controller/workload/workload_controller.go
@@ -45,7 +45,7 @@ type Controller struct {
 
 func NewController(bpfWorkload *bpf.BpfKmeshWorkload) *Controller {
 	c := &Controller{
-		Processor:      newProcessor(bpfWorkload.SockConn.KmeshCgroupSockWorkloadObjects.KmeshCgroupSockWorkloadMaps),
+		Processor:      NewProcessor(bpfWorkload.SockConn.KmeshCgroupSockWorkloadObjects.KmeshCgroupSockWorkloadMaps),
 		bpfWorkloadObj: bpfWorkload,
 	}
 	// do some initialization when restart
diff --git a/pkg/controller/workload/workload_controller_test.go b/pkg/controller/workload/workload_controller_test.go
index 2f2a3d43e..3e37aea19 100644
--- a/pkg/controller/workload/workload_controller_test.go
+++ b/pkg/controller/workload/workload_controller_test.go
@@ -124,7 +124,7 @@ func TestWorkloadStreamCreateAndSend(t *testing.T) {
 			beforeFunc: func(t *testing.T) {
 				patches1.ApplyMethodReturn(fakeClient.Client, "DeltaAggregatedResources",
 					fakeClient.DeltaClient, nil)
-				workloadController.Processor = newProcessor(workloadMap)
+				workloadController.Processor = NewProcessor(workloadMap)
 				workload := createFakeWorkload("10.10.10.1", workloadapi.NetworkMode_STANDARD)
 				workloadController.Processor.WorkloadCache.AddOrUpdateWorkload(workload)
 				patches2.ApplyMethodFunc(fakeClient.DeltaClient, "Send",
@@ -152,7 +152,7 @@ func TestWorkloadStreamCreateAndSend(t *testing.T) {
 			beforeFunc: func(t *testing.T) {
 				patches0.ApplyFuncReturn(maps_v2.AuthorizationUpdate, nil)
 				patches1.ApplyMethodReturn(fakeClient.Client, "DeltaAggregatedResources", fakeClient.DeltaClient, nil)
-				workloadController.Processor = newProcessor(workloadMap)
+				workloadController.Processor = NewProcessor(workloadMap)
 				workloadController.Rbac = auth.NewRbac(nil)
 				workloadController.Rbac.UpdatePolicy(&security.Authorization{
 					Name: "p1",
diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go
index ab6916870..6a574e13f 100644
--- a/pkg/controller/workload/workload_processor.go
+++ b/pkg/controller/workload/workload_processor.go
@@ -62,7 +62,7 @@ type Processor struct {
 	authzOnce sync.Once
 }
 
-func newProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor {
+func NewProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor {
 	return &Processor{
 		hashName: NewHashName(),
 		bpf:      bpf.NewCache(workloadMap),
@@ -93,6 +93,10 @@ func newAckRequest(rsp *service_discovery_v3.DeltaDiscoveryResponse) *service_di
 	}
 }
 
+func (p *Processor) GetBpfCache() *bpf.Cache {
+	return p.bpf
+}
+
 func (p *Processor) processWorkloadResponse(rsp *service_discovery_v3.DeltaDiscoveryResponse, rbac *auth.Rbac) {
 	var err error
 
diff --git a/pkg/controller/workload/workload_processor_test.go b/pkg/controller/workload/workload_processor_test.go
index ea52ac3fd..d0a2bcec6 100644
--- a/pkg/controller/workload/workload_processor_test.go
+++ b/pkg/controller/workload/workload_processor_test.go
@@ -42,7 +42,7 @@ func Test_handleWorkload(t *testing.T) {
 	workloadMap := bpfcache.NewFakeWorkloadMap(t)
 	defer bpfcache.CleanupFakeWorkloadMap(workloadMap)
 
-	p := newProcessor(workloadMap)
+	p := NewProcessor(workloadMap)
 
 	var (
 		ek bpfcache.EndpointKey
@@ -175,7 +175,7 @@ func Test_handleServiceWithWaypoint(t *testing.T) {
 	// Mainly used to test whether processor can correctly handle
 	// different types of waypoint address without panic.
 	workloadMap := bpfcache.NewFakeWorkloadMap(t)
-	p := newProcessor(workloadMap)
+	p := NewProcessor(workloadMap)
 
 	// Waypoint with network address.
 	svc1 := createFakeService("svc1", "10.240.10.1", "10.240.10.200")
@@ -188,7 +188,7 @@ func Test_handleServiceWithWaypoint(t *testing.T) {
 
 func Test_hostnameNetworkMode(t *testing.T) {
 	workloadMap := bpfcache.NewFakeWorkloadMap(t)
-	p := newProcessor(workloadMap)
+	p := NewProcessor(workloadMap)
 	workload := createFakeWorkload("1.2.3.4", workloadapi.NetworkMode_STANDARD)
 	workloadWithoutService := createFakeWorkload("1.2.3.5", workloadapi.NetworkMode_STANDARD)
 	workloadWithoutService.Services = nil
@@ -485,7 +485,7 @@ func TestRestart(t *testing.T) {
 	workloadMap := bpfcache.NewFakeWorkloadMap(t)
 	defer bpfcache.CleanupFakeWorkloadMap(workloadMap)
 
-	p := newProcessor(workloadMap)
+	p := NewProcessor(workloadMap)
 
 	res := &service_discovery_v3.DeltaDiscoveryResponse{}
 
@@ -545,7 +545,7 @@ func TestRestart(t *testing.T) {
 	// Set a restart label and simulate missing data in the cache
 	bpf.SetStartType(bpf.Restart)
 	// reconstruct a new processor
-	p = newProcessor(workloadMap)
+	p = NewProcessor(workloadMap)
 	p.bpf.RestoreEndpointKeys()
 	// 2.1 simulate workload add/delete during restart
 	// simulate workload update during restart
diff --git a/pkg/status/status_server.go b/pkg/status/status_server.go
index b13a0fca5..f8bc2eb92 100644
--- a/pkg/status/status_server.go
+++ b/pkg/status/status_server.go
@@ -35,6 +35,7 @@ import (
 	"kmesh.net/kmesh/pkg/constants"
 	"kmesh.net/kmesh/pkg/controller"
 	"kmesh.net/kmesh/pkg/controller/ads"
+	"kmesh.net/kmesh/pkg/controller/workload/bpfcache"
 	"kmesh.net/kmesh/pkg/logger"
 )
 
@@ -46,6 +47,7 @@ const (
 	patternHelp               = "/help"
 	patternOptions            = "/options"
 	patternBpfAdsMaps         = "/debug/bpf/ads"
+	patternBpfWorkloadMaps    = "/debug/bpf/workload"
 	configDumpPrefix          = "/debug/config_dump"
 	patternConfigDumpAds      = configDumpPrefix + "/ads"
 	patternConfigDumpWorkload = configDumpPrefix + "/workload"
@@ -55,6 +57,8 @@ const (
 	bpfLoggerName = "bpf"
 
 	httpTimeout = time.Second * 20
+
+	invalidModeErrMessage = "\tInvalid Client Mode\n"
 )
 
 type Server struct {
@@ -128,11 +132,56 @@ func (s *Server) httpOptions(w http.ResponseWriter, r *http.Request) {
 	fmt.Fprintln(w, s.config.String())
 }
 
+func (s *Server) checkWorkloadMode(w http.ResponseWriter) bool {
+	client := s.xdsClient
+	if client == nil || client.WorkloadController == nil {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Fprint(w, invalidModeErrMessage)
+		return false
+	}
+	return true
+}
+
+type WorkloadBpfDump struct {
+	AuthPolicies []bpfcache.WorkloadPolicy_value
+	Backends     []bpfcache.BackendValue
+	Endpoints    []bpfcache.EndpointValue
+	Frontends    []bpfcache.FrontendValue
+	Services     []bpfcache.ServiceValue
+}
+
+func (s *Server) bpfWorkloadMaps(w http.ResponseWriter, r *http.Request) {
+	if !s.checkWorkloadMode(w) {
+		return
+	}
+	client := s.xdsClient
+	bpfMaps := client.WorkloadController.Processor.GetBpfCache()
+	workloadBpfDump := WorkloadBpfDump{
+		AuthPolicies: bpfMaps.WorkloadPolicyLookupAll(),
+		Backends:     bpfMaps.BackendLookupAll(),
+		Endpoints:    bpfMaps.EndpointLookupAll(),
+		Frontends:    bpfMaps.FrontendLookupAll(),
+		Services:     bpfMaps.ServiceLookupAll(),
+	}
+	printWorkloadBpfDump(w, workloadBpfDump)
+}
+
+func printWorkloadBpfDump(w http.ResponseWriter, wbd WorkloadBpfDump) {
+	data, err := json.MarshalIndent(wbd, "", " ")
+	if err != nil {
+		log.Errorf("Failed to marshal WorkloadBpfDump: %v", err)
+		w.WriteHeader(http.StatusInternalServerError)
+		return
+	}
+	w.WriteHeader(http.StatusOK)
+	_, _ = w.Write(data)
+}
+
 func (s *Server) bpfAdsMaps(w http.ResponseWriter, r *http.Request) {
 	client := s.xdsClient
 	if client == nil || client.AdsController == nil {
 		w.WriteHeader(http.StatusBadRequest)
-		fmt.Fprintf(w, "\t%s\n", "invalid ClientMode")
+		fmt.Fprint(w, invalidModeErrMessage)
 		return
 	}
 
@@ -257,7 +306,7 @@ func (s *Server) configDumpAds(w http.ResponseWriter, r *http.Request) {
 	client := s.xdsClient
 	if client == nil || client.AdsController == nil {
 		w.WriteHeader(http.StatusBadRequest)
-		fmt.Fprintf(w, "\t%s\n", "invalid ClientMode")
+		fmt.Fprint(w, invalidModeErrMessage)
 		return
 	}
 
@@ -283,13 +332,12 @@ type WorkloadDump struct {
 }
 
 func (s *Server) configDumpWorkload(w http.ResponseWriter, r *http.Request) {
-	client := s.xdsClient
-	if client == nil || client.WorkloadController == nil {
-		w.WriteHeader(http.StatusBadRequest)
-		fmt.Fprintf(w, "\t%s\n", "invalid ClientMode")
+	if !s.checkWorkloadMode(w) {
 		return
 	}
 
+	client := s.xdsClient
+
 	workloads := client.WorkloadController.Processor.WorkloadCache.List()
 	services := client.WorkloadController.Processor.ServiceCache.List()
 	workloadDump := WorkloadDump{
diff --git a/pkg/status/status_server_test.go b/pkg/status/status_server_test.go
index c06cb2c2e..8e66d5269 100644
--- a/pkg/status/status_server_test.go
+++ b/pkg/status/status_server_test.go
@@ -19,6 +19,7 @@ package status
 import (
 	"bytes"
 	"encoding/json"
+	"io"
 	"net/http"
 	"net/http/httptest"
 	"net/netip"
@@ -35,6 +36,7 @@ import (
 	"kmesh.net/kmesh/pkg/constants"
 	"kmesh.net/kmesh/pkg/controller"
 	"kmesh.net/kmesh/pkg/controller/workload"
+	"kmesh.net/kmesh/pkg/controller/workload/bpfcache"
 	"kmesh.net/kmesh/pkg/controller/workload/cache"
 	"kmesh.net/kmesh/pkg/logger"
 	"kmesh.net/kmesh/pkg/utils/test"
@@ -295,3 +297,101 @@ func TestServer_configDumpWorkload(t *testing.T) {
 
 	util.CompareContent(t, w.Body.Bytes(), "./testdata/workload_configdump.json")
 }
+
+func TestServer_dumpWorkloadBpfMap(t *testing.T) {
+	// Exercise the workload bpf map dump handler with and without a workload controller
+	t.Run("Ads mode test", func(t *testing.T) {
+		config := options.BpfConfig{
+			Mode:        "ads",
+			BpfFsPath:   "/sys/fs/bpf",
+			Cgroup2Path: "/mnt/kmesh_cgroup2",
+		}
+		cleanup, _ := test.InitBpfMap(t, config)
+		defer cleanup()
+
+		// no workload controller is configured, so the handler must reject the request
+		server := &Server{}
+		req := httptest.NewRequest(http.MethodPost, patternBpfWorkloadMaps, nil)
+		w := httptest.NewRecorder()
+		server.bpfWorkloadMaps(w, req)
+
+		body, err := io.ReadAll(w.Body)
+		assert.Nil(t, err)
+		assert.Equal(t, invalidModeErrMessage, string(body))
+	})
+
+	t.Run("Workload mode test", func(t *testing.T) {
+		config := options.BpfConfig{
+			Mode:        "workload",
+			BpfFsPath:   "/sys/fs/bpf",
+			Cgroup2Path: "/mnt/kmesh_cgroup2",
+		}
+		cleanup, bpfLoader := test.InitBpfMap(t, config)
+		bpfMaps := bpfLoader.GetBpfKmeshWorkload().SockConn.KmeshCgroupSockWorkloadMaps
+		defer cleanup()
+
+		// a workload controller is configured, so the handler must dump the maps
+		server := &Server{
+			xdsClient: &controller.XdsClient{
+				WorkloadController: &workload.Controller{
+					Processor: workload.NewProcessor(bpfMaps),
+				},
+			},
+		}
+
+		// populate the maps first; the dump is compared order-insensitively below
+		testWorkloadPolicyKeys := []bpfcache.WorkloadPolicy_key{
+			{WorklodId: 1}, {WorklodId: 2},
+		}
+		testWorkloadPolicyVals := []bpfcache.WorkloadPolicy_value{
+			{PolicyIds: [4]uint32{1, 2, 3, 4}}, {PolicyIds: [4]uint32{5, 6, 7, 8}},
+		}
+		bpfMaps.MapOfWlPolicy.BatchUpdate(testWorkloadPolicyKeys, testWorkloadPolicyVals, nil)
+
+		testBackendKeys := []bpfcache.BackendKey{
+			{BackendUid: 1}, {BackendUid: 2},
+		}
+		testBackendVals := []bpfcache.BackendValue{
+			{WaypointPort: 1234}, {WaypointPort: 5678},
+		}
+		bpfMaps.KmeshBackend.BatchUpdate(testBackendKeys, testBackendVals, nil)
+
+		testEndpointKeys := []bpfcache.EndpointKey{
+			{ServiceId: 1}, {ServiceId: 2},
+		}
+		testEndpointVals := []bpfcache.EndpointValue{
+			{BackendUid: 1234}, {BackendUid: 5678},
+		}
+		bpfMaps.KmeshEndpoint.BatchUpdate(testEndpointKeys, testEndpointVals, nil)
+
+		testFrontendKeys := []bpfcache.FrontendKey{
+			{Ip: [16]byte{1, 2, 3, 4}}, {Ip: [16]byte{5, 6, 7, 8}},
+		}
+		testFrontendVals := []bpfcache.FrontendValue{
+			{UpstreamId: 1234}, {UpstreamId: 5678},
+		}
+		bpfMaps.KmeshFrontend.BatchUpdate(testFrontendKeys, testFrontendVals, nil)
+
+		testServiceKeys := []bpfcache.ServiceKey{
+			{ServiceId: 1}, {ServiceId: 2},
+		}
+		testServiceVals := []bpfcache.ServiceValue{
+			{EndpointCount: 1234}, {EndpointCount: 5678},
+		}
+		bpfMaps.KmeshService.BatchUpdate(testServiceKeys, testServiceVals, nil)
+
+		req := httptest.NewRequest(http.MethodPost, patternBpfWorkloadMaps, nil)
+		w := httptest.NewRecorder()
+		server.bpfWorkloadMaps(w, req)
+		body, err := io.ReadAll(w.Body)
+		assert.Nil(t, err)
+		dump := WorkloadBpfDump{}
+		assert.Nil(t, json.Unmarshal(body, &dump))
+
+		assert.ElementsMatch(t, testWorkloadPolicyVals, dump.AuthPolicies)
+		assert.ElementsMatch(t, testBackendVals, dump.Backends)
+		assert.ElementsMatch(t, testEndpointVals, dump.Endpoints)
+		assert.ElementsMatch(t, testFrontendVals, dump.Frontends)
+		assert.ElementsMatch(t, testServiceVals, dump.Services)
+	})
+}