diff --git a/Cargo.lock b/Cargo.lock index 2a8a6d37..220bf055 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1951,7 +1951,6 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls", - "url", "ws_stream_tungstenite", ] diff --git a/benchmarks/clients/rumqttasync.rs b/benchmarks/clients/rumqttasync.rs index 1f6cb2bd..0a9f949d 100644 --- a/benchmarks/clients/rumqttasync.rs +++ b/benchmarks/clients/rumqttasync.rs @@ -17,7 +17,7 @@ async fn main() { } pub async fn start(id: &str, payload_size: usize, count: usize) -> Result<(), Box> { - let mut mqttoptions = MqttOptions::new(id, "localhost", 1883); + let mut mqttoptions = MqttOptions::new(id, "localhost")?; mqttoptions.set_keep_alive(Duration::from_secs(20)); mqttoptions.set_inflight(100); diff --git a/benchmarks/clients/rumqttasyncqos0.rs b/benchmarks/clients/rumqttasyncqos0.rs index cd6d6656..e0ddb696 100644 --- a/benchmarks/clients/rumqttasyncqos0.rs +++ b/benchmarks/clients/rumqttasyncqos0.rs @@ -17,7 +17,7 @@ async fn main() { } pub async fn start(id: &str, payload_size: usize, count: usize) -> Result<(), Box> { - let mut mqttoptions = MqttOptions::new(id, "localhost", 1883); + let mut mqttoptions = MqttOptions::new(id, "localhost")?; mqttoptions.set_keep_alive(Duration::from_secs(20)); mqttoptions.set_inflight(100); diff --git a/benchmarks/clients/rumqttsync.rs b/benchmarks/clients/rumqttsync.rs index 42a2f6f0..f91354bf 100644 --- a/benchmarks/clients/rumqttsync.rs +++ b/benchmarks/clients/rumqttsync.rs @@ -13,7 +13,7 @@ fn main() { } pub fn start(id: &str, payload_size: usize, count: usize) -> Result<(), Box> { - let mut mqttoptions = MqttOptions::new(id, "localhost", 1883); + let mut mqttoptions = MqttOptions::new(id, "localhost")?; mqttoptions.set_keep_alive(Duration::from_secs(20)); mqttoptions.set_inflight(100); diff --git a/rumqttc/Cargo.toml b/rumqttc/Cargo.toml index 551dcab5..7588dca6 100644 --- a/rumqttc/Cargo.toml +++ b/rumqttc/Cargo.toml @@ -43,8 +43,6 @@ http = { version = "1.0.0", optional = true } # native-tls tokio-native-tls = { version = "0.3.1", optional = true } native-tls = { version = "0.2.11", optional = true } -# url -url = { version = "2", default-features = false, optional = true } # proxy async-http-proxy = { version = "1.2.5", features = ["runtime-tokio", "basic-auth"], optional = true } diff --git a/rumqttc/examples/async_manual_acks.rs b/rumqttc/examples/async_manual_acks.rs index e5360aa0..f228a6fb 100644 --- a/rumqttc/examples/async_manual_acks.rs +++ b/rumqttc/examples/async_manual_acks.rs @@ -5,7 +5,7 @@ use std::error::Error; use std::time::Duration; fn create_conn() -> (AsyncClient, EventLoop) { - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + let mut mqttoptions = MqttOptions::new("test-1", "localhost").unwrap(); mqttoptions .set_keep_alive(Duration::from_secs(5)) .set_manual_acks(true) diff --git a/rumqttc/examples/async_manual_acks_v5.rs b/rumqttc/examples/async_manual_acks_v5.rs index bcf1bf35..75e6bc23 100644 --- a/rumqttc/examples/async_manual_acks_v5.rs +++ b/rumqttc/examples/async_manual_acks_v5.rs @@ -7,7 +7,7 @@ use std::error::Error; use std::time::Duration; fn create_conn() -> (AsyncClient, EventLoop) { - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884); + let mut mqttoptions = MqttOptions::new("test-1", "localhost").unwrap(); mqttoptions .set_keep_alive(Duration::from_secs(5)) .set_manual_acks(true) diff --git a/rumqttc/examples/asyncpubsub.rs b/rumqttc/examples/asyncpubsub.rs index d4d70d3f..b008989d 100644 --- a/rumqttc/examples/asyncpubsub.rs +++ b/rumqttc/examples/asyncpubsub.rs @@ -9,7 +9,7 @@ async fn main() -> Result<(), Box> { pretty_env_logger::init(); // color_backtrace::install(); - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + let mut mqttoptions = MqttOptions::new("test-1", "localhost")?; mqttoptions.set_keep_alive(Duration::from_secs(5)); let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); diff --git a/rumqttc/examples/asyncpubsub_v5.rs b/rumqttc/examples/asyncpubsub_v5.rs index fd53b53f..a7a999f5 100644 --- a/rumqttc/examples/asyncpubsub_v5.rs +++ b/rumqttc/examples/asyncpubsub_v5.rs @@ -10,7 +10,7 @@ async fn main() -> Result<(), Box> { pretty_env_logger::init(); // color_backtrace::install(); - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884); + let mut mqttoptions = MqttOptions::new("test-1", "localhost")?; mqttoptions.set_keep_alive(Duration::from_secs(5)); let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); diff --git a/rumqttc/examples/serde.rs b/rumqttc/examples/serde.rs index 478a88d5..6fec9000 100644 --- a/rumqttc/examples/serde.rs +++ b/rumqttc/examples/serde.rs @@ -32,7 +32,7 @@ impl TryFrom<&[u8]> for Message { } fn main() { - let mqqt_opts = MqttOptions::new("test-1", "localhost", 1883); + let mqqt_opts = MqttOptions::new("test-1", "localhost").unwrap(); let (client, mut connection) = Client::new(mqqt_opts, 10); client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap(); diff --git a/rumqttc/examples/subscription_ids.rs b/rumqttc/examples/subscription_ids.rs index b741bfaf..c5103252 100644 --- a/rumqttc/examples/subscription_ids.rs +++ b/rumqttc/examples/subscription_ids.rs @@ -10,7 +10,7 @@ use std::time::Duration; async fn main() -> Result<(), Box> { pretty_env_logger::init(); - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884); + let mut mqttoptions = MqttOptions::new("test-1", "localhost")?; mqttoptions.set_keep_alive(Duration::from_secs(5)); let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); diff --git a/rumqttc/examples/syncpubsub.rs b/rumqttc/examples/syncpubsub.rs index 5c42ad43..1edc1761 100644 --- a/rumqttc/examples/syncpubsub.rs +++ b/rumqttc/examples/syncpubsub.rs @@ -5,7 +5,7 @@ use std::time::Duration; fn main() { pretty_env_logger::init(); - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + let mut mqttoptions = MqttOptions::new("test-1", "localhost").unwrap(); let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false); mqttoptions .set_keep_alive(Duration::from_secs(5)) diff --git a/rumqttc/examples/syncpubsub_v5.rs b/rumqttc/examples/syncpubsub_v5.rs index cc8632d4..4351d100 100644 --- a/rumqttc/examples/syncpubsub_v5.rs +++ b/rumqttc/examples/syncpubsub_v5.rs @@ -6,7 +6,7 @@ use std::time::Duration; fn main() { pretty_env_logger::init(); - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884); + let mut mqttoptions = MqttOptions::new("test-1", "localhost").unwrap(); let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false, None); mqttoptions .set_keep_alive(Duration::from_secs(5)) diff --git a/rumqttc/examples/syncrecv.rs b/rumqttc/examples/syncrecv.rs index f0da8177..21199cde 100644 --- a/rumqttc/examples/syncrecv.rs +++ b/rumqttc/examples/syncrecv.rs @@ -5,7 +5,7 @@ use std::time::Duration; fn main() { pretty_env_logger::init(); - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + let mut mqttoptions = MqttOptions::new("test-1", "localhost").unwrap(); let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false); mqttoptions .set_keep_alive(Duration::from_secs(5)) diff --git a/rumqttc/examples/syncrecv_v5.rs b/rumqttc/examples/syncrecv_v5.rs index 7de064a5..810d7c23 100644 --- a/rumqttc/examples/syncrecv_v5.rs +++ b/rumqttc/examples/syncrecv_v5.rs @@ -6,7 +6,7 @@ use std::time::Duration; fn main() { pretty_env_logger::init(); - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884); + let mut mqttoptions = MqttOptions::new("test-1", "localhost").unwrap(); let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false, None); mqttoptions .set_keep_alive(Duration::from_secs(5)) diff --git a/rumqttc/examples/tls.rs b/rumqttc/examples/tls.rs index dc08636f..026861fd 100644 --- a/rumqttc/examples/tls.rs +++ b/rumqttc/examples/tls.rs @@ -9,7 +9,7 @@ async fn main() -> Result<(), Box> { pretty_env_logger::init(); color_backtrace::install(); - let mut mqttoptions = MqttOptions::new("test-1", "mqtt.example.server", 8883); + let mut mqttoptions = MqttOptions::new("test-1", "mqtt.example.server:8883")?; mqttoptions.set_keep_alive(std::time::Duration::from_secs(5)); mqttoptions.set_credentials("username", "password"); diff --git a/rumqttc/examples/tls2.rs b/rumqttc/examples/tls2.rs index c088adb0..a3b6e933 100644 --- a/rumqttc/examples/tls2.rs +++ b/rumqttc/examples/tls2.rs @@ -8,7 +8,7 @@ async fn main() -> Result<(), Box> { pretty_env_logger::init(); color_backtrace::install(); - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 8883); + let mut mqttoptions = MqttOptions::new("test-1", "localhost:8883")?; mqttoptions.set_keep_alive(std::time::Duration::from_secs(5)); // Dummies to prevent compilation error in CI diff --git a/rumqttc/examples/topic_alias.rs b/rumqttc/examples/topic_alias.rs index 394ad3cd..64189280 100644 --- a/rumqttc/examples/topic_alias.rs +++ b/rumqttc/examples/topic_alias.rs @@ -9,7 +9,7 @@ use std::time::Duration; async fn main() -> Result<(), Box> { pretty_env_logger::init(); - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884); + let mut mqttoptions = MqttOptions::new("test-1", "localhost")?; mqttoptions.set_keep_alive(Duration::from_secs(5)); mqttoptions.set_topic_alias_max(10.into()); diff --git a/rumqttc/examples/websocket.rs b/rumqttc/examples/websocket.rs index 8aceb5bf..4956aba4 100644 --- a/rumqttc/examples/websocket.rs +++ b/rumqttc/examples/websocket.rs @@ -6,12 +6,10 @@ use tokio::{task, time}; async fn main() -> Result<(), Box> { pretty_env_logger::init(); - // port parameter is ignored when scheme is websocket let mut mqttoptions = MqttOptions::new( "clientId-aSziq39Bp3", "ws://broker.mqttdashboard.com:8000/mqtt", - 8000, - ); + )?; mqttoptions.set_transport(Transport::Ws); mqttoptions.set_keep_alive(Duration::from_secs(60)); diff --git a/rumqttc/examples/websocket_proxy.rs b/rumqttc/examples/websocket_proxy.rs index 42c12353..9388d65f 100644 --- a/rumqttc/examples/websocket_proxy.rs +++ b/rumqttc/examples/websocket_proxy.rs @@ -8,12 +8,10 @@ async fn main() -> Result<(), Box> { pretty_env_logger::init(); - // port parameter is ignored when scheme is websocket let mut mqttoptions = MqttOptions::new( "clientId-aSziq39Bp3", "ws://broker.mqttdashboard.com:8000/mqtt", - 8000, - ); + )?; mqttoptions.set_transport(Transport::Ws); mqttoptions.set_keep_alive(Duration::from_secs(60)); // Presumes that there is a proxy server already set up listening on 127.0.0.1:8100 diff --git a/rumqttc/src/client.rs b/rumqttc/src/client.rs index 863b7319..6ad3d8b4 100644 --- a/rumqttc/src/client.rs +++ b/rumqttc/src/client.rs @@ -500,7 +500,7 @@ mod test { fn calling_iter_twice_on_connection_shouldnt_panic() { use std::time::Duration; - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + let mut mqttoptions = MqttOptions::new("test-1", "localhost").unwrap(); let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false); mqttoptions .set_keep_alive(Duration::from_secs(5)) diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index a8aee76c..80cf4afc 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -23,9 +23,8 @@ use crate::tls; #[cfg(feature = "websocket")] use { - crate::websockets::{split_url, validate_response_headers, UrlError}, - async_tungstenite::tungstenite::client::IntoClientRequest, - ws_stream_tungstenite::WsStream, + crate::websockets::validate_response_headers, + async_tungstenite::tungstenite::client::IntoClientRequest, ws_stream_tungstenite::WsStream, }; #[cfg(feature = "proxy")] @@ -57,9 +56,6 @@ pub enum ConnectionError { NotConnAck(Packet), #[error("Requests done")] RequestsDone, - #[cfg(feature = "websocket")] - #[error("Invalid Url: {0}")] - InvalidUrl(#[from] UrlError), #[cfg(feature = "proxy")] #[error("Proxy Connect: {0}")] Proxy(#[from] ProxyError), @@ -360,14 +356,7 @@ async fn network_connect( return Ok(network); } - // For websockets domain and port are taken directly from `broker_addr` (which is a url). - let (domain, port) = match options.transport() { - #[cfg(feature = "websocket")] - Transport::Ws => split_url(&options.broker_addr)?, - #[cfg(all(feature = "use-rustls", feature = "websocket"))] - Transport::Wss(_) => split_url(&options.broker_addr)?, - _ => options.broker_address(), - }; + let (domain, port) = options.broker_address(); let tcp_stream: Box = { #[cfg(feature = "proxy")] @@ -400,7 +389,8 @@ async fn network_connect( Transport::Unix => unreachable!(), #[cfg(feature = "websocket")] Transport::Ws => { - let mut request = options.broker_addr.as_str().into_client_request()?; + let path = options.addr_path(); + let mut request = format!("ws://{domain}:{port}/{path}").into_client_request()?; request .headers_mut() .insert("Sec-WebSocket-Protocol", "mqtt".parse().unwrap()); @@ -417,7 +407,8 @@ async fn network_connect( } #[cfg(all(feature = "use-rustls", feature = "websocket"))] Transport::Wss(tls_config) => { - let mut request = options.broker_addr.as_str().into_client_request()?; + let path = options.addr_path(); + let mut request = format!("wss://{domain}:{port}/{path}").into_client_request()?; request .headers_mut() .insert("Sec-WebSocket-Protocol", "mqtt".parse().unwrap()); diff --git a/rumqttc/src/lib.rs b/rumqttc/src/lib.rs index 43dbb3be..bb1c713b 100644 --- a/rumqttc/src/lib.rs +++ b/rumqttc/src/lib.rs @@ -13,7 +13,7 @@ //! use std::time::Duration; //! use std::thread; //! -//! let mut mqttoptions = MqttOptions::new("rumqtt-sync", "test.mosquitto.org", 1883); +//! let mut mqttoptions = MqttOptions::new("rumqtt-sync", "test.mosquitto.org").unwrap(); //! mqttoptions.set_keep_alive(Duration::from_secs(5)); //! //! let (mut client, mut connection) = Client::new(mqttoptions, 10); @@ -40,7 +40,7 @@ //! //! # #[tokio::main(flavor = "current_thread")] //! # async fn main() { -//! let mut mqttoptions = MqttOptions::new("rumqtt-async", "test.mosquitto.org", 1883); +//! let mut mqttoptions = MqttOptions::new("rumqtt-async", "test.mosquitto.org").unwrap(); //! mqttoptions.set_keep_alive(Duration::from_secs(5)); //! //! let (mut client, mut eventloop) = AsyncClient::new(mqttoptions, 10); @@ -441,10 +441,12 @@ impl NetworkOptions { /// Options to configure the behaviour of MQTT connection #[derive(Clone)] pub struct MqttOptions { - /// broker address that you want to connect to + /// broker address's domain that you want to connect to broker_addr: String, /// broker port port: u16, + /// broker address's path. e.g. /mqtt + addr_path: String, // What transport protocol to use transport: Transport, /// keep alive time to send pingreq to broker when the connection is idle @@ -488,17 +490,69 @@ impl MqttOptions { /// /// ``` /// # use rumqttc::MqttOptions; - /// let options = MqttOptions::new("123", "localhost", 1883); + /// let options = MqttOptions::new("123", "localhost").unwrap(); /// ``` - pub fn new, T: Into>(id: S, host: T, port: u16) -> MqttOptions { - MqttOptions { + pub fn new, T: AsRef>(id: S, host: T) -> Result { + // mqtt[s]://[username][:password]@host.domain[:port] + // ref: https://github.com/mqtt/mqtt.org/wiki/URI-Scheme + let mut host = host.as_ref(); + + let (mut transport, mut port) = (Transport::Tcp, 1883); + + if let Some((scheme, rest)) = host.split_once("://") { + host = rest; + (transport, port) = match scheme { + // Encrypted connections are supported, but require explicit TLS configuration. We fall + // back to the unencrypted transport layer, so that `set_transport` can be used to + // configure the encrypted transport layer with the provided TLS configuration. + // NOTE(swanx): why no use-native-tls!? + #[cfg(feature = "use-rustls")] + "mqtts" | "ssl" => (Transport::tls_with_default_config(), 8883), + "mqtt" | "tcp" => (Transport::Tcp, 1883), + #[cfg(feature = "websocket")] + "ws" => (Transport::Ws, 8000), + #[cfg(all(feature = "use-rustls", feature = "websocket"))] + "wss" => (Transport::wss_with_default_config(), 8000), + _ => return Err(OptionError::Scheme), + }; + } + + let mut credentials = None; + if let Some((user, rest)) = host.split_once('@') { + host = rest; + credentials = user + .split_once(':') + .or(Some((user, ""))) + .map(|(u, p)| (u.into(), p.into())); + }; + + // NOTE(swanx): path is only used for websockets! + // we may want to log warning if path is going to be ignored! + let mut addr_path = ""; + if let Some((rest, path)) = host.split_once('/') { + host = rest; + addr_path = path; + }; + + // ideally we should validate host as well + // i.e. to see if it is valid domain / ip addr + // but validating it manually is quite big task + // using dependency like url::Host doesn't support no_std + // we can use url once they support no_std! + if let Some((rest, p)) = host.split_once(':') { + host = rest; + port = p.parse::().map_err(|_| OptionError::Port)?; + }; + + Ok(MqttOptions { broker_addr: host.into(), port, - transport: Transport::tcp(), + addr_path: addr_path.into(), + transport, keep_alive: Duration::from_secs(60), clean_session: true, client_id: id.into(), - credentials: None, + credentials, max_incoming_packet_size: 10 * 1024, max_outgoing_packet_size: 10 * 1024, request_channel_capacity: 10, @@ -511,40 +565,7 @@ impl MqttOptions { proxy: None, #[cfg(feature = "websocket")] request_modifier: None, - } - } - - #[cfg(feature = "url")] - /// Creates an [`MqttOptions`] object by parsing provided string with the [url] crate's - /// [`Url::parse(url)`](url::Url::parse) method and is only enabled when run using the "url" feature. - /// - /// ``` - /// # use rumqttc::MqttOptions; - /// let options = MqttOptions::parse_url("mqtt://example.com:1883?client_id=123").unwrap(); - /// ``` - /// - /// **NOTE:** A url must be prefixed with one of either `tcp://`, `mqtt://`, `ssl://`,`mqtts://`, - /// `ws://` or `wss://` to denote the protocol for establishing a connection with the broker. - /// - /// **NOTE:** Encrypted connections(i.e. `mqtts://`, `ssl://`, `wss://`) by default use the - /// system's root certificates. To configure with custom certificates, one may use the - /// [`set_transport`](MqttOptions::set_transport) method. - /// - /// ```ignore - /// # use rumqttc::{MqttOptions, Transport}; - /// # use tokio_rustls::rustls::ClientConfig; - /// # let root_cert_store = rustls::RootCertStore::empty(); - /// # let client_config = ClientConfig::builder() - /// # .with_root_certificates(root_cert_store) - /// # .with_no_client_auth(); - /// let mut options = MqttOptions::parse_url("mqtts://example.com?client_id=123").unwrap(); - /// options.set_transport(Transport::tls_with_config(client_config.into())); - /// ``` - pub fn parse_url>(url: S) -> Result { - let url = url::Url::parse(&url.into())?; - let options = MqttOptions::try_from(url)?; - - Ok(options) + }) } /// Broker address @@ -552,6 +573,15 @@ impl MqttOptions { (self.broker_addr.clone(), self.port) } + pub fn addr_path(&self) -> String { + self.addr_path.clone() + } + + pub fn set_port(&mut self, port: u16) -> &mut Self { + self.port = port; + self + } + pub fn set_last_will(&mut self, will: LastWill) -> &mut Self { self.last_will = Some(will); self @@ -618,7 +648,7 @@ impl MqttOptions { /// /// ```should_panic /// # use rumqttc::MqttOptions; - /// let mut options = MqttOptions::new("", "localhost", 1883); + /// let mut options = MqttOptions::new("", "localhost").unwrap(); /// options.set_clean_session(false); /// ``` pub fn set_clean_session(&mut self, clean_session: bool) -> &mut Self { @@ -728,169 +758,13 @@ impl MqttOptions { } } -#[cfg(feature = "url")] #[derive(Debug, PartialEq, Eq, thiserror::Error)] pub enum OptionError { #[error("Unsupported URL scheme.")] Scheme, - #[error("Missing client ID.")] - ClientId, - - #[error("Invalid keep-alive value.")] - KeepAlive, - - #[error("Invalid clean-session value.")] - CleanSession, - - #[error("Invalid max-incoming-packet-size value.")] - MaxIncomingPacketSize, - - #[error("Invalid max-outgoing-packet-size value.")] - MaxOutgoingPacketSize, - - #[error("Invalid request-channel-capacity value.")] - RequestChannelCapacity, - - #[error("Invalid max-request-batch value.")] - MaxRequestBatch, - - #[error("Invalid pending-throttle value.")] - PendingThrottle, - - #[error("Invalid inflight value.")] - Inflight, - - #[error("Unknown option: {0}")] - Unknown(String), - - #[error("Couldn't parse option from url: {0}")] - Parse(#[from] url::ParseError), -} - -#[cfg(feature = "url")] -impl std::convert::TryFrom for MqttOptions { - type Error = OptionError; - - fn try_from(url: url::Url) -> Result { - use std::collections::HashMap; - - let host = url.host_str().unwrap_or_default().to_owned(); - - let (transport, default_port) = match url.scheme() { - // Encrypted connections are supported, but require explicit TLS configuration. We fall - // back to the unencrypted transport layer, so that `set_transport` can be used to - // configure the encrypted transport layer with the provided TLS configuration. - #[cfg(feature = "use-rustls")] - "mqtts" | "ssl" => (Transport::tls_with_default_config(), 8883), - "mqtt" | "tcp" => (Transport::Tcp, 1883), - #[cfg(feature = "websocket")] - "ws" => (Transport::Ws, 8000), - #[cfg(all(feature = "use-rustls", feature = "websocket"))] - "wss" => (Transport::wss_with_default_config(), 8000), - _ => return Err(OptionError::Scheme), - }; - - let port = url.port().unwrap_or(default_port); - - let mut queries = url.query_pairs().collect::>(); - - let id = queries - .remove("client_id") - .ok_or(OptionError::ClientId)? - .into_owned(); - - let mut options = MqttOptions::new(id, host, port); - options.set_transport(transport); - - if let Some(keep_alive) = queries - .remove("keep_alive_secs") - .map(|v| v.parse::().map_err(|_| OptionError::KeepAlive)) - .transpose()? - { - options.set_keep_alive(Duration::from_secs(keep_alive)); - } - - if let Some(clean_session) = queries - .remove("clean_session") - .map(|v| v.parse::().map_err(|_| OptionError::CleanSession)) - .transpose()? - { - options.set_clean_session(clean_session); - } - - if let Some((username, password)) = { - match url.username() { - "" => None, - username => Some(( - username.to_owned(), - url.password().unwrap_or_default().to_owned(), - )), - } - } { - options.set_credentials(username, password); - } - - if let (Some(incoming), Some(outgoing)) = ( - queries - .remove("max_incoming_packet_size_bytes") - .map(|v| { - v.parse::() - .map_err(|_| OptionError::MaxIncomingPacketSize) - }) - .transpose()?, - queries - .remove("max_outgoing_packet_size_bytes") - .map(|v| { - v.parse::() - .map_err(|_| OptionError::MaxOutgoingPacketSize) - }) - .transpose()?, - ) { - options.set_max_packet_size(incoming, outgoing); - } - - if let Some(request_channel_capacity) = queries - .remove("request_channel_capacity_num") - .map(|v| { - v.parse::() - .map_err(|_| OptionError::RequestChannelCapacity) - }) - .transpose()? - { - options.request_channel_capacity = request_channel_capacity; - } - - if let Some(max_request_batch) = queries - .remove("max_request_batch_num") - .map(|v| v.parse::().map_err(|_| OptionError::MaxRequestBatch)) - .transpose()? - { - options.max_request_batch = max_request_batch; - } - - if let Some(pending_throttle) = queries - .remove("pending_throttle_usecs") - .map(|v| v.parse::().map_err(|_| OptionError::PendingThrottle)) - .transpose()? - { - options.set_pending_throttle(Duration::from_micros(pending_throttle)); - } - - if let Some(inflight) = queries - .remove("inflight_num") - .map(|v| v.parse::().map_err(|_| OptionError::Inflight)) - .transpose()? - { - options.set_inflight(inflight); - } - - if let Some((opt, _)) = queries.into_iter().next() { - return Err(OptionError::Unknown(opt.into_owned())); - } - - Ok(options) - } + #[error("Can't parse port as u16")] + Port, } // Implement Debug manually because ClientConfig doesn't implement it, so derive(Debug) doesn't @@ -922,9 +796,11 @@ mod test { #[test] #[cfg(all(feature = "use-rustls", feature = "websocket"))] fn no_scheme() { - let mut mqttoptions = MqttOptions::new("client_a", "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host", 443); + let mut mqttoptions = MqttOptions::new("client_a", "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host").unwrap(); - mqttoptions.set_transport(crate::Transport::wss(Vec::from("Test CA"), None, None)); + mqttoptions + .set_transport(crate::Transport::wss(Vec::from("Test CA"), None, None)) + .set_port(433); if let crate::Transport::Wss(TlsConfiguration::Simple { ca, @@ -939,77 +815,23 @@ mod test { panic!("Unexpected transport!"); } - assert_eq!(mqttoptions.broker_addr, "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host"); - } - - #[test] - #[cfg(feature = "url")] - fn from_url() { - fn opt(s: &str) -> Result { - MqttOptions::parse_url(s) - } - fn ok(s: &str) -> MqttOptions { - opt(s).expect("valid options") - } - fn err(s: &str) -> OptionError { - opt(s).expect_err("invalid options") - } - - let v = ok("mqtt://host:42?client_id=foo"); - assert_eq!(v.broker_address(), ("host".to_owned(), 42)); - assert_eq!(v.client_id(), "foo".to_owned()); - - let v = ok("mqtt://host:42?client_id=foo&keep_alive_secs=5"); - assert_eq!(v.keep_alive, Duration::from_secs(5)); - - assert_eq!(err("mqtt://host:42"), OptionError::ClientId); - assert_eq!( - err("mqtt://host:42?client_id=foo&foo=bar"), - OptionError::Unknown("foo".to_owned()) - ); - assert_eq!(err("mqt://host:42?client_id=foo"), OptionError::Scheme); - assert_eq!( - err("mqtt://host:42?client_id=foo&keep_alive_secs=foo"), - OptionError::KeepAlive - ); - assert_eq!( - err("mqtt://host:42?client_id=foo&clean_session=foo"), - OptionError::CleanSession - ); - assert_eq!( - err("mqtt://host:42?client_id=foo&max_incoming_packet_size_bytes=foo"), - OptionError::MaxIncomingPacketSize - ); - assert_eq!( - err("mqtt://host:42?client_id=foo&max_outgoing_packet_size_bytes=foo"), - OptionError::MaxOutgoingPacketSize - ); - assert_eq!( - err("mqtt://host:42?client_id=foo&request_channel_capacity_num=foo"), - OptionError::RequestChannelCapacity - ); - assert_eq!( - err("mqtt://host:42?client_id=foo&max_request_batch_num=foo"), - OptionError::MaxRequestBatch - ); - assert_eq!( - err("mqtt://host:42?client_id=foo&pending_throttle_usecs=foo"), - OptionError::PendingThrottle - ); assert_eq!( - err("mqtt://host:42?client_id=foo&inflight_num=foo"), - OptionError::Inflight + mqttoptions.broker_addr, + "a3f8czas.iot.eu-west-1.amazonaws.com" ); + assert_eq!(mqttoptions.addr_path, "mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host"); } #[test] fn accept_empty_client_id() { - let _mqtt_opts = MqttOptions::new("", "127.0.0.1", 1883).set_clean_session(true); + let _mqtt_opts = MqttOptions::new("", "127.0.0.1") + .unwrap() + .set_clean_session(true); } #[test] fn set_clean_session_when_client_id_present() { - let mut options = MqttOptions::new("client_id", "127.0.0.1", 1883); + let mut options = MqttOptions::new("client_id", "127.0.0.1").unwrap(); options.set_clean_session(false); options.set_clean_session(true); } diff --git a/rumqttc/src/v5/client.rs b/rumqttc/src/v5/client.rs index 910da504..c3360c8f 100644 --- a/rumqttc/src/v5/client.rs +++ b/rumqttc/src/v5/client.rs @@ -848,7 +848,7 @@ mod test { fn calling_iter_twice_on_connection_shouldnt_panic() { use std::time::Duration; - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + let mut mqttoptions = MqttOptions::new("test-1", "localhost").unwrap(); let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false, None); mqttoptions .set_keep_alive(Duration::from_secs(5)) diff --git a/rumqttc/src/v5/eventloop.rs b/rumqttc/src/v5/eventloop.rs index 36c10971..e7dc95b6 100644 --- a/rumqttc/src/v5/eventloop.rs +++ b/rumqttc/src/v5/eventloop.rs @@ -24,9 +24,8 @@ use {std::path::Path, tokio::net::UnixStream}; #[cfg(feature = "websocket")] use { - crate::websockets::{split_url, validate_response_headers, UrlError}, - async_tungstenite::tungstenite::client::IntoClientRequest, - ws_stream_tungstenite::WsStream, + crate::websockets::validate_response_headers, + async_tungstenite::tungstenite::client::IntoClientRequest, ws_stream_tungstenite::WsStream, }; #[cfg(feature = "proxy")] @@ -56,9 +55,6 @@ pub enum ConnectionError { NotConnAck(Box), #[error("Requests done")] RequestsDone, - #[cfg(feature = "websocket")] - #[error("Invalid Url: {0}")] - InvalidUrl(#[from] UrlError), #[cfg(feature = "proxy")] #[error("Proxy Connect: {0}")] Proxy(#[from] ProxyError), @@ -295,14 +291,7 @@ async fn network_connect(options: &MqttOptions) -> Result split_url(&options.broker_addr)?, - #[cfg(all(feature = "use-rustls", feature = "websocket"))] - Transport::Wss(_) => split_url(&options.broker_addr)?, - _ => options.broker_address(), - }; + let (domain, port) = options.broker_address(); let tcp_stream: Box = { #[cfg(feature = "proxy")] @@ -339,7 +328,8 @@ async fn network_connect(options: &MqttOptions) -> Result unreachable!(), #[cfg(feature = "websocket")] Transport::Ws => { - let mut request = options.broker_addr.as_str().into_client_request()?; + let path = options.addr_path(); + let mut request = format!("ws://{domain}:{port}/{path}").into_client_request()?; request .headers_mut() .insert("Sec-WebSocket-Protocol", "mqtt".parse().unwrap()); @@ -356,7 +346,8 @@ async fn network_connect(options: &MqttOptions) -> Result { - let mut request = options.broker_addr.as_str().into_client_request()?; + let path = options.addr_path(); + let mut request = format!("wss://{domain}:{port}/{path}").into_client_request()?; request .headers_mut() .insert("Sec-WebSocket-Protocol", "mqtt".parse().unwrap()); diff --git a/rumqttc/src/v5/mod.rs b/rumqttc/src/v5/mod.rs index 663cfd27..95ba8d2f 100644 --- a/rumqttc/src/v5/mod.rs +++ b/rumqttc/src/v5/mod.rs @@ -62,10 +62,12 @@ type RequestModifierFn = Arc< /// Options to configure the behaviour of MQTT connection #[derive(Clone)] pub struct MqttOptions { - /// broker address that you want to connect to + /// broker address's domain that you want to connect to broker_addr: String, /// broker port port: u16, + /// broker address's path. e.g. /mqtt + addr_path: String, // What transport protocol to use transport: Transport, /// keep alive time to send pingreq to broker when the connection is idle @@ -113,18 +115,70 @@ impl MqttOptions { /// - port: The port number on which broker must be listening for incoming connections /// /// ``` - /// # use rumqttc::v5::MqttOptions; - /// let options = MqttOptions::new("123", "localhost", 1883); + /// # use rumqttc::MqttOptions; + /// let options = MqttOptions::new("123", "localhost").unwrap(); /// ``` - pub fn new, T: Into>(id: S, host: T, port: u16) -> MqttOptions { - MqttOptions { + pub fn new, T: AsRef>(id: S, host: T) -> Result { + // mqtt[s]://[username][:password]@host.domain[:port] + // ref: https://github.com/mqtt/mqtt.org/wiki/URI-Scheme + let mut host = host.as_ref(); + + let (mut transport, mut port) = (Transport::Tcp, 1883); + + if let Some((scheme, rest)) = host.split_once("://") { + host = rest; + (transport, port) = match scheme { + // Encrypted connections are supported, but require explicit TLS configuration. We fall + // back to the unencrypted transport layer, so that `set_transport` can be used to + // configure the encrypted transport layer with the provided TLS configuration. + // NOTE(swanx): why no use-native-tls!? + #[cfg(feature = "use-rustls")] + "mqtts" | "ssl" => (Transport::tls_with_default_config(), 8883), + "mqtt" | "tcp" => (Transport::Tcp, 1883), + #[cfg(feature = "websocket")] + "ws" => (Transport::Ws, 8000), + #[cfg(all(feature = "use-rustls", feature = "websocket"))] + "wss" => (Transport::wss_with_default_config(), 8000), + _ => return Err(OptionError::Scheme), + }; + } + + let mut credentials = None; + if let Some((user, rest)) = host.split_once('@') { + host = rest; + credentials = user + .split_once(':') + .or(Some((user, ""))) + .map(|(u, p)| (u.into(), p.into())); + }; + + // NOTE(swanx): path is only used for websockets! + // we may want to log warning if path is going to be ignored! + let mut addr_path = ""; + if let Some((rest, path)) = host.split_once('/') { + host = rest; + addr_path = path; + }; + + // ideally we should validate host as well + // i.e. to see if it is valid domain / ip addr + // but validating it manually is quite big task + // using dependency like url::Host doesn't support no_std + // we can use url once they support no_std! + if let Some((rest, p)) = host.split_once(':') { + host = rest; + port = p.parse::().map_err(|_| OptionError::Port)?; + }; + + Ok(MqttOptions { broker_addr: host.into(), port, - transport: Transport::tcp(), + addr_path: addr_path.into(), + transport, keep_alive: Duration::from_secs(60), clean_start: true, client_id: id.into(), - credentials: None, + credentials, request_channel_capacity: 10, max_request_batch: 0, pending_throttle: Duration::from_micros(0), @@ -139,40 +193,7 @@ impl MqttOptions { outgoing_inflight_upper_limit: None, #[cfg(feature = "websocket")] request_modifier: None, - } - } - - #[cfg(feature = "url")] - /// Creates an [`MqttOptions`] object by parsing provided string with the [url] crate's - /// [`Url::parse(url)`](url::Url::parse) method and is only enabled when run using the "url" feature. - /// - /// ``` - /// # use rumqttc::MqttOptions; - /// let options = MqttOptions::parse_url("mqtt://example.com:1883?client_id=123").unwrap(); - /// ``` - /// - /// **NOTE:** A url must be prefixed with one of either `tcp://`, `mqtt://`, `ssl://`,`mqtts://`, - /// `ws://` or `wss://` to denote the protocol for establishing a connection with the broker. - /// - /// **NOTE:** Encrypted connections(i.e. `mqtts://`, `ssl://`, `wss://`) by default use the - /// system's root certificates. To configure with custom certificates, one may use the - /// [`set_transport`](MqttOptions::set_transport) method. - /// - /// ```ignore - /// # use rumqttc::{MqttOptions, Transport}; - /// # use tokio_rustls::rustls::ClientConfig; - /// # let root_cert_store = rustls::RootCertStore::empty(); - /// # let client_config = ClientConfig::builder() - /// # .with_root_certificates(root_cert_store) - /// # .with_no_client_auth(); - /// let mut options = MqttOptions::parse_url("mqtts://example.com?client_id=123").unwrap(); - /// options.set_transport(Transport::tls_with_config(client_config.into())); - /// ``` - pub fn parse_url>(url: S) -> Result { - let url = url::Url::parse(&url.into())?; - let options = MqttOptions::try_from(url)?; - - Ok(options) + }) } /// Broker address @@ -180,6 +201,15 @@ impl MqttOptions { (self.broker_addr.clone(), self.port) } + pub fn addr_path(&self) -> String { + self.addr_path.clone() + } + + pub fn set_port(&mut self, port: u16) -> &mut Self { + self.port = port; + self + } + pub fn set_last_will(&mut self, will: LastWill) -> &mut Self { self.last_will = Some(will); self @@ -528,168 +558,13 @@ impl MqttOptions { } } -#[cfg(feature = "url")] #[derive(Debug, PartialEq, Eq, thiserror::Error)] pub enum OptionError { #[error("Unsupported URL scheme.")] Scheme, - #[error("Missing client ID.")] - ClientId, - - #[error("Invalid keep-alive value.")] - KeepAlive, - - #[error("Invalid clean-start value.")] - CleanStart, - - #[error("Invalid max-incoming-packet-size value.")] - MaxIncomingPacketSize, - - #[error("Invalid max-outgoing-packet-size value.")] - MaxOutgoingPacketSize, - - #[error("Invalid request-channel-capacity value.")] - RequestChannelCapacity, - - #[error("Invalid max-request-batch value.")] - MaxRequestBatch, - - #[error("Invalid pending-throttle value.")] - PendingThrottle, - - #[error("Invalid inflight value.")] - Inflight, - - #[error("Invalid conn-timeout value.")] - ConnTimeout, - - #[error("Unknown option: {0}")] - Unknown(String), - - #[error("Couldn't parse option from url: {0}")] - Parse(#[from] url::ParseError), -} - -#[cfg(feature = "url")] -impl std::convert::TryFrom for MqttOptions { - type Error = OptionError; - - fn try_from(url: url::Url) -> Result { - use std::collections::HashMap; - - let host = url.host_str().unwrap_or_default().to_owned(); - - let (transport, default_port) = match url.scheme() { - // Encrypted connections are supported, but require explicit TLS configuration. We fall - // back to the unencrypted transport layer, so that `set_transport` can be used to - // configure the encrypted transport layer with the provided TLS configuration. - #[cfg(feature = "use-rustls")] - "mqtts" | "ssl" => (Transport::tls_with_default_config(), 8883), - "mqtt" | "tcp" => (Transport::Tcp, 1883), - #[cfg(feature = "websocket")] - "ws" => (Transport::Ws, 8000), - #[cfg(all(feature = "use-rustls", feature = "websocket"))] - "wss" => (Transport::wss_with_default_config(), 8000), - _ => return Err(OptionError::Scheme), - }; - - let port = url.port().unwrap_or(default_port); - - let mut queries = url.query_pairs().collect::>(); - - let id = queries - .remove("client_id") - .ok_or(OptionError::ClientId)? - .into_owned(); - - let mut options = MqttOptions::new(id, host, port); - let mut connect_props = ConnectProperties::new(); - options.set_transport(transport); - - if let Some(keep_alive) = queries - .remove("keep_alive_secs") - .map(|v| v.parse::().map_err(|_| OptionError::KeepAlive)) - .transpose()? - { - options.set_keep_alive(Duration::from_secs(keep_alive)); - } - - if let Some(clean_start) = queries - .remove("clean_start") - .map(|v| v.parse::().map_err(|_| OptionError::CleanStart)) - .transpose()? - { - options.set_clean_start(clean_start); - } - - if let Some((username, password)) = { - match url.username() { - "" => None, - username => Some(( - username.to_owned(), - url.password().unwrap_or_default().to_owned(), - )), - } - } { - options.set_credentials(username, password); - } - - connect_props.max_packet_size = queries - .remove("max_incoming_packet_size_bytes") - .map(|v| { - v.parse::() - .map_err(|_| OptionError::MaxIncomingPacketSize) - }) - .transpose()?; - - if let Some(request_channel_capacity) = queries - .remove("request_channel_capacity_num") - .map(|v| { - v.parse::() - .map_err(|_| OptionError::RequestChannelCapacity) - }) - .transpose()? - { - options.request_channel_capacity = request_channel_capacity; - } - - if let Some(max_request_batch) = queries - .remove("max_request_batch_num") - .map(|v| v.parse::().map_err(|_| OptionError::MaxRequestBatch)) - .transpose()? - { - options.max_request_batch = max_request_batch; - } - - if let Some(pending_throttle) = queries - .remove("pending_throttle_usecs") - .map(|v| v.parse::().map_err(|_| OptionError::PendingThrottle)) - .transpose()? - { - options.set_pending_throttle(Duration::from_micros(pending_throttle)); - } - - connect_props.receive_maximum = queries - .remove("inflight_num") - .map(|v| v.parse::().map_err(|_| OptionError::Inflight)) - .transpose()?; - - if let Some(conn_timeout) = queries - .remove("conn_timeout_secs") - .map(|v| v.parse::().map_err(|_| OptionError::ConnTimeout)) - .transpose()? - { - options.set_connection_timeout(conn_timeout); - } - - if let Some((opt, _)) = queries.into_iter().next() { - return Err(OptionError::Unknown(opt.into_owned())); - } - - options.connect_properties = Some(connect_props); - Ok(options) - } + #[error("Can't parse port as u16")] + Port, } // Implement Debug manually because ClientConfig doesn't implement it, so derive(Debug) doesn't @@ -722,7 +597,7 @@ mod test { #[cfg(all(feature = "use-rustls", feature = "websocket"))] fn no_scheme() { use crate::{TlsConfiguration, Transport}; - let mut mqttoptions = MqttOptions::new("client_a", "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host", 443); + let mut mqttoptions = MqttOptions::new("client_a", "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host").unwrap(); mqttoptions.set_transport(Transport::wss(Vec::from("Test CA"), None, None)); @@ -739,71 +614,17 @@ mod test { panic!("Unexpected transport!"); } - assert_eq!(mqttoptions.broker_addr, "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host"); - } - - #[test] - #[cfg(feature = "url")] - fn from_url() { - fn opt(s: &str) -> Result { - MqttOptions::parse_url(s) - } - fn ok(s: &str) -> MqttOptions { - opt(s).expect("valid options") - } - fn err(s: &str) -> OptionError { - opt(s).expect_err("invalid options") - } - - let v = ok("mqtt://host:42?client_id=foo"); - assert_eq!(v.broker_address(), ("host".to_owned(), 42)); - assert_eq!(v.client_id(), "foo".to_owned()); - - let v = ok("mqtt://host:42?client_id=foo&keep_alive_secs=5"); - assert_eq!(v.keep_alive, Duration::from_secs(5)); - - assert_eq!(err("mqtt://host:42"), OptionError::ClientId); - assert_eq!( - err("mqtt://host:42?client_id=foo&foo=bar"), - OptionError::Unknown("foo".to_owned()) - ); - assert_eq!(err("mqt://host:42?client_id=foo"), OptionError::Scheme); - assert_eq!( - err("mqtt://host:42?client_id=foo&keep_alive_secs=foo"), - OptionError::KeepAlive - ); - assert_eq!( - err("mqtt://host:42?client_id=foo&clean_start=foo"), - OptionError::CleanStart - ); - assert_eq!( - err("mqtt://host:42?client_id=foo&max_incoming_packet_size_bytes=foo"), - OptionError::MaxIncomingPacketSize - ); - assert_eq!( - err("mqtt://host:42?client_id=foo&request_channel_capacity_num=foo"), - OptionError::RequestChannelCapacity - ); - assert_eq!( - err("mqtt://host:42?client_id=foo&max_request_batch_num=foo"), - OptionError::MaxRequestBatch - ); - assert_eq!( - err("mqtt://host:42?client_id=foo&pending_throttle_usecs=foo"), - OptionError::PendingThrottle - ); - assert_eq!( - err("mqtt://host:42?client_id=foo&inflight_num=foo"), - OptionError::Inflight - ); assert_eq!( - err("mqtt://host:42?client_id=foo&conn_timeout_secs=foo"), - OptionError::ConnTimeout + mqttoptions.broker_addr, + "a3f8czas.iot.eu-west-1.amazonaws.com" ); + assert_eq!(mqttoptions.addr_path, "mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host"); } #[test] fn allow_empty_client_id() { - let _mqtt_opts = MqttOptions::new("", "127.0.0.1", 1883).set_clean_start(true); + let _mqtt_opts = MqttOptions::new("", "127.0.0.1") + .unwrap() + .set_clean_start(true); } } diff --git a/rumqttc/src/websockets.rs b/rumqttc/src/websockets.rs index a4dafd51..5b1f843b 100644 --- a/rumqttc/src/websockets.rs +++ b/rumqttc/src/websockets.rs @@ -1,15 +1,5 @@ use http::{header::ToStrError, Response}; -#[derive(Debug, thiserror::Error)] -pub enum UrlError { - #[error("Invalid protocol specified inside url.")] - Protocol, - #[error("Couldn't parse host from url.")] - Host, - #[error("Couldn't parse host url.")] - Parse(#[from] http::uri::InvalidUri), -} - #[derive(Debug, thiserror::Error)] pub enum ValidationError { #[error("Websocket response does not contain subprotocol header")] @@ -39,35 +29,3 @@ pub(crate) fn validate_response_headers( Ok(()) } - -pub(crate) fn split_url(url: &str) -> Result<(String, u16), UrlError> { - let uri = url.parse::()?; - let domain = domain(&uri).ok_or(UrlError::Protocol)?; - let port = port(&uri).ok_or(UrlError::Host)?; - Ok((domain, port)) -} - -fn domain(uri: &http::Uri) -> Option { - uri.host().map(|host| { - // If host is an IPv6 address, it might be surrounded by brackets. These brackets are - // *not* part of a valid IP, so they must be stripped out. - // - // The URI from the request is guaranteed to be valid, so we don't need a separate - // check for the closing bracket. - let host = if host.starts_with('[') { - &host[1..host.len() - 1] - } else { - host - }; - - host.to_owned() - }) -} - -fn port(uri: &http::Uri) -> Option { - uri.port_u16().or_else(|| match uri.scheme_str() { - Some("wss") => Some(443), - Some("ws") => Some(80), - _ => None, - }) -} diff --git a/rumqttc/tests/reliability.rs b/rumqttc/tests/reliability.rs index 3e7acd1e..096b0963 100644 --- a/rumqttc/tests/reliability.rs +++ b/rumqttc/tests/reliability.rs @@ -77,7 +77,7 @@ async fn connection_should_timeout_on_time() { }); time::sleep(Duration::from_secs(1)).await; - let options = MqttOptions::new("dummy", "127.0.0.1", 1880); + let options = MqttOptions::new("dummy", "127.0.0.1:1880").unwrap(); let mut eventloop = EventLoop::new(options, 5); let start = Instant::now(); @@ -96,19 +96,19 @@ async fn connection_should_timeout_on_time() { #[test] #[should_panic] fn test_invalid_keep_alive_value() { - let mut options = MqttOptions::new("dummy", "127.0.0.1", 1885); + let mut options = MqttOptions::new("dummy", "127.0.0.1:1885").unwrap(); options.set_keep_alive(Duration::from_millis(10)); } #[test] fn test_zero_keep_alive_values() { - let mut options = MqttOptions::new("dummy", "127.0.0.1", 1885); + let mut options = MqttOptions::new("dummy", "127.0.0.1:1885").unwrap(); options.set_keep_alive(Duration::ZERO); } #[test] fn test_valid_keep_alive_values() { - let mut options = MqttOptions::new("dummy", "127.0.0.1", 1885); + let mut options = MqttOptions::new("dummy", "127.0.0.1:1885").unwrap(); options.set_keep_alive(Duration::from_secs(1)); } @@ -116,7 +116,7 @@ fn test_valid_keep_alive_values() { async fn idle_connection_triggers_pings_on_time() { let keep_alive = 1; - let mut options = MqttOptions::new("dummy", "127.0.0.1", 1885); + let mut options = MqttOptions::new("dummy", "127.0.0.1:1885").unwrap(); options.set_keep_alive(Duration::from_secs(keep_alive)); // Create client eventloop and poll @@ -151,7 +151,7 @@ async fn idle_connection_triggers_pings_on_time() { #[tokio::test] async fn some_outgoing_and_no_incoming_should_trigger_pings_on_time() { let keep_alive = 5; - let mut options = MqttOptions::new("dummy", "127.0.0.1", 1886); + let mut options = MqttOptions::new("dummy", "127.0.0.1:1886").unwrap(); options.set_keep_alive(Duration::from_secs(keep_alive)); @@ -195,7 +195,7 @@ async fn some_outgoing_and_no_incoming_should_trigger_pings_on_time() { #[tokio::test] async fn some_incoming_and_no_outgoing_should_trigger_pings_on_time() { let keep_alive = 5; - let mut options = MqttOptions::new("dummy", "127.0.0.1", 2000); + let mut options = MqttOptions::new("dummy", "127.0.0.1:2000").unwrap(); options.set_keep_alive(Duration::from_secs(keep_alive)); @@ -233,7 +233,7 @@ async fn some_incoming_and_no_outgoing_should_trigger_pings_on_time() { #[tokio::test] async fn detects_halfopen_connections_in_the_second_ping_request() { - let mut options = MqttOptions::new("dummy", "127.0.0.1", 2001); + let mut options = MqttOptions::new("dummy", "127.0.0.1:2001").unwrap(); options.set_keep_alive(Duration::from_secs(5)); // A broker which consumes packets but doesn't reply @@ -263,7 +263,7 @@ async fn detects_halfopen_connections_in_the_second_ping_request() { #[tokio::test] async fn requests_are_blocked_after_max_inflight_queue_size() { - let mut options = MqttOptions::new("dummy", "127.0.0.1", 1887); + let mut options = MqttOptions::new("dummy", "127.0.0.1:1887").unwrap(); options.set_inflight(5); let inflight = options.inflight(); @@ -291,7 +291,7 @@ async fn requests_are_blocked_after_max_inflight_queue_size() { #[tokio::test] async fn requests_are_recovered_after_inflight_queue_size_falls_below_max() { - let mut options = MqttOptions::new("dummy", "127.0.0.1", 1888); + let mut options = MqttOptions::new("dummy", "127.0.0.1:1888").unwrap(); options.set_inflight(3); let (client, mut eventloop) = AsyncClient::new(options, 5); @@ -330,7 +330,7 @@ async fn requests_are_recovered_after_inflight_queue_size_falls_below_max() { #[ignore] #[tokio::test] async fn packet_id_collisions_are_detected_and_flow_control_is_applied() { - let mut options = MqttOptions::new("dummy", "127.0.0.1", 1891); + let mut options = MqttOptions::new("dummy", "127.0.0.1:1891").unwrap(); options.set_inflight(10); let (client, mut eventloop) = AsyncClient::new(options, 5); @@ -446,7 +446,7 @@ async fn packet_id_collisions_are_detected_and_flow_control_is_applied() { // #[tokio::test] async fn next_poll_after_connect_failure_reconnects() { - let options = MqttOptions::new("dummy", "127.0.0.1", 3000); + let options = MqttOptions::new("dummy", "127.0.0.1:3000").unwrap(); task::spawn(async move { let _broker = Broker::new(3000, 1).await; @@ -473,7 +473,7 @@ async fn next_poll_after_connect_failure_reconnects() { #[tokio::test] async fn reconnection_resumes_from_the_previous_state() { - let mut options = MqttOptions::new("dummy", "127.0.0.1", 3001); + let mut options = MqttOptions::new("dummy", "127.0.0.1:3001").unwrap(); options.set_keep_alive(Duration::from_secs(5)); // start sending qos0 publishes. Makes sure that there is out activity but no in activity @@ -513,7 +513,7 @@ async fn reconnection_resumes_from_the_previous_state() { #[tokio::test] async fn reconnection_resends_unacked_packets_from_the_previous_connection_first() { - let mut options = MqttOptions::new("dummy", "127.0.0.1", 3002); + let mut options = MqttOptions::new("dummy", "127.0.0.1:3002").unwrap(); options.set_keep_alive(Duration::from_secs(5)); // start sending qos0 publishes. this makes sure that there is @@ -546,7 +546,7 @@ async fn reconnection_resends_unacked_packets_from_the_previous_connection_first #[tokio::test] async fn state_is_being_cleaned_properly_and_pending_request_calculated_properly() { - let mut options = MqttOptions::new("dummy", "127.0.0.1", 3004); + let mut options = MqttOptions::new("dummy", "127.0.0.1:3004").unwrap(); options.set_keep_alive(Duration::from_secs(5)); let mut network_options = NetworkOptions::new(); network_options.set_tcp_send_buffer_size(1024);