diff --git a/bpf/enhancedrecording/command.bpf.c b/bpf/enhancedrecording/command.bpf.c index cecd2272b9d1d..b2b0d3a37b508 100644 --- a/bpf/enhancedrecording/command.bpf.c +++ b/bpf/enhancedrecording/command.bpf.c @@ -3,6 +3,7 @@ #include /* for BPF CO-RE helpers */ #include /* for getting kprobe arguments */ +#include "./common.h" #include "../helpers.h" #define ARGSIZE 128 @@ -13,6 +14,9 @@ // the userspace can adjust this value based on config. #define EVENTS_BUF_SIZE (4096*8) +// hashmap keeps all cgroups id that should be monitored by Teleport. +BPF_HASH(monitored_cgroups, u64, int64_t, MAX_MONITORED_SESSIONS); + char LICENSE[] SEC("license") = "Dual BSD/GPL"; enum event_type { @@ -61,9 +65,18 @@ static int enter_execve(const char *filename, // create data here and pass to submit_arg to save stack space (#555) struct data_t data = {}; struct task_struct *task; + u64 cgroup = bpf_get_current_cgroup_id(); + u64 *is_monitored; + + // Check if the cgroup should be monitored. + is_monitored = bpf_map_lookup_elem(&monitored_cgroups, &cgroup); + if (is_monitored == NULL) { + // Missed entry. + return 0; + } data.pid = bpf_get_current_pid_tgid() >> 32; - data.cgroup = bpf_get_current_cgroup_id(); + data.cgroup = cgroup; task = (struct task_struct *)bpf_get_current_task(); data.ppid = BPF_CORE_READ(task, real_parent, tgid); @@ -90,9 +103,18 @@ static int exit_execve(int ret) { struct data_t data = {}; struct task_struct *task; + u64 cgroup = bpf_get_current_cgroup_id(); + u64 *is_monitored; + + // Check if the cgroup should be monitored. + is_monitored = bpf_map_lookup_elem(&monitored_cgroups, &cgroup); + if (is_monitored == NULL) { + // cgroup has not been marked for monitoring, ignore. 
+ return 0; + } data.pid = bpf_get_current_pid_tgid() >> 32; - data.cgroup = bpf_get_current_cgroup_id(); + data.cgroup = cgroup; task = (struct task_struct *)bpf_get_current_task(); data.ppid = BPF_CORE_READ(task, real_parent, tgid); diff --git a/bpf/enhancedrecording/common.h b/bpf/enhancedrecording/common.h new file mode 100644 index 0000000000000..f48cdc4826549 --- /dev/null +++ b/bpf/enhancedrecording/common.h @@ -0,0 +1,7 @@ +#ifndef BPF_COMMON_H +#define BPF_COMMON_H + +// Maximum monitored sessions. +#define MAX_MONITORED_SESSIONS 1024 + +#endif // BPF_COMMON_H \ No newline at end of file diff --git a/bpf/enhancedrecording/disk.bpf.c b/bpf/enhancedrecording/disk.bpf.c index 5836f7f280a34..480d48635de90 100644 --- a/bpf/enhancedrecording/disk.bpf.c +++ b/bpf/enhancedrecording/disk.bpf.c @@ -4,6 +4,7 @@ #include /* for BPF CO-RE helpers */ #include /* for getting kprobe arguments */ +#include "./common.h" #include "../helpers.h" // Maximum number of in-flight open syscalls supported @@ -14,8 +15,6 @@ // the userspace can adjust this value based on config. #define EVENTS_BUF_SIZE (4096*128) -// Maximum monitored sessions. -#define MAX_MONITORED_SESSIONS 1024 char LICENSE[] SEC("license") = "Dual BSD/GPL"; @@ -73,7 +72,7 @@ static int exit_open(int ret) { // Check if the cgroup should be monitored. is_monitored = bpf_map_lookup_elem(&monitored_cgroups, &cgroup); if (is_monitored == NULL) { - // Missed entry. + // cgroup has not been marked for monitoring, ignore. 
return 0; } diff --git a/bpf/enhancedrecording/network.bpf.c b/bpf/enhancedrecording/network.bpf.c index 51d20cc6b809d..9f104909f75e8 100644 --- a/bpf/enhancedrecording/network.bpf.c +++ b/bpf/enhancedrecording/network.bpf.c @@ -3,6 +3,7 @@ #include /* for BPF CO-RE helpers */ #include /* for getting kprobe arguments */ +#include "./common.h" #include "../helpers.h" char LICENSE[] SEC("license") = "Dual BSD/GPL"; @@ -17,6 +18,9 @@ char LICENSE[] SEC("license") = "Dual BSD/GPL"; BPF_HASH(currsock, u32, struct sock *, INFLIGHT_MAX); +// hashmap keeps all cgroups id that should be monitored by Teleport. +BPF_HASH(monitored_cgroups, u64, int64_t, MAX_MONITORED_SESSIONS); + // separate data structs for ipv4 and ipv6 struct ipv4_data_t { u64 cgroup; @@ -55,7 +59,16 @@ static int trace_connect_entry(struct sock *sk) static int trace_connect_return(int ret, short ipver) { u64 pid_tgid = bpf_get_current_pid_tgid(); - u32 id = (u32)pid_tgid; + u64 cgroup = bpf_get_current_cgroup_id(); + u32 id = (u32)pid_tgid; + u64 *is_monitored; + + // Check if the cgroup should be monitored. + is_monitored = bpf_map_lookup_elem(&monitored_cgroups, &cgroup); + if (is_monitored == NULL) { + // cgroup has not been marked for monitoring, ignore. + return 0; + } struct sock **skpp; skpp = bpf_map_lookup_elem(&currsock, &id); diff --git a/lib/bpf/bpf.go b/lib/bpf/bpf.go index 96e302393544b..4d532d1e08c0d 100644 --- a/lib/bpf/bpf.go +++ b/lib/bpf/bpf.go @@ -233,9 +233,24 @@ func (s *Service) OpenSession(ctx *SessionContext) (uint64, error) { return 0, trace.Wrap(err) } - // Register cgroup in the BPF module. - if err := s.open.startSession(cgroupID); err != nil { - return 0, trace.Wrap(err) + // initializedModClosures holds all already opened modules closures. + initializedModClosures := make([]interface{ endSession(uint64) error }, 0) + for _, module := range []cgroupRegister{ + s.open, + s.exec, + s.conn, + } { + // Register cgroup in the BPF module. 
+ if err := module.startSession(cgroupID); err != nil { + // Clean up all already opened modules. + for _, closer := range initializedModClosures { + if closeErr := closer.endSession(cgroupID); closeErr != nil { + log.Debugf("failed to close session: %v", closeErr) + } + } + return 0, trace.Wrap(err) + } + initializedModClosures = append(initializedModClosures, module) } // Start watching for any events that come from this cgroup. @@ -268,9 +283,15 @@ func (s *Service) CloseSession(ctx *SessionContext) error { errs = append(errs, trace.Wrap(err)) } - // Remove the cgroup from BPF module. - if err := s.open.endSession(cgroupID); err != nil { - errs = append(errs, trace.Wrap(err)) + for _, module := range []interface{ endSession(cgroupID uint64) error }{ + s.open, + s.exec, + s.conn, + } { + // Remove the cgroup from BPF module. + if err := module.endSession(cgroupID); err != nil { + errs = append(errs, trace.Wrap(err)) + } } return trace.NewAggregate(errs...) diff --git a/lib/bpf/bpf_test.go b/lib/bpf/bpf_test.go index b854d8e596e9c..c2e7517e3f23b 100644 --- a/lib/bpf/bpf_test.go +++ b/lib/bpf/bpf_test.go @@ -178,9 +178,7 @@ func TestRootObfuscate(t *testing.T) { for { select { case <-ticker.C: - if err := osexec.Command(fileName).Run(); err != nil { - t.Logf("Failed to run script: %v.", err) - } + runCmd(t, reexecInCGroupCmd, fileName, execsnoop) case <-done: return } @@ -241,10 +239,8 @@ func TestRootScript(t *testing.T) { case <-done: return case <-ticker.C: - // Run script. - if err := osexec.Command(fileName).Run(); err != nil { - t.Logf("Failed to run script: %v.", err) - } + // Run script in a cgroup. + runCmd(t, reexecInCGroupCmd, fileName, execsnoop) } } }() @@ -304,20 +300,18 @@ func TestRootPrograms(t *testing.T) { // Loop over all three programs and make sure events are received off the // perf buffer. 
var tests = []struct { - inName string - inCommand string - inCommandArgs []string - inEventCh <-chan []byte - inHTTP bool - verifyFn func(event []byte) bool + inName string + inEventCh <-chan []byte + genEvents func(t *testing.T, ctx context.Context) + verifyFn func(event []byte) bool }{ // Run execsnoop with "ls". { - inName: "execsnoop", - inCommand: "ls", - inCommandArgs: []string{}, - inEventCh: execsnoop.events(), - inHTTP: false, + inName: "execsnoop", + inEventCh: execsnoop.events(), + genEvents: func(t *testing.T, ctx context.Context) { + executeCommand(t, ctx, "ls", execsnoop) + }, verifyFn: func(event []byte) bool { var e rawExecEvent err := unmarshalEvent(event, &e) @@ -327,11 +321,11 @@ func TestRootPrograms(t *testing.T) { // Run opensnoop with "ls". This is fine because "ls" will open some // shared library. { - inName: "opensnoop", - inCommand: "ls", - inCommandArgs: []string{}, - inEventCh: opensnoop.events(), - inHTTP: false, + inName: "opensnoop", + inEventCh: opensnoop.events(), + genEvents: func(t *testing.T, ctx context.Context) { + executeCommand(t, ctx, "ls", opensnoop) + }, verifyFn: func(event []byte) bool { var e rawOpenEvent err := unmarshalEvent(event, &e) @@ -342,7 +336,9 @@ func TestRootPrograms(t *testing.T) { { inName: "tcpconnect", inEventCh: tcpconnect.v4Events(), - inHTTP: true, + genEvents: func(t *testing.T, ctx context.Context) { + executeHTTP(t, ctx, ts.URL, tcpconnect) + }, verifyFn: func(event []byte) bool { var e rawConn4Event err := unmarshalEvent(event, &e) @@ -359,11 +355,8 @@ func TestRootPrograms(t *testing.T) { // second will continue to execute or an HTTP GET in a processAccessEvents attempting to // trigger an event. go waitForEvent(doneContext, doneFunc, tt.inEventCh, tt.verifyFn) - if tt.inHTTP { - go executeHTTP(t, doneContext, ts.URL) - } else { - go executeCommand(t, doneContext, tt.inCommand, opensnoop) - } + + go tt.genEvents(t, doneContext) // Wait for an event to arrive from execsnoop. 
If an event does not arrive // within 10 seconds, timeout. @@ -526,14 +519,17 @@ func executeCommand(t *testing.T, doneContext context.Context, file string, t.Logf("Failed to find executable %q: %v.", file, err) } - runCmd(t, path, traceCgroup) + fullPath, err := osexec.LookPath(path) + require.NoError(t, err) + + runCmd(t, reexecInCGroupCmd, fullPath, traceCgroup) case <-doneContext.Done(): return } } } -func runCmd(t *testing.T, cmdName string, traceCgroup cgroupRegister) { +func runCmd(t *testing.T, reexecCmd string, arg string, traceCgroup cgroupRegister) { t.Helper() // Create a pipe to communicate with the child process after re-exec. @@ -545,11 +541,8 @@ func runCmd(t *testing.T, cmdName string, traceCgroup cgroupRegister) { writeP.Close() }) - path, err := osexec.LookPath(cmdName) - require.NoError(t, err) - // Re-exec the test binary. We can then move the binary to a new cgroup. - cmd := osexec.Command(os.Args[0], reexecInCGroupCmd, path) + cmd := osexec.Command(os.Args[0], reexecCmd, arg) cmd.ExtraFiles = append(cmd.ExtraFiles, readP) @@ -578,7 +571,7 @@ func runCmd(t *testing.T, cmdName string, traceCgroup cgroupRegister) { } // executeHTTP will perform a HTTP GET to some endpoint in a loop. 
-func executeHTTP(t *testing.T, doneContext context.Context, endpoint string) { +func executeHTTP(t *testing.T, doneContext context.Context, endpoint string, traceCgroup cgroupRegister) { t.Helper() ticker := time.NewTicker(250 * time.Millisecond) @@ -592,6 +585,8 @@ func executeHTTP(t *testing.T, doneContext context.Context, endpoint string) { t.Logf("HTTP request failed: %v.", err) } + runCmd(t, networkInCgroupCmd, endpoint, traceCgroup) + case <-doneContext.Done(): return } diff --git a/lib/bpf/command.go b/lib/bpf/command.go index bfa2682b9bf1c..88998a4ab57b3 100644 --- a/lib/bpf/command.go +++ b/lib/bpf/command.go @@ -69,7 +69,7 @@ type rawExecEvent struct { } type exec struct { - module *libbpfgo.Module + session eventBuf *RingBuffer lost *Counter @@ -90,36 +90,36 @@ func startExec(bufferSize int) (*exec, error) { return nil, trace.Wrap(err) } - e.module, err = libbpfgo.NewModuleFromBuffer(commandBPF, "command") + e.session.module, err = libbpfgo.NewModuleFromBuffer(commandBPF, "command") if err != nil { return nil, trace.Wrap(err) } // Resizing the ring buffer must be done here, after the module // was created but before it's loaded into the kernel. 
- if err = ResizeMap(e.module, commandEventsBuffer, uint32(bufferSize*pageSize)); err != nil { + if err = ResizeMap(e.session.module, commandEventsBuffer, uint32(bufferSize*pageSize)); err != nil { return nil, trace.Wrap(err) } // Load into the kernel - if err = e.module.BPFLoadObject(); err != nil { + if err = e.session.module.BPFLoadObject(); err != nil { return nil, trace.Wrap(err) } syscalls := []string{"execve", "execveat"} for _, syscall := range syscalls { - if err = AttachSyscallTracepoint(e.module, syscall); err != nil { + if err = AttachSyscallTracepoint(e.session.module, syscall); err != nil { return nil, trace.Wrap(err) } } - e.eventBuf, err = NewRingBuffer(e.module, commandEventsBuffer) + e.eventBuf, err = NewRingBuffer(e.session.module, commandEventsBuffer) if err != nil { return nil, trace.Wrap(err) } - e.lost, err = NewCounter(e.module, "lost", lostCommandEvents) + e.lost, err = NewCounter(e.session.module, "lost", lostCommandEvents) if err != nil { return nil, trace.Wrap(err) } @@ -132,7 +132,7 @@ func startExec(bufferSize int) (*exec, error) { func (e *exec) close() { e.lost.Close() e.eventBuf.Close() - e.module.Close() + e.session.module.Close() } // events contains raw events off the perf buffer. diff --git a/lib/bpf/common_linux.go b/lib/bpf/common_linux.go new file mode 100644 index 0000000000000..e5ae94aab7df4 --- /dev/null +++ b/lib/bpf/common_linux.go @@ -0,0 +1,66 @@ +//go:build bpf && !386 +// +build bpf,!386 + +/* + * + * Copyright 2023 Gravitational, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package bpf + +import ( + "unsafe" + + "github.com/aquasecurity/libbpfgo" + "github.com/gravitational/trace" +) + +const monitoredCGroups = "monitored_cgroups" + +type session struct { + module *libbpfgo.Module +} + +// startSession registers the given cgroup in the BPF module. Only registered +// cgroups will return events to the userspace. +func (s *session) startSession(cgroupID uint64) error { + cgroupMap, err := s.module.GetMap(monitoredCGroups) + if err != nil { + return trace.Wrap(err) + } + + dummyVal := 0 + err = cgroupMap.Update(unsafe.Pointer(&cgroupID), unsafe.Pointer(&dummyVal)) + if err != nil { + return trace.Wrap(err) + } + + return nil +} + +// endSession removes the previously registered cgroup from the BPF module. +func (s *session) endSession(cgroupID uint64) error { + cgroupMap, err := s.module.GetMap(monitoredCGroups) + if err != nil { + return trace.Wrap(err) + } + + if err := cgroupMap.DeleteKey(unsafe.Pointer(&cgroupID)); err != nil { + return trace.Wrap(err) + } + + return nil +} diff --git a/lib/bpf/common_test.go b/lib/bpf/common_test.go index 7a28e294177df..a36ec29b8a018 100644 --- a/lib/bpf/common_test.go +++ b/lib/bpf/common_test.go @@ -18,6 +18,7 @@ package bpf import ( "io" + "net/http" "os" osexec "os/exec" "testing" @@ -29,18 +30,36 @@ import ( "github.com/gravitational/teleport/lib/utils" ) -// reexecInCGroupCmd is a cmd argument used to re-exec the test binary. -const reexecInCGroupCmd = "reexecCgroup" +const ( + // reexecInCGroupCmd is a cmd used to re-exec the test binary and call arbitrary program. + reexecInCGroupCmd = "reexecCgroup" + // networkInCgroupCmd is a cmd used to re-exec the test binary and make HTTP call. + networkInCgroupCmd = "networkCgroup" +) func TestMain(m *testing.M) { utils.InitLoggerForTests() // Check if the re-exec was requested. 
-	if len(os.Args) >= 3 && os.Args[1] == reexecInCGroupCmd {
-		// Get the command to run passed as the 3rd argument.
-		cmd := os.Args[2]
+	if len(os.Args) == 3 {
+		var err error
+
+		switch os.Args[1] {
+		case reexecInCGroupCmd:
+			// Get the command to run passed as the 3rd argument.
+			cmd := os.Args[2]
+
+			err = waitAndRun(cmd)
+		case networkInCgroupCmd:
+			// Get the endpoint to call.
+			endpoint := os.Args[2]
+
+			err = callEndpoint(endpoint)
+		default:
+			os.Exit(2)
+		}
 
-		if err := waitAndRun(cmd); err != nil {
+		if err != nil {
 			// Something went wrong, exit with error.
 			os.Exit(1)
 		}
@@ -101,9 +120,35 @@ func TestCheckAndSetDefaults(t *testing.T) {
 	}
 }
 
-// waitAndRun opens FD 3 and waits for at least one byte. After it runs the
+// waitAndRun waits for the continue signal to be generated and executes the
 // passed command and waits until returns.
 func waitAndRun(cmd string) error {
+	if err := waitForContinue(); err != nil {
+		return err
+	}
+
+	return osexec.Command(cmd).Run()
+}
+
+// callEndpoint waits for the continue signal to be generated and executes an
+// HTTP GET on the provided endpoint.
+func callEndpoint(endpoint string) error {
+	if err := waitForContinue(); err != nil {
+		return err
+	}
+
+	resp, err := http.Get(endpoint)
+	if resp != nil {
+		// Close the body to make our linter happy.
+		_ = resp.Body.Close()
+	}
+
+	return err
+}
+
+// waitForContinue opens FD 3 and waits for the signal from the parent process
+// that the cgroup is being observed and the event can be generated.
+func waitForContinue() error { waitFD := os.NewFile(3, "/proc/self/fd/3") defer waitFD.Close() @@ -112,5 +157,5 @@ func waitAndRun(cmd string) error { return err } - return osexec.Command(cmd).Run() + return nil } diff --git a/lib/bpf/disk.go b/lib/bpf/disk.go index dc1639877d6c5..fd194fc7d9026 100644 --- a/lib/bpf/disk.go +++ b/lib/bpf/disk.go @@ -22,7 +22,6 @@ package bpf import ( _ "embed" "runtime" - "unsafe" "github.com/aquasecurity/libbpfgo" "github.com/gravitational/trace" @@ -43,7 +42,6 @@ var ( const ( diskEventsBuffer = "open_events" - monitoredCGroups = "monitored_cgroups" ) // rawOpenEvent is sent by the eBPF program that Teleport pulls off the perf @@ -74,7 +72,7 @@ type cgroupRegister interface { } type open struct { - module *libbpfgo.Module + session eventBuf *RingBuffer lost *Counter @@ -95,19 +93,19 @@ func startOpen(bufferSize int) (*open, error) { return nil, trace.Wrap(err) } - o.module, err = libbpfgo.NewModuleFromBuffer(diskBPF, "disk") + o.session.module, err = libbpfgo.NewModuleFromBuffer(diskBPF, "disk") if err != nil { return nil, trace.Wrap(err) } // Resizing the ring buffer must be done here, after the module // was created but before it's loaded into the kernel. 
- if err = ResizeMap(o.module, diskEventsBuffer, uint32(bufferSize*pageSize)); err != nil { + if err = ResizeMap(o.session.module, diskEventsBuffer, uint32(bufferSize*pageSize)); err != nil { return nil, trace.Wrap(err) } // Load into the kernel - if err = o.module.BPFLoadObject(); err != nil { + if err = o.session.module.BPFLoadObject(); err != nil { return nil, trace.Wrap(err) } @@ -119,17 +117,17 @@ func startOpen(bufferSize int) (*open, error) { } for _, syscall := range syscalls { - if err = AttachSyscallTracepoint(o.module, syscall); err != nil { + if err = AttachSyscallTracepoint(o.session.module, syscall); err != nil { return nil, trace.Wrap(err) } } - o.eventBuf, err = NewRingBuffer(o.module, diskEventsBuffer) + o.eventBuf, err = NewRingBuffer(o.session.module, diskEventsBuffer) if err != nil { return nil, trace.Wrap(err) } - o.lost, err = NewCounter(o.module, "lost", lostDiskEvents) + o.lost, err = NewCounter(o.session.module, "lost", lostDiskEvents) if err != nil { return nil, trace.Wrap(err) } @@ -142,41 +140,10 @@ func startOpen(bufferSize int) (*open, error) { func (o *open) close() { o.lost.Close() o.eventBuf.Close() - o.module.Close() + o.session.module.Close() } // events contains raw events off the perf buffer. func (o *open) events() <-chan []byte { return o.eventBuf.EventCh } - -// startSession registers the given cgroup in the BPF module. Only registered -// cgroups will return events to the userspace. -func (o *open) startSession(cgroupID uint64) error { - cgroupMap, err := o.module.GetMap(monitoredCGroups) - if err != nil { - return trace.Wrap(err) - } - - dummyVal := 0 - err = cgroupMap.Update(unsafe.Pointer(&cgroupID), unsafe.Pointer(&dummyVal)) - if err != nil { - return trace.Wrap(err) - } - - return nil -} - -// endSession removes the previously registered cgroup from the BPF module. 
-func (o *open) endSession(cgroupID uint64) error { - cgroupMap, err := o.module.GetMap(monitoredCGroups) - if err != nil { - return trace.Wrap(err) - } - - if err := cgroupMap.DeleteKey(unsafe.Pointer(&cgroupID)); err != nil { - return trace.Wrap(err) - } - - return nil -} diff --git a/lib/bpf/network.go b/lib/bpf/network.go index 5238703661bff..fd68e2553fc35 100644 --- a/lib/bpf/network.go +++ b/lib/bpf/network.go @@ -95,7 +95,7 @@ type rawConn6Event struct { } type conn struct { - module *libbpfgo.Module + session event4Buf *RingBuffer event6Buf *RingBuffer @@ -116,45 +116,45 @@ func startConn(bufferSize int) (*conn, error) { return nil, trace.Wrap(err) } - c.module, err = libbpfgo.NewModuleFromBuffer(networkBPF, "network") + c.session.module, err = libbpfgo.NewModuleFromBuffer(networkBPF, "network") if err != nil { return nil, trace.Wrap(err) } // Resizing the ring buffer must be done here, after the module // was created but before it's loaded into the kernel. - if err = ResizeMap(c.module, network4EventsBuffer, uint32(bufferSize*pageSize)); err != nil { + if err = ResizeMap(c.session.module, network4EventsBuffer, uint32(bufferSize*pageSize)); err != nil { return nil, trace.Wrap(err) } - if err = ResizeMap(c.module, network6EventsBuffer, uint32(bufferSize*pageSize)); err != nil { + if err = ResizeMap(c.session.module, network6EventsBuffer, uint32(bufferSize*pageSize)); err != nil { return nil, trace.Wrap(err) } // Load into the kernel - if err = c.module.BPFLoadObject(); err != nil { + if err = c.session.module.BPFLoadObject(); err != nil { return nil, trace.Wrap(err) } - if err = AttachKprobe(c.module, "tcp_v4_connect"); err != nil { + if err = AttachKprobe(c.session.module, "tcp_v4_connect"); err != nil { return nil, trace.Wrap(err) } - if err = AttachKprobe(c.module, "tcp_v6_connect"); err != nil { + if err = AttachKprobe(c.session.module, "tcp_v6_connect"); err != nil { return nil, trace.Wrap(err) } - c.event4Buf, err = NewRingBuffer(c.module, 
network4EventsBuffer) + c.event4Buf, err = NewRingBuffer(c.session.module, network4EventsBuffer) if err != nil { return nil, trace.Wrap(err) } - c.event6Buf, err = NewRingBuffer(c.module, network6EventsBuffer) + c.event6Buf, err = NewRingBuffer(c.session.module, network6EventsBuffer) if err != nil { return nil, trace.Wrap(err) } - c.lost, err = NewCounter(c.module, "lost", lostNetworkEvents) + c.lost, err = NewCounter(c.session.module, "lost", lostNetworkEvents) if err != nil { return nil, trace.Wrap(err) } @@ -168,7 +168,7 @@ func (c *conn) close() { c.lost.Close() c.event4Buf.Close() c.event6Buf.Close() - c.module.Close() + c.session.module.Close() } // v4Events contains raw events off the perf buffer.