From 36d8fa300927912659d944fe84c31c8b7d170a14 Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Mon, 29 Apr 2024 14:46:15 +0800 Subject: [PATCH 1/7] Manual ACK V5 improvement --- rumqttc/examples/async_manual_acks_v5.rs | 21 +++- rumqttc/src/v5/client.rs | 132 +++++++++++++++++++++++ rumqttc/src/v5/mod.rs | 2 +- rumqttc/src/v5/mqttbytes/v5/puback.rs | 26 +++++ rumqttc/src/v5/mqttbytes/v5/pubrec.rs | 30 +++++- 5 files changed, 203 insertions(+), 8 deletions(-) diff --git a/rumqttc/examples/async_manual_acks_v5.rs b/rumqttc/examples/async_manual_acks_v5.rs index bcf1bf35..774283de 100644 --- a/rumqttc/examples/async_manual_acks_v5.rs +++ b/rumqttc/examples/async_manual_acks_v5.rs @@ -2,16 +2,17 @@ use rumqttc::v5::mqttbytes::v5::Packet; use rumqttc::v5::mqttbytes::QoS; use tokio::{task, time}; -use rumqttc::v5::{AsyncClient, Event, EventLoop, MqttOptions}; +use rumqttc::v5::{AsyncClient, Event, EventLoop, ManualAckReason, MqttOptions}; use std::error::Error; use std::time::Duration; fn create_conn() -> (AsyncClient, EventLoop) { - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884); + let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); mqttoptions .set_keep_alive(Duration::from_secs(5)) .set_manual_acks(true) - .set_clean_start(false); + .set_clean_start(false) + .set_session_expiry_interval(u32::MAX.into()); AsyncClient::new(mqttoptions, 10) } @@ -21,6 +22,9 @@ async fn main() -> Result<(), Box> { // todo!("fix this example with new way of spawning clients") pretty_env_logger::init(); + println!(""); + println!(">>>>>>>>>>> Create broker connection, do not ack broker publishes!!!"); + // create mqtt connection with clean_session = false and manual_acks = true let (client, mut eventloop) = create_conn(); @@ -50,6 +54,9 @@ async fn main() -> Result<(), Box> { } } + println!(""); + println!(">>>>>>>>>>> Create new broker connection to get unack packets again!!!"); + // create new broker connection let (client, mut eventloop) = create_conn(); @@ -65,7 +72,11 @@ async fn main() -> Result<(), Box> { // Its important not to block notifier as this can cause deadlock. let c = client.clone(); tokio::spawn(async move { - c.ack(&publish).await.unwrap(); + let mut ack = c.get_manual_ack(&publish); + ack.set_reason(ManualAckReason::UnspecifiedError); + ack.set_reason_string("Testing error".to_string().into()); + c.manual_ack(ack).await.unwrap(); + // c.ack(&publish).await.unwrap(); }); } } @@ -74,7 +85,7 @@ async fn main() -> Result<(), Box> { } async fn requests(client: &AsyncClient) { - for i in 1..=10 { + for i in 1..=5 { client .publish("hello/world", QoS::AtLeastOnce, false, vec![1; i]) .await diff --git a/rumqttc/src/v5/client.rs b/rumqttc/src/v5/client.rs index f8629b8c..059e5a3d 100644 --- a/rumqttc/src/v5/client.rs +++ b/rumqttc/src/v5/client.rs @@ -177,6 +177,33 @@ impl AsyncClient { self.handle_try_publish(topic, qos, retain, payload, None) } + /// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to send to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck { + match publish.qos { + QoS::AtMostOnce => ManualAck::None, + QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid, None)), + QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid, None)), + } + } + + /// Sends a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> { + let ack = get_manual_ack_req(ack); + if let Some(ack) = ack { + self.request_tx.send_async(ack).await?; + } + Ok(()) + } + + /// Attempts to send a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> { + let ack = get_manual_ack_req(ack); + if let Some(ack) = ack { + self.request_tx.try_send(ack)?; + } + Ok(()) + } + /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> { let ack = get_ack_req(publish); @@ -450,6 +477,91 @@ impl AsyncClient { } } +/// Reasons for ManualAck Preparation +pub enum ManualAckReason { + Success, + NoMatchingSubscribers, + UnspecifiedError, + ImplementationSpecificError, + NotAuthorized, + TopicNameInvalid, + PacketIdentifierInUse, + QuotaExceeded, + PayloadFormatInvalid, +} +impl ManualAckReason { + pub fn code(&self) -> u8 { + match self { + ManualAckReason::Success => 0, + ManualAckReason::NoMatchingSubscribers => 16, + ManualAckReason::UnspecifiedError => 128, + ManualAckReason::ImplementationSpecificError => 131, + ManualAckReason::NotAuthorized => 135, + ManualAckReason::TopicNameInvalid => 144, + ManualAckReason::PacketIdentifierInUse => 145, + ManualAckReason::QuotaExceeded => 151, + ManualAckReason::PayloadFormatInvalid => 153, + } + } +} + +/// ManualAck packet for manual_ack +pub enum ManualAck { + None, + PubAck(PubAck), + PubRec(PubRec), +} + +impl ManualAck { + /// Set reason code for manual_ack sending + pub fn set_reason(&mut self, reason: ManualAckReason) -> &mut Self { + match self { + ManualAck::None => (), + ManualAck::PubAck(ack) => ack.set_code(reason.code()), + ManualAck::PubRec(ack) => ack.set_code(reason.code()), + } + self + } + + /// Set reason code number for manual_ack sending + pub fn set_code(&mut self, code: u8) -> &mut Self { + match self { + ManualAck::None => (), + ManualAck::PubAck(ack) => ack.set_code(code), + ManualAck::PubRec(ack) => ack.set_code(code), + } + self + } + + /// Set reason string on manual_ack properties + pub fn set_reason_string(&mut self, reason_string: Option) -> &mut Self { + match self { + ManualAck::None => (), + ManualAck::PubAck(ack) => ack.set_reason_string(reason_string), + ManualAck::PubRec(ack) => ack.set_reason_string(reason_string), + } + self + } + + /// Set user properties on manual_ack properties + pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) -> &mut Self { + match self { + ManualAck::None => (), + ManualAck::PubAck(ack) => ack.set_user_properties(user_properties), + ManualAck::PubRec(ack) => ack.set_user_properties(user_properties), + } + self + } +} + +fn get_manual_ack_req(ack: ManualAck) -> Option { + match ack { + ManualAck::None => None, + ManualAck::PubAck(ack) => Some(Request::PubAck(ack)), + ManualAck::PubRec(ack) => Some(Request::PubRec(ack)), + } +} + fn get_ack_req(publish: &Publish) -> Option { let ack = match publish.qos { QoS::AtMostOnce => return None, @@ -584,6 +696,26 @@ impl Client { self.client.try_publish(topic, qos, retain, payload) } + /// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to send to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck { + self.client.get_manual_ack(publish) + } + + /// Sends a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> { + let ack = get_manual_ack_req(ack); + if let Some(ack) = ack { + self.client.request_tx.send(ack)?; + } + Ok(()) + } + + /// Attempts to send a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> { + self.client.try_manual_ack(ack)?; + Ok(()) + } + /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> { let ack = get_ack_req(publish); diff --git a/rumqttc/src/v5/mod.rs b/rumqttc/src/v5/mod.rs index 44499cde..5bda232a 100644 --- a/rumqttc/src/v5/mod.rs +++ b/rumqttc/src/v5/mod.rs @@ -19,7 +19,7 @@ use crate::{NetworkOptions, Transport}; use mqttbytes::v5::*; -pub use client::{AsyncClient, Client, ClientError, Connection, Iter}; +pub use client::{AsyncClient, Client, ClientError, Connection, Iter, ManualAckReason}; pub use eventloop::{ConnectionError, Event, EventLoop}; pub use state::{MqttState, StateError}; diff --git a/rumqttc/src/v5/mqttbytes/v5/puback.rs b/rumqttc/src/v5/mqttbytes/v5/puback.rs index 9905a450..3a0af3a6 100644 --- a/rumqttc/src/v5/mqttbytes/v5/puback.rs +++ b/rumqttc/src/v5/mqttbytes/v5/puback.rs @@ -32,6 +32,32 @@ impl PubAck { } } + pub fn set_code(&mut self, code: u8) { + self.reason = reason(code).unwrap(); + } + + pub fn set_reason_string(&mut self, reason_string: Option) { + if let Some(props) = &mut self.properties { + props.reason_string = reason_string; + } else { + self.properties = Some(PubAckProperties { + reason_string, + user_properties: Vec::<(String, String)>::new(), + }); + } + } + + pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) { + if let Some(props) = &mut self.properties { + props.user_properties = user_properties; + } else { + self.properties = Some(PubAckProperties { + reason_string: None, + user_properties, + }); + } + } + pub fn size(&self) -> usize { if self.reason == PubAckReason::Success && self.properties.is_none() { return 4; diff --git a/rumqttc/src/v5/mqttbytes/v5/pubrec.rs b/rumqttc/src/v5/mqttbytes/v5/pubrec.rs index 6b3aad36..ec98aadb 100644 --- a/rumqttc/src/v5/mqttbytes/v5/pubrec.rs +++ b/rumqttc/src/v5/mqttbytes/v5/pubrec.rs @@ -33,6 +33,32 @@ impl PubRec { } } + pub fn set_code(&mut self, code: u8) { + self.reason = reason(code).unwrap(); + } + + pub fn set_reason_string(&mut self, reason_string: Option) { + if let Some(props) = &mut self.properties { + props.reason_string = reason_string; + } else { + self.properties = Some(PubRecProperties { + reason_string, + user_properties: Vec::<(String, String)>::new(), + }); + } + } + + pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) { + if let Some(props) = &mut self.properties { + props.user_properties = user_properties; + } else { + self.properties = Some(PubRecProperties { + reason_string: None, + user_properties, + }); + } + } + pub fn size(&self) -> usize { let len = self.len(); let remaining_len_size = len_len(len); @@ -83,12 +109,12 @@ impl PubRec { } let properties = PubRecProperties::read(&mut bytes)?; - let puback = PubRec { + let pubrec = PubRec { pkid, reason: reason(ack_reason)?, properties, }; - Ok(puback) + Ok(pubrec) } pub fn write(&self, buffer: &mut BytesMut) -> Result { From 57bb3f712165534a07e0ee45536db6eb8eb507a9 Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Mon, 29 Apr 2024 15:02:37 +0800 Subject: [PATCH 2/7] Manual ACK v4 update. --- rumqttc/examples/async_manual_acks.rs | 4 +- rumqttc/src/client.rs | 67 +++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/rumqttc/examples/async_manual_acks.rs b/rumqttc/examples/async_manual_acks.rs index e5360aa0..a7cb04df 100644 --- a/rumqttc/examples/async_manual_acks.rs +++ b/rumqttc/examples/async_manual_acks.rs @@ -68,7 +68,9 @@ async fn main() -> Result<(), Box> { // Its important not to block eventloop as this can cause deadlock. let c = client.clone(); tokio::spawn(async move { - c.ack(&publish).await.unwrap(); + let ack = c.get_manual_ack(&publish); + c.manual_ack(ack).await.unwrap(); + // c.ack(&publish).await.unwrap(); }); } } diff --git a/rumqttc/src/client.rs b/rumqttc/src/client.rs index 15cd5f5a..0b18f0dc 100644 --- a/rumqttc/src/client.rs +++ b/rumqttc/src/client.rs @@ -111,6 +111,33 @@ impl AsyncClient { Ok(()) } + /// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck { + match publish.qos { + QoS::AtMostOnce => ManualAck::None, + QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid)), + QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid)), + } + } + + /// Sends a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> { + let ack = get_manual_ack_req(ack); + if let Some(ack) = ack { + self.request_tx.send_async(ack).await?; + } + Ok(()) + } + + /// Attempts to send a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> { + let ack = get_manual_ack_req(ack); + if let Some(ack) = ack { + self.request_tx.try_send(ack)?; + } + Ok(()) + } + /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> { let ack = get_ack_req(publish); @@ -235,6 +262,22 @@ impl AsyncClient { } } + +/// ManualAck packet for manual_ack +pub enum ManualAck { + None, + PubAck(PubAck), + PubRec(PubRec), +} + +fn get_manual_ack_req(ack: ManualAck) -> Option { + match ack { + ManualAck::None => None, + ManualAck::PubAck(ack) => Some(Request::PubAck(ack)), + ManualAck::PubRec(ack) => Some(Request::PubRec(ack)), + } +} + fn get_ack_req(publish: &Publish) -> Option { let ack = match publish.qos { QoS::AtMostOnce => return None, @@ -323,6 +366,30 @@ impl Client { Ok(()) } + /// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck { + match publish.qos { + QoS::AtMostOnce => ManualAck::None, + QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid)), + QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid)), + } + } + + /// Sends a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> { + let ack = get_manual_ack_req(ack); + if let Some(ack) = ack { + self.client.request_tx.send(ack)?; + } + Ok(()) + } + + /// Attempts to send a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> { + self.client.try_manual_ack(ack)?; + Ok(()) + } + /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> { let ack = get_ack_req(publish); From aebf868fa279c2440a5bfda82060edf242b04bb6 Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Tue, 30 Apr 2024 11:46:37 +0800 Subject: [PATCH 3/7] Cleanup example code. --- rumqttc/examples/async_manual_acks.rs | 4 +--- rumqttc/examples/async_manual_acks_v5.rs | 16 ++++------------ 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/rumqttc/examples/async_manual_acks.rs b/rumqttc/examples/async_manual_acks.rs index a7cb04df..e5360aa0 100644 --- a/rumqttc/examples/async_manual_acks.rs +++ b/rumqttc/examples/async_manual_acks.rs @@ -68,9 +68,7 @@ async fn main() -> Result<(), Box> { // Its important not to block eventloop as this can cause deadlock. let c = client.clone(); tokio::spawn(async move { - let ack = c.get_manual_ack(&publish); - c.manual_ack(ack).await.unwrap(); - // c.ack(&publish).await.unwrap(); + c.ack(&publish).await.unwrap(); }); } } diff --git a/rumqttc/examples/async_manual_acks_v5.rs b/rumqttc/examples/async_manual_acks_v5.rs index 774283de..b22d0af9 100644 --- a/rumqttc/examples/async_manual_acks_v5.rs +++ b/rumqttc/examples/async_manual_acks_v5.rs @@ -7,12 +7,11 @@ use std::error::Error; use std::time::Duration; fn create_conn() -> (AsyncClient, EventLoop) { - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884); mqttoptions .set_keep_alive(Duration::from_secs(5)) .set_manual_acks(true) - .set_clean_start(false) - .set_session_expiry_interval(u32::MAX.into()); + .set_clean_start(false); AsyncClient::new(mqttoptions, 10) } @@ -22,9 +21,6 @@ async fn main() -> Result<(), Box> { // todo!("fix this example with new way of spawning clients") pretty_env_logger::init(); - println!(""); - println!(">>>>>>>>>>> Create broker connection, do not ack broker publishes!!!"); - // create mqtt connection with clean_session = false and manual_acks = true let (client, mut eventloop) = create_conn(); @@ -54,9 +50,6 @@ async fn main() -> Result<(), Box> { } } - println!(""); - println!(">>>>>>>>>>> Create new broker connection to get unack packets again!!!"); - // create new broker connection let (client, mut eventloop) = create_conn(); @@ -73,10 +66,9 @@ async fn main() -> Result<(), Box> { let c = client.clone(); tokio::spawn(async move { let mut ack = c.get_manual_ack(&publish); - ack.set_reason(ManualAckReason::UnspecifiedError); - ack.set_reason_string("Testing error".to_string().into()); + ack.set_reason(ManualAckReason::Success); + ack.set_reason_string("There is no error".to_string().into()); c.manual_ack(ack).await.unwrap(); - // c.ack(&publish).await.unwrap(); }); } } From 060d26897e8f656ba143e306bcbe2862caf24eb5 Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Tue, 30 Apr 2024 11:57:08 +0800 Subject: [PATCH 4/7] Keep minimal change in example. --- rumqttc/examples/async_manual_acks_v5.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rumqttc/examples/async_manual_acks_v5.rs b/rumqttc/examples/async_manual_acks_v5.rs index b22d0af9..50b2fc21 100644 --- a/rumqttc/examples/async_manual_acks_v5.rs +++ b/rumqttc/examples/async_manual_acks_v5.rs @@ -77,7 +77,7 @@ async fn main() -> Result<(), Box> { } async fn requests(client: &AsyncClient) { - for i in 1..=5 { + for i in 1..=10 { client .publish("hello/world", QoS::AtLeastOnce, false, vec![1; i]) .await From a91c59fbcd386dc9220c82d09c491a3cc9639b2f Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Wed, 8 May 2024 14:26:55 +0800 Subject: [PATCH 5/7] Export ManualAck --- rumqttc/src/v5/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rumqttc/src/v5/mod.rs b/rumqttc/src/v5/mod.rs index 5bda232a..2e2ece39 100644 --- a/rumqttc/src/v5/mod.rs +++ b/rumqttc/src/v5/mod.rs @@ -19,7 +19,7 @@ use crate::{NetworkOptions, Transport}; use mqttbytes::v5::*; -pub use client::{AsyncClient, Client, ClientError, Connection, Iter, ManualAckReason}; +pub use client::{AsyncClient, Client, ClientError, Connection, Iter, ManualAck, ManualAckReason}; pub use eventloop::{ConnectionError, Event, EventLoop}; pub use state::{MqttState, StateError}; From 83b5b9a632f4b9f823df5bc9d41e1e37bfc9e9d5 Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Thu, 16 May 2024 14:51:26 +0800 Subject: [PATCH 6/7] Add comment docs on manual ack usage. --- rumqttc/src/v5/client.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/rumqttc/src/v5/client.rs b/rumqttc/src/v5/client.rs index 059e5a3d..591bb551 100644 --- a/rumqttc/src/v5/client.rs +++ b/rumqttc/src/v5/client.rs @@ -178,6 +178,10 @@ impl AsyncClient { } /// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to send to the `EventLoop`. Only needed in if `manual_acks` flag is set. + /// This is useful when you want to ack a publish later. + /// By default the ack reason code is success, you can change it using `ack.set_reason`. + /// By default the ack reason string is empty, you can change it using `ack.set_reason_string`. + /// By default the ack user properties is empty, you can change it using `ack.set_user_properties`. pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck { match publish.qos { QoS::AtMostOnce => ManualAck::None, @@ -697,6 +701,10 @@ impl Client { } /// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to send to the `EventLoop`. Only needed in if `manual_acks` flag is set. + /// This is useful when you want to ack a publish later. + /// By default the ack reason code is success, you can change it using `ack.set_reason`. + /// By default the ack reason string is empty, you can change it using `ack.set_reason_string`. + /// By default the ack user properties is empty, you can change it using `ack.set_user_properties`. pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck { self.client.get_manual_ack(publish) } From 58aa423dbe90c0f65bae0bfa5c511818c272eb1f Mon Sep 17 00:00:00 2001 From: CQ Xiao Date: Thu, 16 May 2024 14:51:46 +0800 Subject: [PATCH 7/7] Update example comments on manual ack usage. --- rumqttc/examples/async_manual_acks_v5.rs | 26 +++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/rumqttc/examples/async_manual_acks_v5.rs b/rumqttc/examples/async_manual_acks_v5.rs index 50b2fc21..e76cc214 100644 --- a/rumqttc/examples/async_manual_acks_v5.rs +++ b/rumqttc/examples/async_manual_acks_v5.rs @@ -61,15 +61,27 @@ async fn main() -> Result<(), Box> { Packet::Publish(publish) => publish, _ => continue, }; - // this time we will ack incoming publishes. + // this time we will ack incoming publishes in two different ways. // Its important not to block notifier as this can cause deadlock. - let c = client.clone(); - tokio::spawn(async move { + if (publish.pkid & 1) == 0 { + // Ack with all default: reason code success, no reason string, properties none + let c = client.clone(); + tokio::spawn(async move { + c.ack(&publish).await.unwrap(); + }); + } else { + // Ack with custom reason code and reason string + let c = client.clone(); + // Get manual ack packet for later acking let mut ack = c.get_manual_ack(&publish); - ack.set_reason(ManualAckReason::Success); - ack.set_reason_string("There is no error".to_string().into()); - c.manual_ack(ack).await.unwrap(); - }); + tokio::spawn(async move { + // Customize reason code (if not set, default is Success) + ack.set_reason(ManualAckReason::Success); + // Customize reason string (if not set, default is empty) + ack.set_reason_string("There is no error".to_string().into()); + c.manual_ack(ack).await.unwrap(); + }); + } } }