Skip to content

Commit

Permalink
Merge pull request kmesh-net#871 from LiZhenCheng9527/accesslog-trigger
Browse files Browse the repository at this point in the history
add a trigger for accesslog
  • Loading branch information
LiZhenCheng9527 authored Sep 21, 2024
2 parents cce29bc + a5dce80 commit 5c72407
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 18 deletions.
2 changes: 1 addition & 1 deletion daemon/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func Execute(configs *options.BootstrapConfigs) error {
stopCh := make(chan struct{})
defer close(stopCh)

c := controller.NewController(configs, bpfLoader.GetBpfKmeshWorkload(), configs.BpfConfig.BpfFsPath, configs.BpfConfig.EnableBpfLog)
c := controller.NewController(configs, bpfLoader.GetBpfKmeshWorkload(), configs.BpfConfig.BpfFsPath, configs.BpfConfig.EnableBpfLog, configs.BpfConfig.EnableAccesslog)
if err := c.Start(stopCh); err != nil {
return err
}
Expand Down
12 changes: 7 additions & 5 deletions daemon/options/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ import (
)

type BpfConfig struct {
Mode string
BpfFsPath string
Cgroup2Path string
EnableMda bool
EnableBpfLog bool
Mode string
BpfFsPath string
Cgroup2Path string
EnableMda bool
EnableBpfLog bool
EnableAccesslog bool
}

func (c *BpfConfig) AttachFlags(cmd *cobra.Command) {
Expand All @@ -39,6 +40,7 @@ func (c *BpfConfig) AttachFlags(cmd *cobra.Command) {
cmd.PersistentFlags().StringVar(&c.Mode, "mode", "workload", "controller plane mode, valid values are [ads, workload]")
cmd.PersistentFlags().BoolVar(&c.EnableMda, "enable-mda", false, "enable mda")
cmd.PersistentFlags().BoolVar(&c.EnableBpfLog, "enable-bpf-log", false, "enable ebpf log in daemon process")
cmd.PersistentFlags().BoolVar(&c.EnableAccesslog, "enable-accesslog", false, "enable accesslog in daemon process")
}

func (c *BpfConfig) ParseConfig() error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ type XdsClient struct {
xdsConfig *config.XdsConfig
}

func NewXdsClient(mode string, bpfWorkload *bpf.BpfKmeshWorkload) *XdsClient {
func NewXdsClient(mode string, bpfWorkload *bpf.BpfKmeshWorkload, enableAccesslog bool) *XdsClient {
client := &XdsClient{
mode: mode,
xdsConfig: config.GetConfig(mode),
}

if mode == constants.WorkloadMode {
client.WorkloadController = workload.NewController(bpfWorkload)
client.WorkloadController = workload.NewController(bpfWorkload, enableAccesslog)
} else if mode == constants.AdsMode {
client.AdsController = ads.NewController()
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (

func TestRecoverConnection(t *testing.T) {
t.Run("test reconnect success", func(t *testing.T) {
utClient := NewXdsClient(constants.AdsMode, &bpf.BpfKmeshWorkload{})
utClient := NewXdsClient(constants.AdsMode, &bpf.BpfKmeshWorkload{}, false)
patches := gomonkey.NewPatches()
defer patches.Reset()
iteration := 0
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestClientResponseProcess(t *testing.T) {
}))
})

utClient := NewXdsClient(constants.AdsMode, &bpf.BpfKmeshWorkload{})
utClient := NewXdsClient(constants.AdsMode, &bpf.BpfKmeshWorkload{}, false)
err := utClient.createGrpcStreamClient()
assert.NoError(t, err)

Expand Down Expand Up @@ -125,7 +125,7 @@ func TestClientResponseProcess(t *testing.T) {
}))
})

utClient := NewXdsClient(constants.WorkloadMode, &bpf.BpfKmeshWorkload{})
utClient := NewXdsClient(constants.WorkloadMode, &bpf.BpfKmeshWorkload{}, false)
err := utClient.createGrpcStreamClient()
assert.NoError(t, err)

Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,18 @@ type Controller struct {
enableSecretManager bool
bpfFsPath string
enableBpfLog bool
enableAccesslog bool
}

func NewController(opts *options.BootstrapConfigs, bpfWorkloadObj *bpf.BpfKmeshWorkload, bpfFsPath string, enableBpfLog bool) *Controller {
func NewController(opts *options.BootstrapConfigs, bpfWorkloadObj *bpf.BpfKmeshWorkload, bpfFsPath string, enableBpfLog bool, enableAccesslog bool) *Controller {
return &Controller{
mode: opts.BpfConfig.Mode,
enableByPass: opts.ByPassConfig.EnableByPass,
bpfWorkloadObj: bpfWorkloadObj,
enableSecretManager: opts.SecretManagerConfig.Enable,
bpfFsPath: bpfFsPath,
enableBpfLog: enableBpfLog,
enableAccesslog: enableAccesslog,
}
}

Expand Down Expand Up @@ -101,7 +103,7 @@ func (c *Controller) Start(stopCh <-chan struct{}) error {
return fmt.Errorf("fail to start ringbuf reader: %v", err)
}
}
c.client = NewXdsClient(c.mode, c.bpfWorkloadObj)
c.client = NewXdsClient(c.mode, c.bpfWorkloadObj, c.enableAccesslog)

if c.client.WorkloadController != nil {
c.client.WorkloadController.Run(ctx)
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/telemetry/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
var osStartTime time.Time

type MetricController struct {
enableAccesslog bool
workloadCache cache.WorkloadCache
workloadMetricCache map[workloadMetricLabels]*workloadMetricInfo
serviceMetricCache map[serviceMetricLabels]*serviceMetricInfo
Expand Down Expand Up @@ -173,8 +174,9 @@ type serviceMetricLabels struct {
connectionSecurityPolicy string
}

func NewMetric(workloadCache cache.WorkloadCache) *MetricController {
func NewMetric(workloadCache cache.WorkloadCache, enableAccesslog bool) *MetricController {
return &MetricController{
enableAccesslog: enableAccesslog,
workloadCache: workloadCache,
workloadMetricCache: map[workloadMetricLabels]*workloadMetricInfo{},
serviceMetricCache: map[serviceMetricLabels]*serviceMetricInfo{},
Expand Down Expand Up @@ -260,7 +262,7 @@ func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) {
serviceLabels.reporter = "source"
accesslog.direction = "OUTBOUND"
}
if data.state == TCP_CLOSTED && accesslog.sourceWorkload != "-" {
if data.state == TCP_CLOSTED && accesslog.sourceWorkload != "-" && m.enableAccesslog {
OutputAccesslog(data, accesslog)
}
m.mutex.Lock()
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/workload/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Controller struct {
bpfWorkloadObj *bpf.BpfKmeshWorkload
}

func NewController(bpfWorkload *bpf.BpfKmeshWorkload) *Controller {
func NewController(bpfWorkload *bpf.BpfKmeshWorkload, enableAccesslog bool) *Controller {
c := &Controller{
Processor: NewProcessor(bpfWorkload.SockConn.KmeshCgroupSockWorkloadObjects.KmeshCgroupSockWorkloadMaps),
bpfWorkloadObj: bpfWorkload,
Expand All @@ -54,7 +54,7 @@ func NewController(bpfWorkload *bpf.BpfKmeshWorkload) *Controller {
c.Processor.bpf.RestoreEndpointKeys()
}
c.Rbac = auth.NewRbac(c.Processor.WorkloadCache)
c.MetricController = telemetry.NewMetric(c.Processor.WorkloadCache)
c.MetricController = telemetry.NewMetric(c.Processor.WorkloadCache, enableAccesslog)
return c
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/workload/workload_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func BenchmarkAddNewServicesWithWorkload(b *testing.B) {
cleanup, bpfLoader := test.InitBpfMap(t, config)
b.Cleanup(cleanup)

workloadController := NewController(bpfLoader.GetBpfKmeshWorkload())
workloadController := NewController(bpfLoader.GetBpfKmeshWorkload(), false)

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down

0 comments on commit 5c72407

Please sign in to comment.