Skip to content

Commit

Permalink
refresh pid map
Browse files Browse the repository at this point in the history
  • Loading branch information
pythops committed Nov 16, 2024
1 parent d1da41b commit b90cad5
Show file tree
Hide file tree
Showing 23 changed files with 181 additions and 1,293 deletions.
26 changes: 24 additions & 2 deletions oryx-ebpf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

use aya_ebpf::{
bindings::{TC_ACT_PIPE, TC_ACT_SHOT},
macros::{classifier, map},
macros::{cgroup_sock_addr, classifier, map},
maps::{Array, HashMap, RingBuf},
programs::TcContext,
programs::{SockAddrContext, TcContext},
EbpfContext,
};
use core::mem;
use network_types::{
Expand All @@ -24,6 +25,9 @@ use oryx_common::{
#[map]
static DATA: RingBuf = RingBuf::with_byte_size(4096 * RawPacket::LEN as u32, 0);

#[map]
static PID_DATA: RingBuf = RingBuf::with_byte_size(1024, 0);

#[map]
static NETWORK_FILTERS: Array<u32> = Array::with_max_entries(8, 0);

Expand Down Expand Up @@ -306,6 +310,24 @@ fn process(ctx: TcContext) -> Result<i32, ()> {
Ok(TC_ACT_PIPE)
}

#[cgroup_sock_addr(connect4)]
pub fn socket_connect(ctx: SockAddrContext) -> i32 {
match sock_connect(ctx) {
Ok(ret) => ret,
Err(ret) => ret,
}
}

fn sock_connect(ctx: SockAddrContext) -> Result<i32, i32> {
let pid = ctx.pid();

if let Some(mut buf) = PID_DATA.reserve::<u32>(0) {
unsafe { (*buf.as_mut_ptr()) = pid };
buf.submit(0);
}
Ok(1)
}

#[panic_handler]
fn panic(_info: &core::panic::PanicInfo) -> ! {
unsafe { core::hint::unreachable_unchecked() }
Expand Down
17 changes: 14 additions & 3 deletions oryx-tui/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ratatui::{
};
use std::{
error,
sync::{Arc, Mutex},
sync::{atomic::AtomicBool, Arc, Mutex},
thread,
time::Duration,
};
Expand All @@ -14,7 +14,7 @@ use crate::{
filter::Filter,
help::Help,
packet::{direction::TrafficDirection, NetworkPacket},
pid,
pid::{self, ConnectionMap},
};

use crate::{filter::IoChannels, notification::Notification};
Expand Down Expand Up @@ -50,6 +50,8 @@ pub struct App {
pub data_channel_sender: kanal::Sender<([u8; RawPacket::LEN], TrafficDirection)>,
pub is_editing: bool,
pub active_popup: Option<ActivePopup>,
pub pid_terminate: Arc<AtomicBool>,
pub pid_map: Arc<Mutex<ConnectionMap>>,
}

impl Default for App {
Expand All @@ -61,19 +63,24 @@ impl Default for App {
impl App {
pub fn new() -> Self {
let packets = Arc::new(Mutex::new(Vec::with_capacity(RawPacket::LEN * 1024 * 1024)));
let pid_map = Arc::new(Mutex::new(ConnectionMap::new()));

let (sender, receiver) = kanal::unbounded();

let firewall_channels = IoChannels::new();
thread::spawn({
let packets = packets.clone();
let pid_map = pid_map.clone();
move || loop {
let pid_map = pid::ConnectionMap::new();
if let Ok((raw_packet, direction)) = receiver.recv() {
let network_packet = NetworkPacket::from(raw_packet);

let pid = {
if direction == TrafficDirection::Egress {
let pid_map = {
let map = pid_map.lock().unwrap();
map.clone()
};
pid::get_pid(network_packet, &pid_map)
} else {
None
Expand Down Expand Up @@ -107,6 +114,8 @@ impl App {
data_channel_sender: sender,
is_editing: false,
active_popup: None,
pid_terminate: Arc::new(AtomicBool::new(false)),
pid_map,
}
}

Expand Down Expand Up @@ -147,6 +156,8 @@ impl App {

pub fn quit(&mut self) {
self.filter.terminate();
self.pid_terminate
.store(true, std::sync::atomic::Ordering::Relaxed);
thread::sleep(Duration::from_millis(110));
self.running = false;
}
Expand Down
5 changes: 3 additions & 2 deletions oryx-tui/src/ebpf.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod egress;
mod firewall;
pub mod ingress;
pub mod pid;

use std::{io, os::fd::AsRawFd};

Expand All @@ -16,8 +17,8 @@ pub struct RingBuffer<'a> {
}

impl<'a> RingBuffer<'a> {
fn new(ebpf: &'a mut Ebpf) -> Self {
let buffer = RingBuf::try_from(ebpf.map_mut("DATA").unwrap()).unwrap();
fn new(ebpf: &'a mut Ebpf, name: &'a str) -> Self {
let buffer = RingBuf::try_from(ebpf.map_mut(name).unwrap()).unwrap();
Self { buffer }
}

Expand Down
2 changes: 1 addition & 1 deletion oryx-tui/src/ebpf/egress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ pub fn load_egress(
});

// packets reading
let mut ring_buf = RingBuffer::new(&mut bpf);
let mut ring_buf = RingBuffer::new(&mut bpf, "DATA");

poll.registry()
.register(
Expand Down
2 changes: 1 addition & 1 deletion oryx-tui/src/ebpf/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ pub fn load_ingress(
});

// packets reader
let mut ring_buf = RingBuffer::new(&mut bpf);
let mut ring_buf = RingBuffer::new(&mut bpf, "DATA");

poll.registry()
.register(
Expand Down
132 changes: 132 additions & 0 deletions oryx-tui/src/ebpf/pid.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use std::{
fs::{self, File},
os::fd::AsRawFd,
sync::{atomic::AtomicBool, Arc, Mutex},
thread,
time::Duration,
};

use aya::{
include_bytes_aligned,
programs::{CgroupAttachMode, CgroupSockAddr},
EbpfLoader,
};
use log::error;

use crate::{
event::Event,
notification::{Notification, NotificationLevel},
pid::ConnectionMap,
};
use mio::{unix::SourceFd, Events, Interest, Poll, Token};

use super::RingBuffer;

pub fn load_pid(
pid_map: Arc<Mutex<ConnectionMap>>,
notification_sender: kanal::Sender<Event>,
terminate: Arc<AtomicBool>,
) {
thread::spawn({
move || {
let rlim = libc::rlimit {
rlim_cur: libc::RLIM_INFINITY,
rlim_max: libc::RLIM_INFINITY,
};

unsafe { libc::setrlimit(libc::RLIMIT_MEMLOCK, &rlim) };

#[cfg(debug_assertions)]
let mut bpf = match EbpfLoader::new().load(include_bytes_aligned!(
"../../../target/bpfel-unknown-none/debug/oryx"
)) {
Ok(v) => v,
Err(e) => {
error!("Failed to load the pid eBPF bytecode. {}", e);
Notification::send(
"Failed to load the pid eBPF bytecode",
NotificationLevel::Error,
notification_sender,
)
.unwrap();
return;
}
};

#[cfg(not(debug_assertions))]
let mut bpf = match EbpfLoader::new().load(include_bytes_aligned!(
"../../../target/bpfel-unknown-none/debug/oryx"
)) {
Ok(v) => v,
Err(e) => {
error!("Failed to load the pid eBPF bytecode. {}", e);
Notification::send(
"Failed to load the pid eBPF bytecode",
NotificationLevel::Error,
notification_sender,
)
.unwrap();
return;
}
};

let sock_connect: &mut CgroupSockAddr = bpf
.program_mut("socket_connect")
.unwrap()
.try_into()
.unwrap();
sock_connect.load().unwrap();
let file = File::open("/sys/fs/cgroup/user.slice").unwrap();

sock_connect.attach(file, CgroupAttachMode::Single).unwrap();

let mut poll = Poll::new().unwrap();
let mut events = Events::with_capacity(128);

let mut ring_buf = RingBuffer::new(&mut bpf, "PID_DATA");

poll.registry()
.register(
&mut SourceFd(&ring_buf.buffer.as_raw_fd()),
Token(0),
Interest::READABLE,
)
.unwrap();

loop {
poll.poll(&mut events, Some(Duration::from_millis(100)))
.unwrap();
if terminate.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
for event in &events {
if terminate.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
if event.token() == Token(0) && event.is_readable() {
if terminate.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
while let Some(item) = ring_buf.next() {
if terminate.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
let pid: [u8; 4] = item.to_owned().try_into().unwrap();
let pid = u32::from_ne_bytes(pid);

let fd_dir = format!("/proc/{}/fd", pid);
if let Ok(_fds) = fs::read_dir(&fd_dir) {
let mut map = pid_map.lock().unwrap();
*map = ConnectionMap::new();
}
}
}
}
}

let _ = poll
.registry()
.deregister(&mut SourceFd(&ring_buf.buffer.as_raw_fd()));
}
});
}
6 changes: 6 additions & 0 deletions oryx-tui/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{

use crate::{
app::{ActivePopup, App, AppResult},
ebpf::pid::load_pid,
event::Event,
filter::FocusedBlock,
section::{stats::Stats, FocusedSection},
Expand All @@ -24,6 +25,11 @@ pub fn handle_key_events(
app.section.stats = Some(Stats::new(app.packets.clone()));
app.filter
.start(event_sender.clone(), app.data_channel_sender.clone())?;
load_pid(
app.pid_map.clone(),
event_sender.clone(),
app.pid_terminate.clone(),
);

sleep(Duration::from_millis(100));
app.start_sniffing = true;
Expand Down
9 changes: 0 additions & 9 deletions test-sock-addr/.gitignore

This file was deleted.

Loading

0 comments on commit b90cad5

Please sign in to comment.