Skip to content

Commit

Permalink
release 0.3.1
Browse files Browse the repository at this point in the history
Add LogPlugin to examples and improve MQTT client with auto-reconnect

- Integrated LogPlugin into WebSocket and Pub/Sub examples for better logging.
- Implemented automatic MQTT client reconnection after disconnection, enhancing reliability.
  • Loading branch information
foxzool committed Sep 12, 2024
1 parent 45285c6 commit 10d5128
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 16 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.3.1] - 2024-09-12

- refactor code
- auto reconnect mqtt client

## [0.2.0] - 2024-07-05

- bump bevy version to `0.14.*`
Expand Down
9 changes: 7 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "bevy_mqtt"
description = "A simple MQTT client for Bevy"
version = "0.3.0"
version = "0.3.1"
edition = "2021"
readme = "README.md"
repository = "https://github.com/foxzool/bevy_mqtt"
Expand All @@ -27,14 +27,19 @@ rumqttc = { version = "0.24.0", features = ["url"] }
bevy-tokio-tasks = { version = "0.14.0" }
kanal = "0.1.0-pre8"
bytes = "1"
log = "0.4.22"
tokio = { version = "1", features = ["rt", "sync"] }


[dev-dependencies]
bevy = { version = "0.14.2", default-features = false }
bincode = "1.3.3"
time = "0.3"
serde = { version = "1", features = ["derive"] }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1", features = ["rt-multi-thread"] }


[[example]]
name = "pub_and_sub"
path = "examples/pub_and_sub.rs"
Expand Down
3 changes: 2 additions & 1 deletion examples/pub_and_sub.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::SystemTime;

use bevy::{prelude::*, time::common_conditions::on_timer};
use bevy_log::LogPlugin;
use bevy_mqtt::{
rumqttc::{MqttOptions, QoS},
MqttClientState, MqttError, MqttEvent, MqttPlugin, MqttPublishOutgoing, MqttSetting,
Expand Down Expand Up @@ -42,7 +43,7 @@ fn main() {
mqtt_options: MqttOptions::new("mqtt-serde", "127.0.0.1", 1883),
cap: 10,
})
.add_plugins((MinimalPlugins, MqttPlugin))
.add_plugins((MinimalPlugins, MqttPlugin, LogPlugin::default()))
.add_systems(Update, (handle_message, handle_error))
.add_systems(OnEnter(MqttClientState::Connected), sub_topic)
.add_systems(
Expand Down
3 changes: 2 additions & 1 deletion examples/websocket.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Duration;

use bevy::{prelude::*, time::common_conditions::on_timer};
use bevy_log::LogPlugin;
use bevy_mqtt::{
rumqttc::{MqttOptions, QoS, Transport},
MqttClientState, MqttError, MqttPlugin, MqttPublishOutgoing, MqttPublishPacket, MqttSetting,
Expand All @@ -18,7 +19,7 @@ fn main() {
mqtt_options,
cap: 10,
})
.add_plugins((MinimalPlugins, MqttPlugin))
.add_plugins((MinimalPlugins, MqttPlugin, LogPlugin::default()))
.add_systems(OnEnter(MqttClientState::Connected), sub_topic)
.add_systems(Update, (handle_mqtt_publish, handle_error))
.add_systems(
Expand Down
28 changes: 16 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
//! A Bevy plugin for MQTT
use bevy_app::Plugin;
use bevy_app::{App, Update};
use bevy_app::{App, Plugin, Update};
use bevy_derive::{Deref, DerefMut};
use bevy_ecs::prelude::{
resource_added, resource_exists, Event, EventReader, EventWriter, IntoSystemConfigs, Res,
ResMut, Resource,
};
use bevy_log::debug;
use bevy_state::app::{AppExtStates, StatesPlugin};
use bevy_state::prelude::in_state;
use bevy_state::state::{NextState, States};
use bevy_log::{debug, error};
use bevy_state::{
app::{AppExtStates, StatesPlugin},
prelude::in_state,
state::{NextState, States},
};
use bevy_tokio_tasks::{TokioTasksPlugin, TokioTasksRuntime};
use bytes::Bytes;
use kanal::{bounded_async, AsyncReceiver};
Expand Down Expand Up @@ -171,6 +172,7 @@ fn handle_mqtt_events(
}

while let Ok(Some(err)) = client.from_async_error.try_recv() {
next_state.set(MqttClientState::Disconnected);
error_events.send(MqttError(err));
}
}
Expand All @@ -194,17 +196,19 @@ fn spawn_client(setting: Res<MqttSetting>, runtime: Res<TokioTasksRuntime>) {
})
.await;
debug!(
"Mqtt client connect_to {:?}",
"Mqtt client connecting_to {:?}",
setting.mqtt_options.broker_address()
);
loop {
match event_loop.poll().await {
Ok(event) => to_async_event.send(event).await.expect("send event failed"),
Ok(event) => {
let _ = to_async_event.send(event).await;
}
Err(connection_err) => {
to_async_error
.send(connection_err)
.await
.expect("send error failed");
error!("Mqtt client connection error: {:?}", connection_err);
let _ = to_async_error.send(connection_err).await;
// auto reconnect after 10 seconds
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
}
}
Expand Down

0 comments on commit 10d5128

Please sign in to comment.