Skip to content

Commit

Permalink
Merge pull request zhboner#39 from zephyrchien/fix-udp-res-leak
Browse files Browse the repository at this point in the history
fix resource leak when keeping receiving udp packets
  • Loading branch information
zhboner authored Sep 19, 2021
2 parents 575625f + b6535cc commit 189c737
Showing 1 changed file with 71 additions and 43 deletions.
114 changes: 71 additions & 43 deletions src/udp.rs
Original file line number Diff line number Diff line change
@@ -1,64 +1,92 @@
use std::collections::HashMap;
use std::io;
use std::time::Duration;
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio;
use tokio::io;
use std::collections::HashMap;

use tokio::net::UdpSocket;
use tokio::sync::oneshot;
use tokio::time::sleep;
use tokio::time::timeout;

const BUFFERSIZE: usize = 0x4000;
const TIMEOUT: Duration = Duration::from_secs(60 * 15);
const TIMEOUT: Duration = Duration::from_secs(20);

// client <--> allocated socket
type SockMap = Arc<RwLock<HashMap<SocketAddr, Arc<UdpSocket>>>>;

pub async fn transfer_udp(
local_addr: SocketAddr,
remote_port: u16,
remote_ip: Arc<RwLock<IpAddr>>,
) -> io::Result<()> {
// client_addr -> allocated_socket
let mut record = HashMap::new();
let local_socket = Arc::new(UdpSocket::bind(&local_addr).await.unwrap());
let sock_map: SockMap = Arc::new(RwLock::new(HashMap::new()));
let local_sock = Arc::new(UdpSocket::bind(&local_addr).await.unwrap());
let mut buf = vec![0u8; BUFFERSIZE];

loop {
tokio::select! {
_ = async {
let (n, client_addr) = local_socket.recv_from(&mut buf).await?;
if !record.contains_key(&client_addr) {
// pick a random port
let allocated_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await.unwrap());
let cloned_socket = allocated_socket.clone();
let (tx, rx) = oneshot::channel::<()>();
record.insert(client_addr, (allocated_socket,tx));
tokio::spawn(send_back(
client_addr, local_socket.clone(), cloned_socket, rx
));
}
let (allocated_socket, _) = record.get(&client_addr).unwrap();
let remote_addr: SocketAddr = format!("{}:{}", remote_ip.read().unwrap()
, remote_port).parse().unwrap();
allocated_socket.send_to(&buf[..n], &remote_addr).await?;
Ok::<_, io::Error>(())
} => {}
_ = async { sleep(TIMEOUT).await } => record.clear()
}
let (n, client_addr) = local_sock.recv_from(&mut buf).await?;

let remote_addr = format!("{}:{}", remote_ip.read().unwrap(), remote_port)
.parse::<SocketAddr>()
.unwrap();

// the socket associated with a unique client
let alloc_sock = match get_socket(&sock_map, &client_addr) {
Some(x) => x,
None => alloc_new_socket(
&sock_map, client_addr, &remote_addr, local_sock.clone()
).await
};

alloc_sock.send_to(&buf[..n], &remote_addr).await?;
}
}

async fn send_back(
sock_map: SockMap,
client_addr: SocketAddr,
local_socket: Arc<UdpSocket>,
allocated_socket: Arc<UdpSocket>,
rx: oneshot::Receiver<()>,
) -> io::Result<()> {
local_sock: Arc<UdpSocket>,
alloc_sock: Arc<UdpSocket>,
){
let mut buf = vec![0u8; BUFFERSIZE];
tokio::select! {
ret = async {
loop {
let (n, _) = allocated_socket.recv_from(&mut buf).await?;
local_socket.send_to(&buf[..n], &client_addr).await?;
}
} => { ret }
_ = rx => Ok(())

while let Ok(Ok((n, _))) = timeout(
TIMEOUT, alloc_sock.recv_from(&mut buf)
).await {
if local_sock.send_to(&buf[..n], &client_addr).await.is_err() {
break;
}
}

sock_map.write().unwrap().remove(&client_addr);
}

#[inline]
fn get_socket(
sock_map: &SockMap,
client_addr: &SocketAddr,
) -> Option<Arc<UdpSocket>> {
let alloc_sock = sock_map.read().unwrap();
alloc_sock.get(client_addr).cloned()
// drop the lock
}

async fn alloc_new_socket(
sock_map: &SockMap,
client_addr: SocketAddr,
remote_addr: &SocketAddr,
local_sock: Arc<UdpSocket>
) -> Arc<UdpSocket>{
// pick a random port
let alloc_sock = Arc::new(if remote_addr.is_ipv4(){
UdpSocket::bind("0.0.0.0:0").await.unwrap()
} else {
UdpSocket::bind("[::]:0").await.unwrap()
});

// new send back task
tokio::spawn(send_back(sock_map.clone(), client_addr, local_sock, alloc_sock.clone()));

sock_map.write().unwrap().insert(client_addr, alloc_sock.clone());
alloc_sock
// drop the lock
}

0 comments on commit 189c737

Please sign in to comment.