Skip to content

Commit

Permalink
chore: derive address from connection
Browse files Browse the repository at this point in the history
  • Loading branch information
b-zee authored and bochaco committed Dec 12, 2022
1 parent 399eeda commit 092ce6b
Showing 1 changed file with 5 additions and 9 deletions.
14 changes: 5 additions & 9 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,14 @@ impl Connection {
endpoint: quinn::Endpoint,
connection: quinn::Connection,
) -> (Connection, ConnectionIncoming) {
let peer_address = connection.remote_address();
let conn = Self {
inner: connection.clone(),
};
let conn_id = conn.id();

(
conn,
ConnectionIncoming::new(endpoint, conn_id, peer_address, connection),
ConnectionIncoming::new(endpoint, conn_id, connection),
)
}

Expand Down Expand Up @@ -278,14 +277,13 @@ impl ConnectionIncoming {
fn new(
endpoint: quinn::Endpoint,
conn_id: String,
peer_addr: SocketAddr,
connection: quinn::Connection,
) -> Self {
let (message_tx, message_rx) = mpsc::channel(INCOMING_MESSAGE_BUFFER_LEN);

// offload the actual message handling to a background task - the task will exit when
// `alive_tx` is dropped, which would be when both sides of the connection are dropped.
start_message_listeners(endpoint, conn_id, peer_addr, connection, message_tx);
start_message_listeners(endpoint, conn_id, connection, message_tx);

Self { message_rx }
}
Expand Down Expand Up @@ -315,26 +313,24 @@ impl ConnectionIncoming {
fn start_message_listeners(
endpoint: quinn::Endpoint,
conn_id: String,
peer_addr: SocketAddr,
connection: quinn::Connection,
message_tx: mpsc::Sender<Result<(UsrMsgBytes, Option<ResponseStream>), RecvError>>,
) {
let _ = tokio::spawn(listen_on_uni_streams(
peer_addr,
connection.clone(),
message_tx.clone(),
));

let _ = tokio::spawn(listen_on_bi_streams(
endpoint, conn_id, peer_addr, connection, message_tx,
endpoint, conn_id, connection, message_tx,
));
}

async fn listen_on_uni_streams(
addr: SocketAddr,
connection: quinn::Connection,
message_tx: mpsc::Sender<Result<(UsrMsgBytes, Option<ResponseStream>), RecvError>>,
) {
let addr = connection.remote_address();
trace!("Started listener for incoming uni-streams from {addr}");

loop {
Expand Down Expand Up @@ -389,10 +385,10 @@ async fn listen_on_uni_streams(
async fn listen_on_bi_streams(
endpoint: quinn::Endpoint,
conn_id: String,
peer_addr: SocketAddr,
connection: quinn::Connection,
message_tx: mpsc::Sender<Result<(UsrMsgBytes, Option<ResponseStream>), RecvError>>,
) {
let peer_addr = connection.remote_address();
trace!("Started listener for incoming bi-streams from {peer_addr}");

// Turn the `accept_bi` method into a stream that yields an `Some(Err(ConnectionError))` before `None`
Expand Down

0 comments on commit 092ce6b

Please sign in to comment.