Skip to content

Commit

Permalink
Change reconnect logic in MQTT
Browse files Browse the repository at this point in the history
  • Loading branch information
liamadamson committed Nov 26, 2023
1 parent 2239abd commit e04be3f
Showing 1 changed file with 32 additions and 19 deletions.
51 changes: 32 additions & 19 deletions src/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use serde_json;

pub struct MQTTReadingsSender {
client: mqtt::Client,
connection_opts: mqtt::ConnectOptions,
topic: String,
}

Expand All @@ -20,11 +19,25 @@ impl MQTTReadingsSender {
match Self::create_mqtt_client(server_uri, client_id) {
Ok(client) => {
match Self::get_connection_opts(ca_file, key_store_file, private_key_file) {
Ok(connection_opts) => Ok(Self {
client,
connection_opts,
topic: topic.to_string(),
}),
Ok(connection_opts) => loop {
match client.connect(connection_opts.clone()) {
Ok(_) => {
log::info!("Connected to MQTT broker");

return Ok(Self {
client,
topic: topic.to_string(),
});
}
Err(e) => {
log::warn!(
"Failed to connect to MQTT broker, retrying in 5 seconds: {}",
e
);
std::thread::sleep(std::time::Duration::from_secs(5));
}
}
},
Err(e) => Err(anyhow::anyhow!(
"Failed to create MQTT connection options: {}",
e
Expand Down Expand Up @@ -72,22 +85,22 @@ impl ReadingsSender for MQTTReadingsSender {
.qos(1)
.finalize();

match self.client.connect(self.connection_opts.clone()) {
Ok(_) => {
if let Err(e) = self.client.publish(msg) {
log::warn!("Failed to publish readings to MQTT broker: {}", e);
}
if !self.client.is_connected() {
log::info!("MQTT client is not connected, trying to reconnect.");

if let Err(e) = self.client.disconnect(None) {
log::warn!(
"Failed to disconnect from MQTT broker after sending readings: {}",
e
);
// Try to reconnect once.
match self.client.reconnect() {
Ok(_) => log::info!("MQTT client reconnected"),
Err(e) => {
log::warn!("Failed to reconnect MQTT client: {}", e);
return Err(anyhow::anyhow!("Failed to reconnect MQTT client"));
}

Ok(())
}
Err(e) => Err(anyhow::anyhow!("Failed to connect to MQTT broker: {}", e)),
}

match self.client.publish(msg) {
Ok(_) => Ok(()),
Err(e) => Err(anyhow::anyhow!("Failed to publish MQTT message: {}", e)),
}
}
}

0 comments on commit e04be3f

Please sign in to comment.