Skip to content

Commit

Permalink
#410 Added 'shutdown_event' and reworked consumer to prevent propagat…
Browse files Browse the repository at this point in the history
…ing exceptions on shutdown.
  • Loading branch information
fpagliughi committed Jan 5, 2025
1 parent 1e7d090 commit df82ee4
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 88 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [#503](https://github.com/eclipse-paho/paho.mqtt.cpp/issues/503) Fixed issue that generated docs were empty.
- [#518](https://github.com/eclipse-paho/paho.mqtt.cpp/pull/518) Add function for checking async consumer event queue size
- [#519](https://github.com/eclipse-paho/paho.mqtt.cpp/pull/519) Fix potential deadlock in set_callback
- [#524](https://github.com/eclipse-paho/paho.mqtt.cpp/issues/524) Fixed copy and move operations for 'subscribe_options'. Added unit tests.


## [Version 1.4.1](https://github.com/eclipse/paho.mqtt.cpp/compare/v1.4.0..v1.4.1) - (2024-07-09)
Expand Down
43 changes: 34 additions & 9 deletions examples/multithr_pub_sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@
// processing, perhaps based on the topics. It could be common, however, to
// want to have multiple threads for publishing.
//
// The sample demonstrates:
// - Creating a client and accessing it from a shared_ptr<>
// This example demonstrates:
// - Creating a client and sharing it across threads using a shared_ptr<>
// - Using one thread to receive incoming messages from the broker and
// another thread to publish messages to it.
// - Connecting to an MQTT server/broker.
// - Subscribing to a topic
// - Using the asynchronous consumer
// - Publishing messages.
// - Automatic reconnect
// - Publishing messages
// - Subscribing to multiple topics
// - Using the asynchronous message consumer
// - Signaling consumer from another thread
//

/*******************************************************************************
* Copyright (c) 2020-2023 Frank Pagliughi <[email protected]>
* Copyright (c) 2020-2025 Frank Pagliughi <[email protected]>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
Expand Down Expand Up @@ -61,8 +63,8 @@
using namespace std;
using namespace std::chrono;

const std::string DFLT_SERVER_ADDRESS("mqtt://localhost:1883");
const std::string CLIENT_ID("multithr_pub_sub_cpp");
const std::string DFLT_SERVER_ADDRESS{"mqtt://localhost:1883"};
const std::string CLIENT_ID{"multithr_pub_sub_cpp"};

/////////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -172,6 +174,10 @@ int main(int argc, char* argv[])
auto rsp = cli->connect(connOpts)->get_connect_response();
cout << "OK\n" << endl;

cout << "Now start an application such as 'async_publish_time'\n"
<< "that publishes to a 'data/' topic...\n"
<< endl;

// Subscribe if this is a new session with the server
if (!rsp.is_session_present())
cli->subscribe(TOPICS, QOS);
Expand All @@ -180,13 +186,32 @@ int main(int argc, char* argv[])

std::thread publisher(publisher_func, cli, counter);

// Start another thread to shut us down after a minute

std::thread{[cli] {
this_thread::sleep_for(30s);
cout << "Signaling the consumer to stop." << endl;
cli->stop_consuming();
}}.detach();

// Consume messages in this thread

// Remember that with the message consumer, we can't detect a
// reconnect We would need to register a connect callback or use the
// event consumer.

while (true) {
auto msg = cli->consume_message();

if (!msg)
if (!msg) {
// Exit if the consumer was shut down
if (cli->consumer_closed())
break;

// Otherwise let auto-reconnect deal with it.
cout << "Disconnect detected. Attempting an auto-reconnect." << endl;
continue;
}

if (msg->get_topic() == "command" && msg->to_string() == "exit") {
cout << "Exit command received" << endl;
Expand Down
170 changes: 97 additions & 73 deletions include/mqtt/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -826,8 +826,102 @@ class async_client : public virtual iasync_client
return (que_) ? que_->size() : 0;
}
/**
* Read the next message from the queue.
* Read the next client event from the queue.
* This blocks until a new message arrives.
* If the consumer queue is closed, this returns a shutdown event.
* @return The client event.
*/
event consume_event() override;
/**
* Try to read the next client event without blocking.
* @param evt Pointer to the value to receive the event
* @return @em true if an event was read, @em false if no
* event was available.
*/
bool try_consume_event(event* evt) override;
/**
* Waits a limited time for a client event to appear.
* @param evt Pointer to the value to receive the event.
* @param relTime The maximum amount of time to wait for an event.
* @return @em true if an event was read, @em false if a timeout
* occurred.
*/
template <typename Rep, class Period>
bool try_consume_event_for(
event* evt, const std::chrono::duration<Rep, Period>& relTime
) {
if (!que_)
throw mqtt::exception(-1, "Consumer not started");

try {
return que_->try_get_for(evt, relTime);
}
catch (queue_closed&) {
*evt = event{shutdown_event{}};
return true;
}
}
/**
* Waits a limited time for a client event to arrive.
* @param relTime The maximum amount of time to wait for an event.
* @return The event that was received. It will contain empty message on
* timeout.
*/
template <typename Rep, class Period>
event try_consume_event_for(const std::chrono::duration<Rep, Period>& relTime) {
event evt;
try {
que_->try_get_for(&evt, relTime);
}
catch (queue_closed&) {
evt = event{shutdown_event{}};
}
return evt;
}
/**
* Waits until a specific time for a client event to appear.
* @param evt Pointer to the value to receive the event.
* @param absTime The time point to wait until, before timing out.
* @return @em true if an event was recceived, @em false if a timeout
* occurred.
*/
template <class Clock, class Duration>
bool try_consume_event_until(
event* evt, const std::chrono::time_point<Clock, Duration>& absTime
) {
if (!que_)
throw mqtt::exception(-1, "Consumer not started");

try {
return que_->try_get_until(evt, absTime);
}
catch (queue_closed&) {
*evt = event{shutdown_event{}};
return true;
}
}
/**
* Waits until a specific time for a client event to appear.
* @param absTime The time point to wait until, before timing out.
* @return The event that was received. It will contain empty message on
* timeout.
*/
template <class Clock, class Duration>
event try_consume_event_until(const std::chrono::time_point<Clock, Duration>& absTime
) {
event evt;
try {
que_->try_get_until(&evt, absTime);
}
catch (queue_closed&) {
evt = event{shutdown_event{}};
}
return evt;
}
/**
* Read the next message from the queue.
* This blocks until a new message arrives or until a disconnect or
* shutdown occurs.
* @return The message and topic.
*/
const_message_ptr consume_message() override;
Expand Down Expand Up @@ -855,7 +949,7 @@ class async_client : public virtual iasync_client
event evt;

while (true) {
if (!que_->try_get_for(&evt, relTime))
if (!try_consume_event_for(&evt, relTime))
return false;

if (const auto* pval = evt.get_message_if()) {
Expand Down Expand Up @@ -901,7 +995,7 @@ class async_client : public virtual iasync_client
event evt;

while (true) {
if (!que_->try_get_until(&evt, absTime))
if (!try_consume_event_until(&evt, absTime))
return false;

if (const auto* pval = evt.get_message_if()) {
Expand Down Expand Up @@ -930,76 +1024,6 @@ class async_client : public virtual iasync_client
this->try_consume_message_until(&msg, absTime);
return msg;
}
/**
* Read the next message from the queue.
* This blocks until a new message arrives.
* @return The message and topic.
*/
event consume_event() override { return que_->get(); }
/**
* Try to read the next message from the queue without blocking.
* @param evt Pointer to the value to receive the event
* @return @em true if an event was read, @em false if no
* event was available.
*/
bool try_consume_event(event* evt) override { return que_->try_get(evt); }
/**
* Waits a limited time for a message to arrive.
* @param evt Pointer to the value to receive the event.
* @param relTime The maximum amount of time to wait for an event.
* @return @em true if an event was read, @em false if a timeout
* occurred.
*/
template <typename Rep, class Period>
bool try_consume_event_for(
event* evt, const std::chrono::duration<Rep, Period>& relTime
) {
if (!que_)
throw mqtt::exception(-1, "Consumer not started");

return que_->try_get_for(evt, relTime);
}
/**
* Waits a limited time for an event to arrive.
* @param relTime The maximum amount of time to wait for an event.
* @return The event that was received. It will contain empty message on
* timeout.
*/
template <typename Rep, class Period>
event try_consume_event_for(const std::chrono::duration<Rep, Period>& relTime) {
event evt;
que_->try_get_for(&evt, relTime);
return evt;
}
/**
* Waits until a specific time for an event to appear.
* @param evt Pointer to the value to receive the event.
* @param absTime The time point to wait until, before timing out.
* @return @em true if an event was recceived, @em false if a timeout
* occurred.
*/
template <class Clock, class Duration>
bool try_consume_event_until(
event* evt, const std::chrono::time_point<Clock, Duration>& absTime
) {
if (!que_)
throw mqtt::exception(-1, "Consumer not started");

return que_->try_get_until(evt, absTime);
}
/**
* Waits until a specific time for an event to appear.
* @param absTime The time point to wait until, before timing out.
* @return The event that was received. It will contain empty message on
* timeout.
*/
template <class Clock, class Duration>
event try_consume_event_until(const std::chrono::time_point<Clock, Duration>& absTime
) {
event evt;
que_->try_get_until(&evt, absTime);
return evt;
}
};

/** Smart/shared pointer to an asynchronous MQTT client object */
Expand Down
25 changes: 21 additions & 4 deletions include/mqtt/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ struct disconnected_event
ReasonCode reasonCode;
};

/** Event for when the consumer queue is shutdown from another thread */
struct shutdown_event { };


/* Event for when a message arrives is just a message pointer */

Expand Down Expand Up @@ -83,7 +86,7 @@ class event
public:
/** The variant type for any possible event. */
using event_type = std::variant<
const_message_ptr, connected_event, connection_lost_event, disconnected_event>;
const_message_ptr, connected_event, connection_lost_event, disconnected_event, shutdown_event>;

private:
event_type evt_{};
Expand Down Expand Up @@ -124,6 +127,11 @@ class event
* @param evt A disconnected event.
*/
event(disconnected_event evt) : evt_{std::move(evt)} {}
/**
* Constructs a 'shutdown' event.
* @param evt A shutdown event.
*/
event(shutdown_event evt) : evt_{std::move(evt)} {}
/**
* Copy constructor.
* @param evt The event to copy.
Expand Down Expand Up @@ -196,13 +204,22 @@ class event
return std::holds_alternative<disconnected_event>(evt_);
}
/**
* Determines if this is any type of client disconnect.
* Determines if this event is an internal shutdown request.
* @return @em true if this event is a shutdown request, @em false
* otherwise.
*/
bool is_shutdown() const {
return std::holds_alternative<disconnected_event>(evt_);
}
/**
* Determines if this is any type of client disconnect or shutdown.
* @return @em true if this event is any type of client disconnect such
* as a 'connection lost' or 'disconnected' event.
* as a 'connection lost', 'disconnected', or shutdown event.
*/
bool is_any_disconnect() const {
return std::holds_alternative<connection_lost_event>(evt_)
|| std::holds_alternative<disconnected_event>(evt_);
|| std::holds_alternative<disconnected_event>(evt_)
|| std::holds_alternative<shutdown_event>(evt_);
}
/**
* Gets the message from the event, iff this is a message event.
Expand Down
29 changes: 27 additions & 2 deletions src/async_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,31 @@ void async_client::stop_consuming()
}
}

event async_client::consume_event()
{
event evt;
try {
evt = que_->get();
}
catch (queue_closed&) {
evt = event{shutdown_event{}};
}
return evt;
}

bool async_client::try_consume_event(event* evt)
{
bool res = false;
try {
res = que_->try_get(evt);
}
catch (queue_closed&) {
*evt = event{shutdown_event{}};
res = true;
}
return res;
}

const_message_ptr async_client::consume_message()
{
if (!que_)
Expand All @@ -887,7 +912,7 @@ const_message_ptr async_client::consume_message()
// For backward compatibility we ignore the 'connected' events,
// whereas disconnected/lost return an empty pointer.
while (true) {
auto evt = que_->get();
auto evt = consume_event();

if (const auto* pval = evt.get_message_if())
return *pval;
Expand All @@ -905,7 +930,7 @@ bool async_client::try_consume_message(const_message_ptr* msg)
event evt;

while (true) {
if (!que_->try_get(&evt))
if (!try_consume_event(&evt))
return false;

if (const auto* pval = evt.get_message_if()) {
Expand Down

0 comments on commit df82ee4

Please sign in to comment.