Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor Mqttoptions #789

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion benchmarks/clients/rumqttasync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn main() {
}

pub async fn start(id: &str, payload_size: usize, count: usize) -> Result<(), Box<dyn Error>> {
let mut mqttoptions = MqttOptions::new(id, "localhost", 1883);
let mut mqttoptions = MqttOptions::new(id, "localhost")?;
mqttoptions.set_keep_alive(Duration::from_secs(20));
mqttoptions.set_inflight(100);

Expand Down
2 changes: 1 addition & 1 deletion benchmarks/clients/rumqttasyncqos0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn main() {
}

pub async fn start(id: &str, payload_size: usize, count: usize) -> Result<(), Box<dyn Error>> {
let mut mqttoptions = MqttOptions::new(id, "localhost", 1883);
let mut mqttoptions = MqttOptions::new(id, "localhost")?;
mqttoptions.set_keep_alive(Duration::from_secs(20));
mqttoptions.set_inflight(100);

Expand Down
2 changes: 1 addition & 1 deletion benchmarks/clients/rumqttsync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ fn main() {
}

pub fn start(id: &str, payload_size: usize, count: usize) -> Result<(), Box<dyn Error>> {
let mut mqttoptions = MqttOptions::new(id, "localhost", 1883);
let mut mqttoptions = MqttOptions::new(id, "localhost")?;
mqttoptions.set_keep_alive(Duration::from_secs(20));
mqttoptions.set_inflight(100);

Expand Down
2 changes: 0 additions & 2 deletions rumqttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ http = { version = "1.0.0", optional = true }
# native-tls
tokio-native-tls = { version = "0.3.1", optional = true }
native-tls = { version = "0.2.11", optional = true }
# url
url = { version = "2", default-features = false, optional = true }
# proxy
async-http-proxy = { version = "1.2.5", features = ["runtime-tokio", "basic-auth"], optional = true }

Expand Down
2 changes: 1 addition & 1 deletion rumqttc/examples/async_manual_acks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::error::Error;
use std::time::Duration;

fn create_conn() -> (AsyncClient, EventLoop) {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
let mut mqttoptions = MqttOptions::new("test-1", "localhost").unwrap();
mqttoptions
.set_keep_alive(Duration::from_secs(5))
.set_manual_acks(true)
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/examples/async_manual_acks_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::error::Error;
use std::time::Duration;

fn create_conn() -> (AsyncClient, EventLoop) {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884);
let mut mqttoptions = MqttOptions::new("test-1", "localhost").unwrap();
mqttoptions
.set_keep_alive(Duration::from_secs(5))
.set_manual_acks(true)
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/examples/asyncpubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
let mut mqttoptions = MqttOptions::new("test-1", "localhost")?;
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/examples/asyncpubsub_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884);
let mut mqttoptions = MqttOptions::new("test-1", "localhost")?;
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/examples/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl TryFrom<&[u8]> for Message {
}

fn main() {
let mqqt_opts = MqttOptions::new("test-1", "localhost", 1883);
let mqqt_opts = MqttOptions::new("test-1", "localhost").unwrap();

let (client, mut connection) = Client::new(mqqt_opts, 10);
client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/examples/subscription_ids.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::time::Duration;
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884);
let mut mqttoptions = MqttOptions::new("test-1", "localhost")?;
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/examples/syncpubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::time::Duration;
fn main() {
pretty_env_logger::init();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
let mut mqttoptions = MqttOptions::new("test-1", "localhost").unwrap();
let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false);
mqttoptions
.set_keep_alive(Duration::from_secs(5))
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/examples/syncpubsub_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::Duration;
fn main() {
pretty_env_logger::init();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884);
let mut mqttoptions = MqttOptions::new("test-1", "localhost").unwrap();
let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false, None);
mqttoptions
.set_keep_alive(Duration::from_secs(5))
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/examples/syncrecv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::time::Duration;
fn main() {
pretty_env_logger::init();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
let mut mqttoptions = MqttOptions::new("test-1", "localhost").unwrap();
let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false);
mqttoptions
.set_keep_alive(Duration::from_secs(5))
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/examples/syncrecv_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::Duration;
fn main() {
pretty_env_logger::init();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884);
let mut mqttoptions = MqttOptions::new("test-1", "localhost").unwrap();
let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false, None);
mqttoptions
.set_keep_alive(Duration::from_secs(5))
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/examples/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "mqtt.example.server", 8883);
let mut mqttoptions = MqttOptions::new("test-1", "mqtt.example.server:8883")?;
mqttoptions.set_keep_alive(std::time::Duration::from_secs(5));
mqttoptions.set_credentials("username", "password");

Expand Down
2 changes: 1 addition & 1 deletion rumqttc/examples/tls2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 8883);
let mut mqttoptions = MqttOptions::new("test-1", "localhost:8883")?;
mqttoptions.set_keep_alive(std::time::Duration::from_secs(5));

// Dummies to prevent compilation error in CI
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/examples/topic_alias.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::time::Duration;
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884);
let mut mqttoptions = MqttOptions::new("test-1", "localhost")?;
mqttoptions.set_keep_alive(Duration::from_secs(5));
mqttoptions.set_topic_alias_max(10.into());

Expand Down
4 changes: 1 addition & 3 deletions rumqttc/examples/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ use tokio::{task, time};
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();

// port parameter is ignored when scheme is websocket
let mut mqttoptions = MqttOptions::new(
"clientId-aSziq39Bp3",
"ws://broker.mqttdashboard.com:8000/mqtt",
8000,
);
)?;
mqttoptions.set_transport(Transport::Ws);
mqttoptions.set_keep_alive(Duration::from_secs(60));

Expand Down
4 changes: 1 addition & 3 deletions rumqttc/examples/websocket_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@ async fn main() -> Result<(), Box<dyn Error>> {

pretty_env_logger::init();

// port parameter is ignored when scheme is websocket
let mut mqttoptions = MqttOptions::new(
"clientId-aSziq39Bp3",
"ws://broker.mqttdashboard.com:8000/mqtt",
8000,
);
)?;
mqttoptions.set_transport(Transport::Ws);
mqttoptions.set_keep_alive(Duration::from_secs(60));
// Presumes that there is a proxy server already set up listening on 127.0.0.1:8100
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ mod test {
fn calling_iter_twice_on_connection_shouldnt_panic() {
use std::time::Duration;

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
let mut mqttoptions = MqttOptions::new("test-1", "localhost").unwrap();
let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false);
mqttoptions
.set_keep_alive(Duration::from_secs(5))
Expand Down
23 changes: 7 additions & 16 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ use crate::tls;

#[cfg(feature = "websocket")]
use {
crate::websockets::{split_url, validate_response_headers, UrlError},
async_tungstenite::tungstenite::client::IntoClientRequest,
ws_stream_tungstenite::WsStream,
crate::websockets::validate_response_headers,
async_tungstenite::tungstenite::client::IntoClientRequest, ws_stream_tungstenite::WsStream,
};

#[cfg(feature = "proxy")]
Expand Down Expand Up @@ -57,9 +56,6 @@ pub enum ConnectionError {
NotConnAck(Packet),
#[error("Requests done")]
RequestsDone,
#[cfg(feature = "websocket")]
#[error("Invalid Url: {0}")]
InvalidUrl(#[from] UrlError),
#[cfg(feature = "proxy")]
#[error("Proxy Connect: {0}")]
Proxy(#[from] ProxyError),
Expand Down Expand Up @@ -360,14 +356,7 @@ async fn network_connect(
return Ok(network);
}

// For websockets domain and port are taken directly from `broker_addr` (which is a url).
let (domain, port) = match options.transport() {
#[cfg(feature = "websocket")]
Transport::Ws => split_url(&options.broker_addr)?,
#[cfg(all(feature = "use-rustls", feature = "websocket"))]
Transport::Wss(_) => split_url(&options.broker_addr)?,
_ => options.broker_address(),
};
let (domain, port) = options.broker_address();

let tcp_stream: Box<dyn N> = {
#[cfg(feature = "proxy")]
Expand Down Expand Up @@ -400,7 +389,8 @@ async fn network_connect(
Transport::Unix => unreachable!(),
#[cfg(feature = "websocket")]
Transport::Ws => {
let mut request = options.broker_addr.as_str().into_client_request()?;
let path = options.addr_path();
let mut request = format!("ws://{domain}:{port}/{path}").into_client_request()?;
request
.headers_mut()
.insert("Sec-WebSocket-Protocol", "mqtt".parse().unwrap());
Expand All @@ -417,7 +407,8 @@ async fn network_connect(
}
#[cfg(all(feature = "use-rustls", feature = "websocket"))]
Transport::Wss(tls_config) => {
let mut request = options.broker_addr.as_str().into_client_request()?;
let path = options.addr_path();
let mut request = format!("wss://{domain}:{port}/{path}").into_client_request()?;
request
.headers_mut()
.insert("Sec-WebSocket-Protocol", "mqtt".parse().unwrap());
Expand Down
Loading
Loading