Skip to content

Commit

Permalink
perf(phantun) spawn multiple threads for UDP send/receive
Browse files Browse the repository at this point in the history
  • Loading branch information
dndx committed Apr 9, 2022
1 parent dff0c4c commit 35f7b35
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 69 deletions.
1 change: 1 addition & 0 deletions phantun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ clap = { version = "3.0", features = ["cargo"] }
socket2 = { version = "0.4", features = ["all"] }
fake-tcp = { path = "../fake-tcp", version = "0.2" }
tokio = { version = "1.14", features = ["full"] }
tokio-util = "0.7"
log = "0.4"
pretty_env_logger = "0.4"
tokio-tun = "0.5"
Expand Down
108 changes: 72 additions & 36 deletions phantun/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ use std::net::{Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::sync::RwLock;
use tokio::sync::{Notify, RwLock};
use tokio::time;
use tokio_tun::TunBuilder;
use tokio_util::sync::CancellationToken;

const UDP_TTL: Duration = Duration::from_secs(180);

Expand Down Expand Up @@ -119,14 +120,16 @@ async fn main() {
.parse()
.expect("bad peer address for Tun interface");

let num_cpus = num_cpus::get();

let tun = TunBuilder::new()
.name(matches.value_of("tun").unwrap()) // if name is empty, then it is set by kernel.
.tap(false) // false (default): TUN, true: TAP.
.packet_info(false) // false: IFF_NO_PI, default is true.
.up() // or set it up manually using `sudo ip link set <tun-name> up`.
.address(tun_local)
.destination(tun_peer)
.try_build_mq(num_cpus::get())
.try_build_mq(num_cpus)
.unwrap();

info!("Created TUN device {}", tun[0].name());
Expand Down Expand Up @@ -168,52 +171,85 @@ async fn main() {
assert!(connections.write().await.insert(addr, sock.clone()).is_none());
debug!("inserted fake TCP socket into connection table");

let connections = connections.clone();

// spawn "fastpath" UDP socket and task, this will offload main task
// from forwarding UDP packets
tokio::spawn(async move {
let mut buf_udp = [0u8; MAX_PACKET_LEN];
let mut buf_tcp = [0u8; MAX_PACKET_LEN];
let udp_sock = new_udp_reuseport(local_addr);
udp_sock.connect(addr).await.unwrap();

let packet_received = Arc::new(Notify::new());
let quit = CancellationToken::new();

for i in 0..num_cpus {
let sock = sock.clone();
let quit = quit.child_token();
let packet_received = packet_received.clone();

tokio::spawn(async move {
let mut buf_udp = [0u8; MAX_PACKET_LEN];
let mut buf_tcp = [0u8; MAX_PACKET_LEN];
let udp_sock = new_udp_reuseport(local_addr);
udp_sock.connect(addr).await.unwrap();

loop {
tokio::select! {
Ok(size) = udp_sock.recv(&mut buf_udp) => {
if sock.send(&buf_udp[..size]).await.is_none() {
debug!("removed fake TCP socket from connections table");
quit.cancel();
return;
}

packet_received.notify_one();
},
res = sock.recv(&mut buf_tcp) => {
match res {
Some(size) => {
if size > 0 {
if let Err(e) = udp_sock.send(&buf_tcp[..size]).await {
error!("Unable to send UDP packet to {}: {}, closing connection", e, addr);
quit.cancel();
return;
}
}
},
None => {
debug!("removed fake TCP socket from connections table");
quit.cancel();
return;
},
}

packet_received.notify_one();
},
_ = quit.cancelled() => {
debug!("worker {} terminated", i);
return;
},
};
}
});
}

let connections = connections.clone();
tokio::spawn(async move {
loop {
let read_timeout = time::sleep(UDP_TTL);
let packet_received_fut = packet_received.notified();

tokio::select! {
Ok(size) = udp_sock.recv(&mut buf_udp) => {
if sock.send(&buf_udp[..size]).await.is_none() {
connections.write().await.remove(&addr);
debug!("removed fake TCP socket from connections table");
return;
}
},
res = sock.recv(&mut buf_tcp) => {
match res {
Some(size) => {
if size > 0 {
if let Err(e) = udp_sock.send(&buf_tcp[..size]).await {
connections.write().await.remove(&addr);
error!("Unable to send UDP packet to {}: {}, closing connection", e, addr);
return;
}
}
},
None => {
connections.write().await.remove(&addr);
debug!("removed fake TCP socket from connections table");
return;
},
}
},
_ = read_timeout => {
info!("No traffic seen in the last {:?}, closing connection", UDP_TTL);
connections.write().await.remove(&addr);
debug!("removed fake TCP socket from connections table");

quit.cancel();
return;
}
};
},
_ = quit.cancelled() => {
connections.write().await.remove(&addr);
debug!("removed fake TCP socket from connections table");
return;
},
_ = packet_received_fut => {},
}
}
});
},
Expand Down
127 changes: 94 additions & 33 deletions phantun/src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,36 @@
use clap::{crate_version, Arg, Command};
use fake_tcp::packet::MAX_PACKET_LEN;
use fake_tcp::Stack;
use log::{error, info};
use std::net::Ipv4Addr;
use log::{debug, error, info};
use std::net::{Ipv4Addr, SocketAddr};
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::sync::Notify;
use tokio::time::{self, Duration};
use tokio_tun::TunBuilder;
use tokio_util::sync::CancellationToken;
const UDP_TTL: Duration = Duration::from_secs(180);

fn new_udp_reuseport(addr: SocketAddr) -> UdpSocket {
let udp_sock = socket2::Socket::new(
if addr.is_ipv4() {
socket2::Domain::IPV4
} else {
socket2::Domain::IPV6
},
socket2::Type::DGRAM,
None,
)
.unwrap();
udp_sock.set_reuse_port(true).unwrap();
// from tokio-rs/mio/blob/master/src/sys/unix/net.rs
udp_sock.set_cloexec(true).unwrap();
udp_sock.set_nonblocking(true).unwrap();
udp_sock.bind(&socket2::SockAddr::from(addr)).unwrap();
let udp_sock: std::net::UdpSocket = udp_sock.into();
udp_sock.try_into().unwrap()
}

#[tokio::main]
async fn main() {
pretty_env_logger::init();
Expand Down Expand Up @@ -88,14 +111,16 @@ async fn main() {
.parse()
.expect("bad peer address for Tun interface");

let num_cpus = num_cpus::get();

let tun = TunBuilder::new()
.name(matches.value_of("tun").unwrap()) // if name is empty, then it is set by kernel.
.tap(false) // false (default): TUN, true: TAP.
.packet_info(false) // false: IFF_NO_PI, default is true.
.up() // or set it up manually using `sudo ip link set <tun-name> up`.
.address(tun_local)
.destination(tun_peer)
.try_build_mq(num_cpus::get())
.try_build_mq(num_cpus)
.unwrap();

info!("Created TUN device {}", tun[0].name());
Expand All @@ -110,46 +135,82 @@ async fn main() {
let mut buf_tcp = [0u8; MAX_PACKET_LEN];

loop {
let sock = stack.accept().await;
let sock = Arc::new(stack.accept().await);
info!("New connection: {}", sock);

tokio::spawn(async move {
let udp_sock = UdpSocket::bind(if remote_addr.is_ipv4() {
"0.0.0.0:0"
} else {
"[::]:0"
})
.await
.unwrap();
udp_sock.connect(remote_addr).await.unwrap();
let packet_received = Arc::new(Notify::new());
let quit = CancellationToken::new();
let udp_sock = UdpSocket::bind(if remote_addr.is_ipv4() {
"0.0.0.0:0"
} else {
"[::]:0"
})
.await
.unwrap();
let local_addr = udp_sock.local_addr().unwrap();
drop(udp_sock);

for i in 0..num_cpus {
let sock = sock.clone();
let quit = quit.child_token();
let packet_received = packet_received.clone();
let udp_sock = new_udp_reuseport(local_addr);

tokio::spawn(async move {
udp_sock.connect(remote_addr).await.unwrap();

loop {
tokio::select! {
Ok(size) = udp_sock.recv(&mut buf_udp) => {
if sock.send(&buf_udp[..size]).await.is_none() {
quit.cancel();
return;
}

packet_received.notify_one();
},
res = sock.recv(&mut buf_tcp) => {
match res {
Some(size) => {
if size > 0 {
if let Err(e) = udp_sock.send(&buf_tcp[..size]).await {
error!("Unable to send UDP packet to {}: {}, closing connection", e, remote_addr);
quit.cancel();
return;
}
}
},
None => {
quit.cancel();
return;
},
}

packet_received.notify_one();
},
_ = quit.cancelled() => {
debug!("worker {} terminated", i);
return;
},
};
}
});
}

tokio::spawn(async move {
loop {
let read_timeout = time::sleep(UDP_TTL);
let packet_received_fut = packet_received.notified();

tokio::select! {
Ok(size) = udp_sock.recv(&mut buf_udp) => {
if sock.send(&buf_udp[..size]).await.is_none() {
return;
}
},
res = sock.recv(&mut buf_tcp) => {
match res {
Some(size) => {
if size > 0 {
if let Err(e) = udp_sock.send(&buf_tcp[..size]).await {
error!("Unable to send UDP packet to {}: {}, closing connection", e, remote_addr);
return;
}
}
},
None => { return; },
}
},
_ = read_timeout => {
info!("No traffic seen in the last {:?}, closing connection", UDP_TTL);

quit.cancel();
return;
}
};
},
_ = packet_received_fut => {},
}
}
});
}
Expand Down

0 comments on commit 35f7b35

Please sign in to comment.