Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manual ACK packet support #855

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions rumqttc/examples/async_manual_acks_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use rumqttc::v5::mqttbytes::v5::Packet;
use rumqttc::v5::mqttbytes::QoS;
use tokio::{task, time};

use rumqttc::v5::{AsyncClient, Event, EventLoop, MqttOptions};
use rumqttc::v5::{AsyncClient, Event, EventLoop, ManualAckReason, MqttOptions};
use std::error::Error;
use std::time::Duration;

Expand Down Expand Up @@ -61,12 +61,27 @@ async fn main() -> Result<(), Box<dyn Error>> {
Packet::Publish(publish) => publish,
_ => continue,
};
// this time we will ack incoming publishes.
// this time we will ack incoming publishes in two different ways.
// Its important not to block notifier as this can cause deadlock.
let c = client.clone();
tokio::spawn(async move {
c.ack(&publish).await.unwrap();
});
if (publish.pkid & 1) == 0 {
// Ack with all default: reason code success, no reason string, properties none
let c = client.clone();
tokio::spawn(async move {
c.ack(&publish).await.unwrap();
});
} else {
// Ack with custom reason code and reason string
let c = client.clone();
// Get manual ack packet for later acking
let mut ack = c.get_manual_ack(&publish);
tokio::spawn(async move {
// Customize reason code (if not set, default is Success)
ack.set_reason(ManualAckReason::Success);
// Customize reason string (if not set, default is empty)
ack.set_reason_string("There is no error".to_string().into());
c.manual_ack(ack).await.unwrap();
});
}
}
}

Expand Down
67 changes: 67 additions & 0 deletions rumqttc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,33 @@ impl AsyncClient {
Ok(())
}

/// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck {
match publish.qos {
QoS::AtMostOnce => ManualAck::None,
QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid)),
QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid)),
}
}

/// Sends a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
let ack = get_manual_ack_req(ack);
if let Some(ack) = ack {
self.request_tx.send_async(ack).await?;
}
Ok(())
}

/// Attempts to send a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
let ack = get_manual_ack_req(ack);
if let Some(ack) = ack {
self.request_tx.try_send(ack)?;
}
Ok(())
}

/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
let ack = get_ack_req(publish);
Expand Down Expand Up @@ -235,6 +262,22 @@ impl AsyncClient {
}
}


/// ManualAck packet for manual_ack
pub enum ManualAck {
None,
PubAck(PubAck),
PubRec(PubRec),
}

fn get_manual_ack_req(ack: ManualAck) -> Option<Request> {
match ack {
ManualAck::None => None,
ManualAck::PubAck(ack) => Some(Request::PubAck(ack)),
ManualAck::PubRec(ack) => Some(Request::PubRec(ack)),
}
}

fn get_ack_req(publish: &Publish) -> Option<Request> {
let ack = match publish.qos {
QoS::AtMostOnce => return None,
Expand Down Expand Up @@ -323,6 +366,30 @@ impl Client {
Ok(())
}

/// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck {
match publish.qos {
QoS::AtMostOnce => ManualAck::None,
QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid)),
QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid)),
}
}

/// Sends a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
let ack = get_manual_ack_req(ack);
if let Some(ack) = ack {
self.client.request_tx.send(ack)?;
}
Ok(())
}

/// Attempts to send a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
self.client.try_manual_ack(ack)?;
Ok(())
}

/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
let ack = get_ack_req(publish);
Expand Down
140 changes: 140 additions & 0 deletions rumqttc/src/v5/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,37 @@ impl AsyncClient {
self.handle_try_publish(topic, qos, retain, payload, None)
}

/// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to send to the `EventLoop`. Only needed in if `manual_acks` flag is set.
/// This is useful when you want to ack a publish later.
/// By default the ack reason code is success, you can change it using `ack.set_reason`.
/// By default the ack reason string is empty, you can change it using `ack.set_reason_string`.
/// By default the ack user properties is empty, you can change it using `ack.set_user_properties`.
pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck {
match publish.qos {
QoS::AtMostOnce => ManualAck::None,
QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid, None)),
QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid, None)),
}
}

/// Sends a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
let ack = get_manual_ack_req(ack);
if let Some(ack) = ack {
self.request_tx.send_async(ack).await?;
}
Ok(())
}

/// Attempts to send a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
let ack = get_manual_ack_req(ack);
if let Some(ack) = ack {
self.request_tx.try_send(ack)?;
}
Ok(())
}

/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
let ack = get_ack_req(publish);
Expand Down Expand Up @@ -450,6 +481,91 @@ impl AsyncClient {
}
}

/// Reasons for ManualAck Preparation
pub enum ManualAckReason {
Success,
NoMatchingSubscribers,
UnspecifiedError,
ImplementationSpecificError,
NotAuthorized,
TopicNameInvalid,
PacketIdentifierInUse,
QuotaExceeded,
PayloadFormatInvalid,
}
impl ManualAckReason {
pub fn code(&self) -> u8 {
match self {
ManualAckReason::Success => 0,
ManualAckReason::NoMatchingSubscribers => 16,
ManualAckReason::UnspecifiedError => 128,
ManualAckReason::ImplementationSpecificError => 131,
ManualAckReason::NotAuthorized => 135,
ManualAckReason::TopicNameInvalid => 144,
ManualAckReason::PacketIdentifierInUse => 145,
ManualAckReason::QuotaExceeded => 151,
ManualAckReason::PayloadFormatInvalid => 153,
}
}
}

/// ManualAck packet for manual_ack
pub enum ManualAck {
None,
PubAck(PubAck),
PubRec(PubRec),
}

impl ManualAck {
/// Set reason code for manual_ack sending
pub fn set_reason(&mut self, reason: ManualAckReason) -> &mut Self {
match self {
ManualAck::None => (),
ManualAck::PubAck(ack) => ack.set_code(reason.code()),
ManualAck::PubRec(ack) => ack.set_code(reason.code()),
}
self
}

/// Set reason code number for manual_ack sending
pub fn set_code(&mut self, code: u8) -> &mut Self {
match self {
ManualAck::None => (),
ManualAck::PubAck(ack) => ack.set_code(code),
ManualAck::PubRec(ack) => ack.set_code(code),
}
self
}

/// Set reason string on manual_ack properties
pub fn set_reason_string(&mut self, reason_string: Option<String>) -> &mut Self {
match self {
ManualAck::None => (),
ManualAck::PubAck(ack) => ack.set_reason_string(reason_string),
ManualAck::PubRec(ack) => ack.set_reason_string(reason_string),
}
self
}

/// Set user properties on manual_ack properties
pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) -> &mut Self {
match self {
ManualAck::None => (),
ManualAck::PubAck(ack) => ack.set_user_properties(user_properties),
ManualAck::PubRec(ack) => ack.set_user_properties(user_properties),
}
self
}
}

fn get_manual_ack_req(ack: ManualAck) -> Option<Request> {
match ack {
ManualAck::None => None,
ManualAck::PubAck(ack) => Some(Request::PubAck(ack)),
ManualAck::PubRec(ack) => Some(Request::PubRec(ack)),
}
}

fn get_ack_req(publish: &Publish) -> Option<Request> {
let ack = match publish.qos {
QoS::AtMostOnce => return None,
Expand Down Expand Up @@ -584,6 +700,30 @@ impl Client {
self.client.try_publish(topic, qos, retain, payload)
}

/// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to send to the `EventLoop`. Only needed in if `manual_acks` flag is set.
/// This is useful when you want to ack a publish later.
/// By default the ack reason code is success, you can change it using `ack.set_reason`.
/// By default the ack reason string is empty, you can change it using `ack.set_reason_string`.
/// By default the ack user properties is empty, you can change it using `ack.set_user_properties`.
pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck {
self.client.get_manual_ack(publish)
}

/// Sends a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
let ack = get_manual_ack_req(ack);
if let Some(ack) = ack {
self.client.request_tx.send(ack)?;
}
Ok(())
}

/// Attempts to send a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
self.client.try_manual_ack(ack)?;
Ok(())
}

/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
let ack = get_ack_req(publish);
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/src/v5/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{NetworkOptions, Transport};

use mqttbytes::v5::*;

pub use client::{AsyncClient, Client, ClientError, Connection, Iter};
pub use client::{AsyncClient, Client, ClientError, Connection, Iter, ManualAck, ManualAckReason};
pub use eventloop::{ConnectionError, Event, EventLoop};
pub use state::{MqttState, StateError};

Expand Down
26 changes: 26 additions & 0 deletions rumqttc/src/v5/mqttbytes/v5/puback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,32 @@ impl PubAck {
}
}

pub fn set_code(&mut self, code: u8) {
self.reason = reason(code).unwrap();
}

pub fn set_reason_string(&mut self, reason_string: Option<String>) {
if let Some(props) = &mut self.properties {
props.reason_string = reason_string;
} else {
self.properties = Some(PubAckProperties {
reason_string,
user_properties: Vec::<(String, String)>::new(),
});
}
}

pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) {
if let Some(props) = &mut self.properties {
props.user_properties = user_properties;
} else {
self.properties = Some(PubAckProperties {
reason_string: None,
user_properties,
});
}
}

pub fn size(&self) -> usize {
if self.reason == PubAckReason::Success && self.properties.is_none() {
return 4;
Expand Down
30 changes: 28 additions & 2 deletions rumqttc/src/v5/mqttbytes/v5/pubrec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,32 @@ impl PubRec {
}
}

pub fn set_code(&mut self, code: u8) {
self.reason = reason(code).unwrap();
}

pub fn set_reason_string(&mut self, reason_string: Option<String>) {
if let Some(props) = &mut self.properties {
props.reason_string = reason_string;
} else {
self.properties = Some(PubRecProperties {
reason_string,
user_properties: Vec::<(String, String)>::new(),
});
}
}

pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) {
if let Some(props) = &mut self.properties {
props.user_properties = user_properties;
} else {
self.properties = Some(PubRecProperties {
reason_string: None,
user_properties,
});
}
}

pub fn size(&self) -> usize {
let len = self.len();
let remaining_len_size = len_len(len);
Expand Down Expand Up @@ -83,12 +109,12 @@ impl PubRec {
}

let properties = PubRecProperties::read(&mut bytes)?;
let puback = PubRec {
let pubrec = PubRec {
pkid,
reason: reason(ack_reason)?,
properties,
};
Ok(puback)
Ok(pubrec)
}

pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
Expand Down