Skip to content

Commit

Permalink
test: fix tests to match refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
b-zee authored and bochaco committed Dec 12, 2022
1 parent 19702f0 commit 33b97f6
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
9 changes: 5 additions & 4 deletions examples/p2p_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use bytes::Bytes;
use color_eyre::eyre::Result;
use futures::StreamExt;
use qp2p::{Config, Endpoint};
use std::{
env,
Expand Down Expand Up @@ -53,13 +54,13 @@ async fn main() -> Result<()> {
.expect("Invalid SocketAddr. Use the form 127.0.0.1:1234");
let msg = Bytes::from(MSG_MARCO);
println!("Sending to {:?} --> {:?}\n", peer, msg);
let (conn, mut incoming) = node.connect_to(&peer).await?;
let conn = node.connect_to(&peer).await?;
conn.send((Bytes::new(), Bytes::new(), msg.clone())).await?;
// `Endpoint` no longer having `connection_pool` to hold established connection.
// Which means the connection get closed immediately when it reaches end of life span.
// And causes the receiver side a sending error when reply via the in-coming connection.
// Hence here have to listen for the reply to avoid such error
let reply = incoming.next().await?.unwrap();
let reply = conn.accept_uni().next().await.unwrap();
println!("Received from {:?} --> {:?}", peer, reply);
}

Expand All @@ -71,11 +72,11 @@ async fn main() -> Result<()> {
println!("---\n");

// loop over incoming connections
while let Some((connection, mut incoming_messages)) = incoming_conns.next().await {
while let Some(connection) = incoming_conns.next().await {
let src = connection.remote_address();

// loop over incoming messages
while let Some((_, _, bytes)) = incoming_messages.next().await? {
while let Some(Ok((_, _, bytes))) = connection.accept_uni().next().await {
println!("Received from {:?} --> {:?}", src, bytes);
if bytes == *MSG_MARCO {
let reply = Bytes::from(MSG_POLO);
Expand Down
32 changes: 17 additions & 15 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,7 @@ impl Connection {
}

///
pub fn accept_bi(
&self,
) -> ReceiverStream<Result<(UsrMsgBytes, SendStream), RecvError>> {
pub fn accept_bi(&self) -> ReceiverStream<Result<(UsrMsgBytes, SendStream), RecvError>> {
let (tx, rx) = tokio::sync::mpsc::channel(INCOMING_MESSAGE_BUFFER_LEN);

let connection = self.inner.clone();
Expand Down Expand Up @@ -487,10 +485,9 @@ mod tests {
);
let mut p1_rx = p1_tx.accept_uni();

let p2_tx = if let Some(connection) =
timeout(OptionFuture::from(peer2.accept().await))
.await?
.and_then(|c| c.ok())
let p2_tx = if let Some(connection) = timeout(OptionFuture::from(peer2.accept().await))
.await?
.and_then(|c| c.ok())
{
Connection::new(connection, peer2.clone())
} else {
Expand All @@ -504,7 +501,6 @@ mod tests {
.send_user_msg((Bytes::new(), Bytes::new(), Bytes::from_static(b"hello")))
.await?;


if let Some(Ok((_, _, msg))) = timeout(p2_rx.next()).await? {
assert_eq!(&msg[..], b"hello");
} else {
Expand Down Expand Up @@ -550,10 +546,9 @@ mod tests {
peer1.clone(),
);

let p2_conn = if let Some(connection) =
timeout(OptionFuture::from(peer2.accept().await))
.await?
.and_then(|c| c.ok())
let p2_conn = if let Some(connection) = timeout(OptionFuture::from(peer2.accept().await))
.await?
.and_then(|c| c.ok())
{
Connection::new(connection, peer2.clone())
} else {
Expand Down Expand Up @@ -582,6 +577,7 @@ mod tests {
}

#[tokio::test]
#[tracing_test::traced_test]
async fn test_endpoint_echo() -> Result<()> {
let config = InternalConfig::try_from_config(Config::default())?;

Expand All @@ -602,7 +598,9 @@ mod tests {
.await?
.and_then(|c| c.ok())
{
Connection::new(connection, peer2.clone())
let conn = Connection::new(connection, peer2.clone());
let rx = conn.accept_bi();
(conn, rx)
} else {
bail!("did not receive incoming connection when one was expected");
};
Expand Down Expand Up @@ -654,7 +652,9 @@ mod tests {
.await?
.and_then(|c| c.ok())
{
Connection::new(connection, peer2.clone())
let conn = Connection::new(connection, peer2.clone());
let rx = conn.accept_bi();
(conn, rx)
} else {
bail!("did not receive incoming connection when one was expected");
};
Expand All @@ -670,7 +670,9 @@ mod tests {
.await?
.and_then(|c| c.ok())
{
Connection::new(connection, peer1.clone())
let conn = Connection::new(connection, peer1.clone());
let rx = conn.accept_bi();
(conn, rx)
} else {
bail!("did not receive incoming connection when one was expected");
};
Expand Down

0 comments on commit 33b97f6

Please sign in to comment.