Skip to content

Commit

Permalink
[mqtt]: Offline messages integration tests (Azure#2898)
Browse files Browse the repository at this point in the history
This PR adds:
- Existing session support for mqtt Client.
- Basic connect w/ existing session tests.
- Offline messages integration tests.
- Overlapping subscriptions tests
- Some refactoring.
  • Loading branch information
vadim-kovalyov authored May 4, 2020
1 parent 9e77ba2 commit c09f14b
Show file tree
Hide file tree
Showing 3 changed files with 394 additions and 123 deletions.
178 changes: 105 additions & 73 deletions mqtt/mqtt-broker/tests/common.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use std::{
pin::Pin,
sync::atomic::{AtomicU32, Ordering},
task::{Context, Poll},
time::Duration,
};

use bytes::Bytes;
use futures::{future::select, pin_mut};
use futures_util::{FutureExt, StreamExt};
use lazy_static::lazy_static;
use tokio::{
net::ToSocketAddrs,
stream::Stream,
sync::{
mpsc::{self, UnboundedReceiver},
oneshot::{self, Sender},
Expand All @@ -19,8 +17,8 @@ use tokio::{
};

use mqtt3::{
proto::{Publication, QoS, SubscribeTo},
Client, Event, PublishError, PublishHandle, ShutdownHandle, UpdateSubscriptionError,
proto::{ClientId, Publication, QoS, SubscribeTo},
Client, Event, PublishError, PublishHandle, ReceivedPublication, ShutdownHandle,
UpdateSubscriptionHandle,
};
use mqtt_broker::{Authenticator, Authorizer, Broker, BrokerState, Error, Server};
Expand All @@ -36,7 +34,9 @@ pub struct TestClient {

/// Used to simulate unexpected shutdown.
termination_handle: Sender<()>,
events_receiver: UnboundedReceiver<Event>,
pub_receiver: UnboundedReceiver<ReceivedPublication>,
sub_receiver: UnboundedReceiver<Event>,
conn_receiver: UnboundedReceiver<Event>,
event_loop_handle: JoinHandle<()>,
}

Expand All @@ -45,65 +45,73 @@ impl TestClient {
self.publish_handle.publish(publication).await
}

pub async fn publish_qos0(&mut self, topic: &str, payload: &str, retain: bool) {
pub async fn publish_qos0(
&mut self,
topic: impl Into<String>,
payload: impl Into<Bytes>,
retain: bool,
) {
self.publish(Publication {
topic_name: topic.into(),
qos: QoS::AtMostOnce,
retain,
payload: payload.to_owned().into(),
payload: payload.into(),
})
.await
.expect("couldn't publish")
}

pub async fn publish_qos1(&mut self, topic: &str, payload: &str, retain: bool) {
pub async fn publish_qos1(
&mut self,
topic: impl Into<String>,
payload: impl Into<Bytes>,
retain: bool,
) {
self.publish(Publication {
topic_name: topic.into(),
qos: QoS::AtLeastOnce,
retain,
payload: payload.to_owned().into(),
payload: payload.into(),
})
.await
.expect("couldn't publish")
}

pub async fn publish_qos2(&mut self, topic: &str, payload: &str, retain: bool) {
pub async fn publish_qos2(
&mut self,
topic: impl Into<String>,
payload: impl Into<Bytes>,
retain: bool,
) {
self.publish(Publication {
topic_name: topic.into(),
qos: QoS::ExactlyOnce,
retain,
payload: payload.to_owned().into(),
payload: payload.into(),
})
.await
.expect("couldn't publish")
}

pub async fn subscribe(
&mut self,
subscribe_to: SubscribeTo,
) -> Result<(), UpdateSubscriptionError> {
self.subscription_handle.subscribe(subscribe_to).await
}

pub async fn subscribe_qos2(&mut self, topic_filter: &str) {
self.subscribe(SubscribeTo {
topic_filter: topic_filter.into(),
qos: QoS::ExactlyOnce,
})
.await
.expect("couldn't subscribe to a topic")
pub async fn subscribe(&mut self, topic_filter: impl Into<String>, qos: QoS) {
self.subscription_handle
.subscribe(SubscribeTo {
topic_filter: topic_filter.into(),
qos,
})
.await
.expect("couldn't subscribe to a topic")
}

/// Initiates sending Disconnect packet and proper client shutdown.
pub async fn shutdown(&mut self) {
/// Send the Disconnect packet and shutdown the client properly.
pub async fn shutdown(mut self) {
self.shutdown_handle
.shutdown()
.await
.expect("couldn't shutdown")
}

pub fn shutdown_handle(&mut self) -> ShutdownHandle {
self.shutdown_handle.clone()
.expect("couldn't shutdown");
self.event_loop_handle
.await
.expect("couldn't terminate a client");
}

/// Terminates client w/o sending Disconnect packet.
Expand All @@ -116,26 +124,16 @@ impl TestClient {
.expect("couldn't terminate a client")
}

/// Waits until client's event loop is finished.
pub async fn join(self) {
self.event_loop_handle
.await
.expect("couldn't wait for client event loop to finish")
pub fn connections(&mut self) -> &mut UnboundedReceiver<Event> {
&mut self.conn_receiver
}

pub async fn try_recv(&mut self) -> Option<Event> {
self.events_receiver.try_recv().ok()
pub fn publications(&mut self) -> &mut UnboundedReceiver<ReceivedPublication> {
&mut self.pub_receiver
}
}

impl Stream for TestClient
where
Self: Unpin,
{
type Item = Event;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.events_receiver.poll_recv(cx)
pub fn subscriptions(&mut self) -> &mut UnboundedReceiver<Event> {
&mut self.sub_receiver
}
}

Expand All @@ -144,10 +142,11 @@ where
T: ToSocketAddrs + Clone + Send + Sync + Unpin + 'static,
{
address: T,
client_id: Option<String>,
client_id: ClientId,
username: Option<String>,
password: Option<String>,
will: Option<Publication>,
max_reconnect_back_off: Duration,
keep_alive: Duration,
}

Expand All @@ -159,16 +158,17 @@ where
pub fn new(address: T) -> Self {
Self {
address,
client_id: None,
client_id: ClientId::ServerGenerated,
username: None,
password: None,
will: None,
max_reconnect_back_off: Duration::from_secs(1),
keep_alive: Duration::from_secs(60),
}
}

pub fn client_id(mut self, client_id: &str) -> Self {
self.client_id = Some(client_id.into());
pub fn client_id(mut self, client_id: ClientId) -> Self {
self.client_id = client_id;
self
}

Expand Down Expand Up @@ -196,21 +196,41 @@ where
let address = self.address;
let password = self.password;

let mut client = Client::new(
self.client_id,
self.username,
self.will,
move || {
let address = address.clone();
let password = password.clone();
Box::pin(async move {
let io = tokio::net::TcpStream::connect(&address).await;
io.map(|io| (io, password))
})
},
Duration::from_secs(1),
self.keep_alive,
);
let io_source = move || {
let address = address.clone();
let password = password.clone();
Box::pin(async move {
let io = tokio::net::TcpStream::connect(&address).await;
io.map(|io| (io, password))
})
};

let mut client = match self.client_id {
ClientId::IdWithCleanSession(client_id) => Client::new(
Some(client_id),
self.username,
self.will,
io_source,
self.max_reconnect_back_off,
self.keep_alive,
),
ClientId::IdWithExistingSession(client_id) => Client::from_state(
client_id,
self.username,
self.will,
io_source,
self.max_reconnect_back_off,
self.keep_alive,
),
ClientId::ServerGenerated => Client::new(
None,
self.username,
self.will,
io_source,
self.max_reconnect_back_off,
self.keep_alive,
),
};

let publish_handle = client
.publish_handle()
Expand All @@ -224,17 +244,27 @@ where
.shutdown_handle()
.expect("couldn't get shutdown handle");

let (events_sender, events_receiver) = mpsc::unbounded_channel();
let (pub_sender, pub_receiver) = mpsc::unbounded_channel();
let (sub_sender, sub_receiver) = mpsc::unbounded_channel();
let (conn_sender, conn_receiver) = mpsc::unbounded_channel();

let (termination_handle, tx) = oneshot::channel::<()>();

let event_loop_handle = tokio::spawn(async move {
let event_loop = async {
while let Some(event) = client.next().await {
let event = event.expect("event expected");
events_sender
.send(event)
.expect("can't send an event to a channel");
match event {
Event::NewConnection { .. } => conn_sender
.send(event)
.expect("can't send an event to a conn channel"),
Event::Publication(publication) => pub_sender
.send(publication)
.expect("can't send an event to a pub channel"),
Event::SubscriptionUpdates(_) => sub_sender
.send(event)
.expect("can't send an event to a sub channel"),
};
}
};
pin_mut!(event_loop);
Expand All @@ -246,7 +276,9 @@ where
subscription_handle,
shutdown_handle,
termination_handle,
events_receiver,
pub_receiver,
sub_receiver,
conn_receiver,
event_loop_handle,
}
}
Expand Down
Loading

0 comments on commit c09f14b

Please sign in to comment.