diff --git a/Cargo.toml b/Cargo.toml index 0fddc70e8..a26f3d28f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ serde_json = "1.0.64" serde_nanos = "0.1.1" serde_repr = "0.1.7" memchr = "2.4.0" +thiserror = "1.0" url = "2.2.2" time = { version = "0.3.6", features = ["parsing", "formatting", "serde", "serde-well-known"] } @@ -73,12 +74,17 @@ nats_test_server = { path = "nats_test_server" } quicli = "0.4.0" smol = "1.2.5" structopt = "0.3.21" +test-case = "1.2" nats_016 = { package = "nats", version = "0.16.0" } [[bench]] name = "nats_bench" harness = false +[[bench]] +name = "subject_bench" +harness = false + [[example]] name = "nats-box" path = "examples/nats-box/main.rs" diff --git a/benches/subject_bench.rs b/benches/subject_bench.rs new file mode 100644 index 000000000..3cbda0b15 --- /dev/null +++ b/benches/subject_bench.rs @@ -0,0 +1,25 @@ +use criterion::{criterion_group, criterion_main, Criterion}; +use nats::Subject; + +const WIRETAP: &str = ">"; +const MULTIPLE_SINGLE_WILDCARDS: &str = "fAN.*.sdb.*"; +const SHORT_SUBJECT: &str = "$SYS.REQ.SERVER.PING"; +const SUBJECT_MULTI_WILDCARD: &str = + "id.00112233445566778899aabbccddeeff.token1.token2.token3.token4.>"; +const LONG_SUBJECT: &str = "id.00112233445566778899aabbccddeeff.info.token1.*.tokenabzdd.!§$toke.*.kjadslfha.iiiiiäöü.--__--.*.%%&jjkkll.>"; + +fn validate(c: &mut Criterion) { + let mut group = c.benchmark_group("validate"); + group.bench_function("wiretap", |b| b.iter(|| Subject::new(WIRETAP).is_ok())); + group.bench_function("short mixed", |b| { + b.iter(|| Subject::new(MULTIPLE_SINGLE_WILDCARDS).is_ok()) + }); + group.bench_function("short", |b| b.iter(|| Subject::new(SHORT_SUBJECT).is_ok())); + group.bench_function("mw", |b| { + b.iter(|| Subject::new(SUBJECT_MULTI_WILDCARD).is_ok()) + }); + group.bench_function("long", |b| b.iter(|| Subject::new(LONG_SUBJECT).is_ok())); +} + +criterion_group!(benches, validate); +criterion_main!(benches); diff --git a/src/asynk.rs b/src/asynk.rs index 7f2d8d00d..0951f6600 100644 --- a/src/asynk.rs +++ b/src/asynk.rs @@ -111,8 +111,7 @@ use std::time::Duration; use blocking::unblock; use crossbeam_channel::{Receiver, Sender}; -use crate::header::HeaderMap; -use crate::IntoServerList; +use crate::{header::HeaderMap, AsSubject, IntoServerList, Subject, SubjectBuf}; /// Connect to a NATS server at the given url. /// @@ -141,39 +140,46 @@ impl Connection { } /// Publishes a message. - pub async fn publish(&self, subject: &str, msg: impl AsRef<[u8]>) -> io::Result<()> { - self.publish_with_reply_or_headers(subject, None, None, msg) + pub async fn publish(&self, subject: impl AsSubject, msg: impl AsRef<[u8]>) -> io::Result<()> { + self.publish_with_reply_or_headers(subject.as_subject()?, None, None, msg) .await } /// Publishes a message with a reply subject. pub async fn publish_request( &self, - subject: &str, - reply: &str, + subject: impl AsSubject, + reply: impl AsSubject, msg: impl AsRef<[u8]>, ) -> io::Result<()> { - if let Some(res) = - self.inner - .try_publish_with_reply_or_headers(subject, Some(reply), None, &msg) - { - return res; + let subject = subject.as_subject()?.to_owned(); + let reply = reply.as_subject()?.to_owned(); + + let res = self + .inner + .try_publish_with_reply_or_headers(&subject, Some(&reply), None, &msg); + match res { + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + let msg = msg.as_ref().to_vec(); + let inner = self.inner.clone(); + unblock(move || inner.publish_request(&subject, &reply, msg)).await + } + _ => Ok(()), } - let subject = subject.to_string(); - let reply = reply.to_string(); - let msg = msg.as_ref().to_vec(); - let inner = self.inner.clone(); - unblock(move || inner.publish_request(&subject, &reply, msg)).await } /// Creates a new unique subject for receiving replies. - pub fn new_inbox(&self) -> String { + pub fn new_inbox(&self) -> SubjectBuf { self.inner.new_inbox() } /// Publishes a message and waits for the response. - pub async fn request(&self, subject: &str, msg: impl AsRef<[u8]>) -> io::Result { - let subject = subject.to_string(); + pub async fn request( + &self, + subject: impl AsSubject, + msg: impl AsRef<[u8]>, + ) -> io::Result { + let subject = subject.as_subject()?.to_owned(); let msg = msg.as_ref().to_vec(); let inner = self.inner.clone(); let msg = unblock(move || inner.request(&subject, msg)).await?; @@ -184,11 +190,11 @@ impl Connection { /// timeout duration is reached pub async fn request_timeout( &self, - subject: &str, + subject: impl AsSubject, msg: impl AsRef<[u8]>, timeout: Duration, ) -> io::Result { - let subject = subject.to_string(); + let subject = subject.as_subject()?.to_owned(); let msg = msg.as_ref().to_vec(); let inner = self.inner.clone(); let msg = unblock(move || inner.request_timeout(&subject, msg, timeout)).await?; @@ -199,10 +205,10 @@ impl Connection { /// response. pub async fn request_multi( &self, - subject: &str, + subject: impl AsSubject, msg: impl AsRef<[u8]>, ) -> io::Result { - let subject = subject.to_string(); + let subject = subject.as_subject()?.to_owned(); let msg = msg.as_ref().to_vec(); let inner = self.inner.clone(); let sub = unblock(move || inner.request_multi(&subject, msg)).await?; @@ -215,8 +221,8 @@ impl Connection { } /// Creates a subscription. - pub async fn subscribe(&self, subject: &str) -> io::Result { - let subject = subject.to_string(); + pub async fn subscribe(&self, subject: impl AsSubject) -> io::Result { + let subject = subject.as_subject()?.to_owned(); let inner = self.inner.clone(); let inner = unblock(move || inner.subscribe(&subject)).await?; let (_closer_tx, closer_rx) = crossbeam_channel::bounded(0); @@ -228,9 +234,13 @@ impl Connection { } /// Creates a queue subscription. - pub async fn queue_subscribe(&self, subject: &str, queue: &str) -> io::Result { - let subject = subject.to_string(); - let queue = queue.to_string(); + pub async fn queue_subscribe( + &self, + subject: impl AsSubject, + queue: impl AsSubject, + ) -> io::Result { + let subject = subject.as_subject()?.to_owned(); + let queue = queue.as_subject()?.to_owned(); let inner = self.inner.clone(); let inner = unblock(move || inner.queue_subscribe(&subject, &queue)).await?; let (_closer_tx, closer_rx) = crossbeam_channel::bounded(0); @@ -288,28 +298,39 @@ impl Connection { } /// Publish a message which may have a reply subject or headers set. - pub async fn publish_with_reply_or_headers( + pub async fn publish_with_reply_or_headers<'s>( &self, - subject: &str, - reply: Option<&str>, - headers: Option<&HeaderMap>, + subject: &Subject, + reply: Option<&Subject>, + headers: Option<&'s HeaderMap>, msg: impl AsRef<[u8]>, ) -> io::Result<()> { - if let Some(res) = self - .inner - .try_publish_with_reply_or_headers(subject, reply, headers, &msg) - { - return res; - } - let subject = subject.to_string(); - let reply = reply.map(str::to_owned); + let subject = subject.to_owned(); + let reply = reply.map(ToOwned::to_owned); let headers = headers.map(HeaderMap::clone); - let msg = msg.as_ref().to_vec(); - let inner = self.inner.clone(); - unblock(move || { - inner.publish_with_reply_or_headers(&subject, reply.as_deref(), headers.as_ref(), msg) - }) - .await + + let res = self.inner.try_publish_with_reply_or_headers( + &subject, + reply.as_deref(), + headers.clone().as_ref(), + &msg, + ); + match res { + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + let msg = msg.as_ref().to_vec(); + let inner = self.inner.clone(); + unblock(move || { + inner.publish_with_reply_or_headers( + &subject, + reply.as_deref(), + headers.as_ref(), + msg, + ) + }) + .await + } + _ => Ok(()), + } } } @@ -383,11 +404,11 @@ impl Subscription { #[derive(Clone)] pub struct Message { /// The subject this message came from. - pub subject: String, + pub subject: SubjectBuf, /// Optional reply subject that may be used for sending a response to this /// message. - pub reply: Option, + pub reply: Option, /// The message contents. pub data: Vec, @@ -424,17 +445,18 @@ impl Message { /// but cannot be used on it's own for associated methods as those require `Client` injected into `Message` /// and will error without it. pub fn new( - subject: &str, - reply: Option<&str>, - data: impl AsRef<[u8]>, + subject: SubjectBuf, + reply: Option, + data: Vec, headers: Option, ) -> Message { Message { - subject: subject.to_string(), - reply: reply.map(String::from), - data: data.as_ref().to_vec(), + subject, + reply, + data, headers, - ..Default::default() + client: None, + double_acked: Arc::new(AtomicBool::new(false)), } } @@ -449,26 +471,15 @@ impl Message { crate::message::MESSAGE_NOT_BOUND, ) })?; - if let Some(res) = client.try_publish(reply.as_str(), None, None, msg.as_ref()) { - return res; - } - // clone only if we have to move the data to the thread - let client = client.clone(); - let reply = reply.to_owned(); - let msg = msg.as_ref().to_vec(); - unblock(move || client.publish(&reply, None, None, msg.as_ref())).await - } -} - -impl Default for Message { - fn default() -> Message { - Message { - subject: String::from(""), - reply: None, - data: Vec::new(), - headers: None, - client: None, - double_acked: Arc::new(AtomicBool::new(false)), + let res = client.try_publish(reply, None, None, msg.as_ref()); + match res { + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + let client = client.clone(); + let reply = reply.to_owned(); + let msg = msg.as_ref().to_vec(); + unblock(move || client.publish(&reply, None, None, msg.as_ref())).await + } + _ => Ok(()), } } } diff --git a/src/client.rs b/src/client.rs index 227e9d2f2..4025cafa4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -29,6 +29,7 @@ use crate::connector::{Connector, NatsStream, ServerAddress}; use crate::message::Message; use crate::proto::{self, ClientOp, ServerOp}; use crate::{header::HeaderMap, inject_delay, inject_io_failure, Options, ServerInfo}; +use crate::{Subject, SubjectBuf}; const BUF_CAPACITY: usize = 32 * 1024; @@ -88,8 +89,8 @@ pub(crate) type Preprocessor = Box bool + Send + Sync>; /// A registered subscription. pub(crate) struct Subscription { - subject: String, - queue_group: Option, + subject: SubjectBuf, + queue_group: Option, messages: channel::Sender, preprocess: Preprocessor, pub(crate) pending_messages_limit: Option, @@ -400,8 +401,8 @@ impl Client { /// Subscribes to a subject. pub(crate) fn subscribe( &self, - subject: &str, - queue_group: Option<&str>, + subject: &Subject, + queue_group: Option<&Subject>, ) -> io::Result<(u64, channel::Receiver)> { self.subscribe_with_preprocessor(subject, queue_group, Box::new(|_, _| false)) } @@ -409,8 +410,8 @@ impl Client { /// Subscribe to a subject with a message preprocessor. pub(crate) fn subscribe_with_preprocessor( &self, - subject: &str, - queue_group: Option<&str>, + subject: &Subject, + queue_group: Option<&Subject>, message_processor: Preprocessor, ) -> io::Result<(u64, channel::Receiver)> { inject_delay(); @@ -441,8 +442,8 @@ impl Client { read.subscriptions.insert( sid, Subscription { - subject: subject.to_string(), - queue_group: queue_group.map(ToString::to_string), + subject: subject.to_owned(), + queue_group: queue_group.map(|sub| sub.to_owned()), messages: sender, preprocess: message_processor, pending_messages_limit: None, @@ -465,7 +466,7 @@ impl Client { /// Resubscribes an existing subscription by unsubscribing from the old subject and subscribing /// to the new subject returning a new sid while retaining the existing channel receiver. - pub(crate) fn resubscribe(&self, old_sid: u64, new_subject: &str) -> io::Result { + pub(crate) fn resubscribe(&self, old_sid: u64, new_subject: &Subject) -> io::Result { // Inject random delays when testing. inject_delay(); @@ -553,14 +554,25 @@ impl Client { /// Publishes a message with optional reply subject and headers. pub fn publish( &self, - subject: &str, - reply_to: Option<&str>, + subject: &Subject, + reply_to: Option<&Subject>, headers: Option<&HeaderMap>, msg: &[u8], ) -> io::Result<()> { // Inject random delays when testing. inject_delay(); + if subject.contains_wildcards() + || reply_to + .as_ref() + .map_or(false, |reply| reply.contains_wildcards()) + { + return Err(Error::new( + ErrorKind::InvalidInput, + "when publishing subjects must not contain any wildcards", + )); + } + let server_info = self.server_info.lock(); if headers.is_some() && !server_info.headers { return Err(Error::new( @@ -625,22 +637,25 @@ impl Client { /// /// This only works when the write buffer has enough space to encode the /// whole message. + /// + /// If this operation would block an error with [`io::ErrorKind::WouldBlock`] is returned. pub fn try_publish( &self, - subject: &str, - reply_to: Option<&str>, + subject: &Subject, + reply_to: Option<&Subject>, headers: Option<&HeaderMap>, msg: &[u8], - ) -> Option> { + ) -> io::Result<()> { // Check if the client is closed. if let Err(e) = self.check_shutdown() { - return Some(Err(e)); + return Err(e); } // Estimate how many bytes the message will consume when written into // the stream. We must make a conservative guess: it's okay to // overestimate but not to underestimate. - let mut estimate = 1024 + subject.len() + reply_to.map_or(0, str::len) + msg.len(); + + let mut estimate = 1024 + subject.len() + reply_to.map_or(0, |r| r.len()) + msg.len(); if let Some(headers) = headers { estimate += headers .iter() @@ -663,19 +678,20 @@ impl Client { } }; - let mut write = self.state.write.try_lock()?; + let mut write = self.state.write.try_lock().ok_or_else(|| { + io::Error::new(io::ErrorKind::WouldBlock, "Can not acquire write lock") + })?; match write.writer.as_mut() { None => { // If reconnecting, write into the buffer. - let res = proto::encode(&mut write.buffer, op).and_then(|_| write.buffer.flush()); - Some(res) + proto::encode(&mut write.buffer, op).and_then(|_| write.buffer.flush()) } Some(mut writer) => { // Check if there's enough space in the buffer to encode the // whole message. if BUF_CAPACITY - writer.buffer().len() < estimate { - return None; + return Err(io::Error::new(io::ErrorKind::WouldBlock, "Buffer to small")); } // If connected, write into the writer. This is not going to @@ -691,7 +707,7 @@ impl Client { let mut read = self.state.read.lock(); read.pongs.clear(); } - Some(res) + res } } } @@ -769,7 +785,7 @@ impl Client { proto::encode( &mut writer, ClientOp::Sub { - subject: subscription.subject.as_str(), + subject: &subscription.subject, queue_group: subscription.queue_group.as_deref(), sid: *sid, }, diff --git a/src/connector.rs b/src/connector.rs index 5ddd7452b..dc0a5b8b0 100644 --- a/src/connector.rs +++ b/src/connector.rs @@ -510,6 +510,7 @@ fn tls_wait(mut tls: MutexGuard<'_, TlsStream>) -> io::Result<()> { // Initialize a pollfd object with readiness events we're looking for. #[allow(trivial_numeric_casts)] + #[allow(clippy::cast_possible_truncation)] let mut pollfd = pollfd { #[cfg(unix)] fd: tcp.as_raw_fd() as _, diff --git a/src/jetstream/api.rs b/src/jetstream/api.rs new file mode 100644 index 000000000..53ced9092 --- /dev/null +++ b/src/jetstream/api.rs @@ -0,0 +1,91 @@ +//! Subjects representing the `JetStream` API. + +use std::io; + +use crate::SubjectBuf; + +/// Subject for requests to create a new stream. +pub fn account_info(prefix: &str) -> SubjectBuf { + SubjectBuf::new_unchecked(format!("{}INFO", prefix)) +} + +/// Subject for requests to create a new stream. +pub fn create_stream(prefix: &str, name: &str) -> io::Result { + SubjectBuf::new(format!("{}STREAM.CREATE.{}", prefix, name)).map_err(Into::into) +} + +/// Subject for requests to update an existing stream. +pub fn update_stream(prefix: &str, name: &str) -> io::Result { + SubjectBuf::new(format!("{}STREAM.UPDATE.{}", prefix, name)).map_err(Into::into) +} + +/// Subject for requests to purge an existing stream. +pub fn purge_stream(prefix: &str, name: &str) -> io::Result { + SubjectBuf::new(format!("{}STREAM.PURGE.{}", prefix, name)).map_err(Into::into) +} + +/// Subject for requests to delete an existing stream. +pub fn delete_stream(prefix: &str, name: &str) -> io::Result { + SubjectBuf::new(format!("{}STREAM.DELETE.{}", prefix, name)).map_err(Into::into) +} + +/// Subject to requests the names of existing streams. +pub fn stream_names(prefix: &str) -> SubjectBuf { + SubjectBuf::new_unchecked(format!("{}STREAM.NAMES", prefix)) +} + +/// Subject to requests the list of existing streams. +/// +/// This provides more information than [`stream_names()`]. +pub fn stream_list(prefix: &str) -> SubjectBuf { + SubjectBuf::new_unchecked(format!("{}STREAM.LIST", prefix)) +} + +/// Subject to requests detailed information about a streams. +pub fn stream_info(prefix: &str, stream: &str) -> SubjectBuf { + SubjectBuf::new_unchecked(format!("{}STREAM.INFO.{}", prefix, stream)) +} + +/// Subject to requests a specific message from a stream. +pub fn stream_get_message(prefix: &str, stream: &str) -> SubjectBuf { + SubjectBuf::new_unchecked(format!("{}STREAM.MSG.GET.{}", prefix, stream)) +} + +/// Subject to requests a specific message from a stream. +pub fn stream_delete_message(prefix: &str, stream: &str) -> SubjectBuf { + SubjectBuf::new_unchecked(format!("{}STREAM.MSG.DELETE.{}", prefix, stream)) +} + +/// Subject to requests to create a new consumer for a stream. +pub fn create_consumer(prefix: &str, stream: &str) -> io::Result { + SubjectBuf::new(format!("{}CONSUMER.CREATE.{}", prefix, stream)).map_err(Into::into) +} + +/// Subject to requests to create a new durable consumer for a stream. +pub fn create_durable_consumer( + prefix: &str, + stream: &str, + consumer: &str, +) -> io::Result { + SubjectBuf::new(format!( + "{}CONSUMER.DURABLE.CREATE.{}.{}", + prefix, stream, consumer + )) + .map_err(Into::into) +} + +/// Subject to requests to delete a consumer. +pub fn delete_consumer(prefix: &str, stream: &str, consumer: &str) -> io::Result { + SubjectBuf::new(format!("{}CONSUMER.DELETE.{}.{}", prefix, stream, consumer)) + .map_err(Into::into) +} + +/// Subject to requests the list of consumers of a streams. +pub fn consumer_list(prefix: &str, stream: &str) -> SubjectBuf { + SubjectBuf::new_unchecked(format!("{}CONSUMER.LIST.{}", prefix, stream)) +} + +/// Subject to requests the information about a consumer. +pub fn consumer_info(prefix: &str, stream: &str, consumer: &str) -> SubjectBuf { + SubjectBuf::new_unchecked(format!("{}CONSUMER.INFO.{}.{}", prefix, stream, consumer)) +} diff --git a/src/jetstream/mod.rs b/src/jetstream/mod.rs index 0a9e6a9ad..d8396f3f4 100644 --- a/src/jetstream/mod.rs +++ b/src/jetstream/mod.rs @@ -71,7 +71,7 @@ //! //! js.add_stream("my_stream")?; //! js.add_consumer("my_stream", ConsumerConfig { -//! deliver_subject: Some("my_deliver_subject".to_string()), +//! deliver_subject: Some("my_deliver_subject".parse()?), //! durable_name: Some("my_durable_consumer".to_string()), //! ..Default::default() //! })?; @@ -113,6 +113,8 @@ use serde_repr::{Deserialize_repr, Serialize_repr}; const ORDERED_IDLE_HEARTBEAT: Duration = Duration::from_nanos(5_000_000_000); +mod api; + /// Pull subscriptions pub mod pull_subscription; @@ -133,7 +135,7 @@ pub type PullSubscibeOptions = PullSubscribeOptions; use crate::{ header::{self, HeaderMap}, - Connection, Message, + AsSubject, Connection, Message, Subject, SubjectBuf, }; /// `JetStream` options @@ -448,7 +450,7 @@ pub struct Error { } impl Error { - /// Returns the status code assosciated with this error + /// Returns the status code associated with this error pub fn code(&self) -> usize { self.code } @@ -495,7 +497,7 @@ struct PagedResponse { #[derive(Debug)] pub struct PagedIterator<'a, T> { manager: &'a JetStream, - subject: String, + subject: SubjectBuf, offset: i64, items: VecDeque, done: bool, @@ -567,18 +569,24 @@ impl JetStream { } /// Publishes a message to `JetStream` - pub fn publish(&self, subject: &str, data: impl AsRef<[u8]>) -> io::Result { - self.publish_with_options_or_headers(subject, None, None, data) + pub fn publish(&self, subject: &Sub, data: impl AsRef<[u8]>) -> io::Result + where + Sub: AsSubject + ?Sized, + { + self.publish_with_options_or_headers(subject.as_subject()?, None, None, data) } /// Publishes a message to `JetStream` with the given options. - pub fn publish_with_options( + pub fn publish_with_options( &self, - subject: &str, + subject: &Sub, data: impl AsRef<[u8]>, options: &PublishOptions, - ) -> io::Result { - self.publish_with_options_or_headers(subject, Some(options), None, data) + ) -> io::Result + where + Sub: AsSubject + ?Sized, + { + self.publish_with_options_or_headers(subject.as_subject()?, Some(options), None, data) } /// Publishes a `Message` to `JetStream`. @@ -606,13 +614,16 @@ impl JetStream { } /// Publishes a message to `JetStream` with the given options and/or headers. - pub(crate) fn publish_with_options_or_headers( + pub(crate) fn publish_with_options_or_headers( &self, - subject: &str, + subject: &Sub, maybe_options: Option<&PublishOptions>, maybe_headers: Option<&HeaderMap>, msg: impl AsRef<[u8]>, - ) -> io::Result { + ) -> io::Result + where + Sub: AsSubject + ?Sized, + { let maybe_headers = if let Some(options) = maybe_options { let mut headers = maybe_headers.map_or_else(HeaderMap::default, HeaderMap::clone); @@ -644,7 +655,7 @@ impl JetStream { let maybe_timeout = maybe_options.and_then(|options| options.timeout); let res_msg = self.connection.request_with_headers_or_timeout( - subject, + subject.as_subject()?, maybe_headers.as_ref(), maybe_timeout, msg, @@ -669,7 +680,6 @@ impl JetStream { /// # Example /// /// ``` - /// # fn main() -> std::io::Result<()> { /// # let client = nats::connect("demo.nats.io")?; /// # let context = nats::jetstream::new(client); /// # context.add_stream("ephemeral"); @@ -677,11 +687,13 @@ impl JetStream { /// # /// let subscription = context.subscribe("ephemeral")?; /// println!("Received message {:?}", subscription.next()); - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` - pub fn subscribe(&self, subject: &str) -> io::Result { - self.do_push_subscribe(subject, None, None) + pub fn subscribe(&self, subject: &Sub) -> io::Result + where + Sub: AsSubject + ?Sized, + { + self.do_push_subscribe(subject.as_subject()?, None, None) } /// Creates a pull subscription. @@ -689,7 +701,6 @@ impl JetStream { /// # Example /// ``` /// # use nats::jetstream::BatchOptions; - /// # fn main() -> std::io::Result<()> { /// # let client = nats::connect("demo.nats.io")?; /// # let context = nats::jetstream::new(client); /// # @@ -703,25 +714,30 @@ impl JetStream { /// println!("received message: {:?}", message); /// Ok(()) /// })?; - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` - pub fn pull_subscribe(&self, subject: &str) -> io::Result { - self.do_pull_subscribe(subject, None) + pub fn pull_subscribe(&self, subject: &Sub) -> io::Result + where + Sub: AsSubject + ?Sized, + { + self.do_pull_subscribe(subject.as_subject()?, None) } /// Creates a `PullSubscription` with options. - pub fn pull_subscribe_with_options( + pub fn pull_subscribe_with_options( &self, - subject: &str, + subject: &Sub, options: &PullSubscribeOptions, - ) -> io::Result { - self.do_pull_subscribe(subject, Some(options)) + ) -> io::Result + where + Sub: AsSubject + ?Sized, + { + self.do_pull_subscribe(subject.as_subject()?, Some(options)) } pub(crate) fn do_pull_subscribe( &self, - subject: &str, + subject: &Subject, maybe_options: Option<&PullSubscribeOptions>, ) -> io::Result { // Find the stream mapped to the subject if not bound to a stream already. @@ -739,11 +755,14 @@ impl JetStream { // check mismatches between user config and info // Make sure this new subject matches or is a subset. - if !info.config.filter_subject.is_empty() && subject != info.config.filter_subject { - return Err(io::Error::new( - io::ErrorKind::Other, - "subjects do not match", - )); + match info.config.filter_subject { + Some(filter_subject) if !filter_subject.matches(subject) => { + return Err(io::Error::new( + io::ErrorKind::Other, + "subjects do not match", + )); + } + _ => {} } Ok(info) @@ -771,7 +790,7 @@ impl JetStream { deliver_policy: DeliverPolicy::All, ack_policy: AckPolicy::Explicit, // Do filtering always, server will clear as needed. - filter_subject: subject.to_string(), + filter_subject: Some(subject.to_owned()), replay_policy: ReplayPolicy::Instant, ..Default::default() }) @@ -786,7 +805,7 @@ impl JetStream { let consumer_info = process_consumer_info(consumer_info)?; let inbox = self.connection.new_inbox(); - let (pid, messages) = self.connection.0.client.subscribe(inbox.as_str(), None)?; + let (pid, messages) = self.connection.0.client.subscribe(&inbox, None)?; Ok(PullSubscription::new( pid, @@ -806,20 +825,24 @@ impl JetStream { /// # Example /// /// ```no_run - /// # use nats::jetstream::{ SubscribeOptions }; - /// # fn main() -> std::io::Result<()> { + /// # use nats::{jetstream::{ SubscribeOptions }}; /// # let nc = nats::connect("demo.nats.io")?; /// # let js = nats::jetstream::new(nc); - /// let sub = js.subscribe_with_options("foo", &SubscribeOptions::bind("existing_stream".to_string(), "existing_consumer".to_string()))?; - /// # Ok(()) - /// # } + /// let sub = js.subscribe_with_options( + /// "foo", + /// &SubscribeOptions::bind("existing_stream".to_string(), "existing_consumer".to_string()) + /// )?; + /// # Ok::<(), std::io::Error>(()) /// ``` - pub fn subscribe_with_options( + pub fn subscribe_with_options( &self, - subject: &str, + subject: &Sub, options: &SubscribeOptions, - ) -> io::Result { - self.do_push_subscribe(subject, None, Some(options)) + ) -> io::Result + where + Sub: AsSubject + ?Sized, + { + self.do_push_subscribe(subject.as_subject()?, None, Some(options)) } /// Creates a push-based consumer subscription with a queue group. @@ -828,16 +851,22 @@ impl JetStream { /// # Example /// /// ``` - /// # fn main() -> std::io::Result<()> { /// # let client = nats::connect("demo.nats.io")?; /// # let context = nats::jetstream::new(client); /// # context.add_stream("queue"); /// let subscription = context.queue_subscribe("queue", "queue_group")?; - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` - pub fn queue_subscribe(&self, subject: &str, queue: &str) -> io::Result { - self.do_push_subscribe(subject, Some(queue), None) + pub fn queue_subscribe( + &self, + subject: &Sub, + queue: &Que, + ) -> io::Result + where + Sub: AsSubject + ?Sized, + Que: AsSubject + ?Sized, + { + self.do_push_subscribe(subject.as_subject()?, Some(queue.as_subject()?), None) } /// Creates a push-based consumer subscription with a queue group and options. @@ -845,22 +874,32 @@ impl JetStream { /// If a durable name is not set within the options provided options then the queue group will /// be used as the durable name. /// - pub fn queue_subscribe_with_options( + pub fn queue_subscribe_with_options( &self, - subject: &str, - queue: &str, + subject: &Sub, + queue: &Que, options: &SubscribeOptions, - ) -> io::Result { - self.do_push_subscribe(subject, Some(queue), Some(options)) + ) -> io::Result + where + Sub: AsSubject + ?Sized, + Que: AsSubject + ?Sized, + { + self.do_push_subscribe( + subject.as_subject()?, + Some(queue.as_subject()?), + Some(options), + ) } fn do_push_subscribe( &self, - subject: &str, - maybe_queue: Option<&str>, + subject: &Subject, + maybe_queue: Option<&Subject>, maybe_options: Option<&SubscribeOptions>, ) -> io::Result { // If no stream name is specified the subject cannot be empty. + // @MattesWhite Note: .is_empty() is from Deref but the constructor actually prevents this invariant + // If it is still valid to pass no subject we should cover this differently. if subject.is_empty() && maybe_options .map(|options| options.stream_name.as_ref()) @@ -949,16 +988,19 @@ impl JetStream { let process_consumer_info = |info: ConsumerInfo| { // Make sure this new subject matches or is a subset. - if !info.config.filter_subject.is_empty() && subject != info.config.filter_subject { - return Err(io::Error::new( - io::ErrorKind::Other, - "subject does not match consumer", - )); + match info.config.filter_subject.as_ref() { + Some(filter) if !subject.matches(filter) => { + return Err(io::Error::new( + io::ErrorKind::Other, + "subject does not match consumer", + )) + } + _ => {} } if let Some(deliver_group) = info.config.deliver_group.as_ref() { if let Some(queue) = maybe_queue { - if deliver_group != queue { + if !deliver_group.matches(queue) { return Err(io::Error::new( io::ErrorKind::Other, format!( @@ -1142,7 +1184,7 @@ impl JetStream { // Figure out if we have a consumer name let maybe_durable_name = maybe_options .and_then(|options| options.durable_name.as_deref()) - .or(maybe_queue); + .or_else(|| maybe_queue.map(AsRef::as_ref)); let maybe_consumer_name = maybe_options .and_then(|options| options.consumer_name.as_deref()) @@ -1210,12 +1252,12 @@ impl JetStream { }; // Do filtering always, server will clear as needed. - config.filter_subject = subject.to_string(); + config.filter_subject = Some(subject.to_owned()); // Pass the queue to the consumer config if let Some(queue) = maybe_queue { if config.durable_name.is_none() { - config.durable_name = Some(queue.to_owned()); + config.durable_name = Some(queue.as_str().to_owned()); } config.deliver_group = Some(queue.to_owned()); @@ -1317,12 +1359,10 @@ impl JetStream { .and_then(|headers| headers.get(header::NATS_CONSUMER_STALLED)); if let Some(consumer_stalled) = maybe_consumer_stalled { - context.connection.try_publish_with_reply_or_headers( - consumer_stalled, - None, - None, - b"", - ); + context + .connection + .try_publish_with_reply_or_headers(consumer_stalled, None, None, b"") + .ok(); } // if it is not an ordered consumer, don't handle sequence mismatch. @@ -1465,7 +1505,7 @@ impl JetStream { "the stream name must not be empty", )); } - let subject: String = format!("{}STREAM.CREATE.{}", self.api_prefix(), config.name); + let subject = api::create_stream(self.api_prefix(), &config.name)?; let req = serde_json::ser::to_vec(&config)?; self.js_request(&subject, &req) } @@ -1478,7 +1518,7 @@ impl JetStream { "the stream name must not be empty", )); } - let subject: String = format!("{}STREAM.UPDATE.{}", self.api_prefix(), config.name); + let subject = api::update_stream(self.api_prefix(), &config.name)?; let req = serde_json::ser::to_vec(&config)?; self.js_request(&subject, &req) } @@ -1487,7 +1527,7 @@ impl JetStream { /// use the `list_streams` method instead. pub fn stream_names(&self) -> PagedIterator<'_, String> { PagedIterator { - subject: format!("{}STREAM.NAMES", self.api_prefix()), + subject: api::stream_names(self.api_prefix()), manager: self, offset: 0, items: Default::default(), @@ -1499,7 +1539,8 @@ impl JetStream { let req = serde_json::ser::to_vec(&StreamNamesRequest { subject: subject.to_string(), })?; - let request_subject = format!("{}STREAM.NAMES", self.api_prefix()); + + let request_subject = api::stream_names(self.api_prefix()); self.js_request::(&request_subject, &req) .map(|resp| resp.streams)? .map_or_else( @@ -1516,7 +1557,7 @@ impl JetStream { /// List all `JetStream` streams. pub fn list_streams(&self) -> PagedIterator<'_, StreamInfo> { PagedIterator { - subject: format!("{}STREAM.LIST", self.api_prefix()), + subject: api::stream_list(self.api_prefix()), manager: self, offset: 0, items: Default::default(), @@ -1536,7 +1577,7 @@ impl JetStream { "the stream name must not be empty", )); } - let subject: String = format!("{}CONSUMER.LIST.{}", self.api_prefix(), stream); + let subject = api::consumer_list(self.api_prefix(), stream); Ok(PagedIterator { subject, @@ -1556,7 +1597,7 @@ impl JetStream { "the stream name must not be empty", )); } - let subject: String = format!("{}STREAM.INFO.{}", self.api_prefix(), stream); + let subject = api::stream_info(self.api_prefix(), stream); self.js_request(&subject, b"") } @@ -1569,7 +1610,7 @@ impl JetStream { "the stream name must not be empty", )); } - let subject = format!("{}STREAM.PURGE.{}", self.api_prefix(), stream); + let subject = api::purge_stream(self.api_prefix(), stream)?; self.js_request(&subject, b"") } @@ -1577,7 +1618,7 @@ impl JetStream { pub fn purge_stream_subject>( &self, stream: S, - filter_subject: &str, + filter_subject: &Subject, ) -> io::Result { let stream: &str = stream.as_ref(); if stream.is_empty() { @@ -1587,9 +1628,9 @@ impl JetStream { )); } - let subject = format!("{}STREAM.PURGE.{}", self.api_prefix(), stream); + let subject = api::purge_stream(self.api_prefix(), stream)?; let request = serde_json::to_vec(&PurgeRequest { - filter: Some(filter_subject.to_string()), + filter: Some(filter_subject.to_owned()), ..Default::default() })?; @@ -1606,7 +1647,7 @@ impl JetStream { )); } - let subject = format!("{}STREAM.MSG.GET.{}", self.api_prefix(), stream); + let subject = api::stream_get_message(self.api_prefix(), stream); let request = serde_json::ser::to_vec(&StreamMessageGetRequest { seq: Some(seq), last_by_subject: None, @@ -1635,7 +1676,7 @@ impl JetStream { )); } - let subject = format!("{}STREAM.MSG.GET.{}", self.api_prefix(), stream_name); + let subject = api::stream_get_message(self.api_prefix(), stream_name); let request = serde_json::ser::to_vec(&StreamMessageGetRequest { seq: None, last_by_subject: Some(stream_subject.to_string()), @@ -1669,7 +1710,7 @@ impl JetStream { }) .unwrap(); - let subject = format!("{}STREAM.MSG.DELETE.{}", self.api_prefix(), stream); + let subject = api::stream_delete_message(self.api_prefix(), stream); self.js_request::(&subject, &req) .map(|dr| dr.success) @@ -1685,7 +1726,7 @@ impl JetStream { )); } - let subject = format!("{}STREAM.DELETE.{}", self.api_prefix(), stream); + let subject = api::delete_stream(self.api_prefix(), stream)?; self.js_request::(&subject, b"") .map(|dr| dr.success) } @@ -1705,16 +1746,11 @@ impl JetStream { )); } - let subject = if let Some(ref durable_name) = config.durable_name { - format!( - "{}CONSUMER.DURABLE.CREATE.{}.{}", - self.api_prefix(), - stream, - durable_name - ) + let subject = if let Some(consumer) = config.durable_name.as_ref() { + api::create_durable_consumer(self.api_prefix(), stream, consumer) } else { - format!("{}CONSUMER.CREATE.{}", self.api_prefix(), stream) - }; + api::create_consumer(self.api_prefix(), stream) + }?; let req = CreateConsumerRequest { stream_name: stream.into(), @@ -1746,12 +1782,7 @@ impl JetStream { )); } - let subject = format!( - "{}CONSUMER.DELETE.{}.{}", - self.api_prefix(), - stream, - consumer - ); + let subject = api::delete_consumer(self.api_prefix(), stream, consumer)?; self.js_request::(&subject, b"") .map(|dr| dr.success) @@ -1771,16 +1802,16 @@ impl JetStream { )); } let consumer: &str = consumer.as_ref(); - let subject: String = format!("{}CONSUMER.INFO.{}.{}", self.api_prefix(), stream, consumer); + let subject = api::consumer_info(self.api_prefix(), stream, consumer); self.js_request(&subject, b"") } /// Query `JetStream` account information. pub fn account_info(&self) -> io::Result { - self.js_request(&format!("{}INFO", self.api_prefix()), b"") + self.js_request(&api::account_info(self.api_prefix()), b"") } - fn js_request(&self, subject: &str, req: &[u8]) -> io::Result + fn js_request(&self, subject: &Subject, req: &[u8]) -> io::Result where Res: DeserializeOwned, { diff --git a/src/jetstream/pull_subscription.rs b/src/jetstream/pull_subscription.rs index 8ff7192d3..b62cf1da0 100644 --- a/src/jetstream/pull_subscription.rs +++ b/src/jetstream/pull_subscription.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use std::time::Duration; use crate::jetstream::{ConsumerInfo, ConsumerOwnership, JetStream}; -use crate::Message; +use crate::{Message, SubjectBuf}; use super::{AckPolicy, BatchOptions}; use crossbeam_channel as channel; @@ -29,7 +29,7 @@ pub(crate) struct Inner { pub(crate) messages: channel::Receiver, /// sid of the inbox subscription - pub(crate) inbox: String, + pub(crate) inbox: SubjectBuf, /// Ack policy used in methods that automatically ack. pub(crate) consumer_ack_policy: AckPolicy, @@ -69,7 +69,7 @@ impl PullSubscription { pid: u64, consumer_info: ConsumerInfo, consumer_ownership: ConsumerOwnership, - inbox: String, + inbox: SubjectBuf, messages: channel::Receiver, context: JetStream, ) -> PullSubscription { @@ -380,7 +380,7 @@ impl PullSubscription { self.0.context.connection.publish_with_reply_or_headers( &subject, - Some(self.0.inbox.as_str()), + Some(&self.0.inbox), None, request, )?; diff --git a/src/jetstream/types.rs b/src/jetstream/types.rs index e362953e3..8856eb71b 100644 --- a/src/jetstream/types.rs +++ b/src/jetstream/types.rs @@ -13,7 +13,7 @@ use std::time::Duration; -use crate::header::HeaderMap; +use crate::{header::HeaderMap, SubjectBuf}; use serde::{Deserialize, Serialize}; use std::convert::TryFrom; use std::io::{self, ErrorKind}; @@ -36,13 +36,13 @@ pub(crate) struct StreamMessageGetRequest { pub struct RawStreamMessage { /// Subject of the message. #[serde(rename = "subject")] - pub subject: String, + pub subject: SubjectBuf, /// Sequence of the message. #[serde(rename = "seq")] pub sequence: u64, - /// Data of the mssage. + /// Data of the message. #[serde(default, rename = "data")] pub data: String, @@ -68,10 +68,10 @@ pub(crate) struct StreamMessageGetResponse { #[derive(Debug, Clone)] pub struct StreamMessage { /// Subject of the message. - pub subject: String, + pub subject: SubjectBuf, /// Sequence of the message pub sequence: u64, - /// HeaderMap that were sent with the mesage, if any. + /// HeaderMap that were sent with the message, if any. pub headers: Option, /// Payload of the message. pub data: Vec, @@ -172,7 +172,7 @@ pub struct ConsumerConfig { /// semantics in any system that is written to as a result of processing /// a message. #[serde(default, skip_serializing_if = "Option::is_none")] - pub deliver_subject: Option, + pub deliver_subject: Option, /// Setting `durable_name` to `Some(...)` will cause this consumer /// to be "durable". This may be a good choice for workloads that @@ -196,7 +196,7 @@ pub struct ConsumerConfig { pub description: Option, #[serde(default, skip_serializing_if = "Option::is_none")] /// Deliver group to use. - pub deliver_group: Option, + pub deliver_group: Option, /// Allows for a variety of options that determine how this consumer will receive messages pub deliver_policy: DeliverPolicy, /// Used in combination with `DeliverPolicy::ByStartSeq` to only select messages arriving @@ -216,8 +216,8 @@ pub struct ConsumerConfig { #[serde(default, skip_serializing_if = "is_default")] pub max_deliver: i64, /// When consuming from a Stream with many subjects, or wildcards, this selects only specific incoming subjects. Supports wildcards. - #[serde(default, skip_serializing_if = "is_default")] - pub filter_subject: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub filter_subject: Option, /// Whether messages are sent as quickly as possible or at the rate of receipt pub replay_policy: ReplayPolicy, /// The rate of message delivery in bits per second @@ -316,7 +316,7 @@ pub struct StreamConfig { /// Which NATS subjects to populate this stream with. Supports wildcards. Defaults to just the /// configured stream `name`. #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub subjects: Vec, + pub subjects: Vec, /// How message retention is considered, `Limits` (default), `Interest` or `WorkQueue` pub retention: RetentionPolicy, /// How many Consumers can be defined for a given Stream, -1 for unlimited @@ -526,7 +526,7 @@ pub struct PurgeRequest { /// Subject to match against messages for the purge command. #[serde(default, rename = "filter", skip_serializing_if = "is_default")] - pub filter: Option, + pub filter: Option, /// Number of messages to keep. #[serde(default, rename = "filter", skip_serializing_if = "is_default")] @@ -805,7 +805,7 @@ pub struct SubscribeOptions { pub(crate) ack_wait: Option, pub(crate) replay_policy: Option, pub(crate) deliver_policy: Option, - pub(crate) deliver_subject: Option, + pub(crate) deliver_subject: Option, pub(crate) description: Option, pub(crate) durable_name: Option, pub(crate) sample_frequency: Option, @@ -967,7 +967,7 @@ impl SubscribeOptions { /// and a creation request is sent to the server. /// /// If not provided, an inbox will be selected. - pub fn deliver_subject(mut self, subject: String) -> Self { + pub fn deliver_subject(mut self, subject: SubjectBuf) -> Self { self.deliver_subject = Some(subject); self } @@ -1023,7 +1023,7 @@ pub struct AccountInfo { pub consumers: i64, /// Aggregated API statistics pub api: ApiStats, - /// Limits placed on the accuont + /// Limits placed on the account pub limits: AccountLimits, } diff --git a/src/kv.rs b/src/kv.rs index 305a32d50..b4b941b8b 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -23,6 +23,7 @@ use crate::jetstream::{ StreamConfig, StreamInfo, StreamMessage, SubscribeOptions, }; use crate::message::Message; +use crate::SubjectBuf; use lazy_static::lazy_static; use regex::Regex; @@ -227,7 +228,10 @@ impl JetStream { let stream_info = self.add_stream(&StreamConfig { name: format!("KV_{}", config.bucket), description: Some(config.description.to_string()), - subjects: vec![format!("$KV.{}.>", config.bucket)], + subjects: vec![SubjectBuf::new_unchecked(format!( + "$KV.{}.>", + config.bucket + ))], max_msgs_per_subject: history, max_bytes: config.max_bytes, max_age: config.max_age, @@ -451,11 +455,7 @@ impl Store { return Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid key")); } - let mut subject = String::new(); - subject.push_str(&self.prefix); - subject.push_str(key); - - let publish_ack = self.context.publish(&subject, value)?; + let publish_ack = self.context.publish(&self.key_subject(key), value)?; Ok(publish_ack.sequence) } @@ -525,17 +525,18 @@ impl Store { return Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid key")); } - let mut subject = String::new(); - subject.push_str(&self.prefix); - subject.push_str(key); - let mut headers = HeaderMap::default(); headers.insert( header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE, revision.to_string(), ); - let message = Message::new(&subject, None, value, Some(headers)); + let message = Message::new( + self.key_subject(key), + None, + value.as_ref().to_vec(), + Some(headers), + ); let publish_ack = self.context.publish_message(&message)?; Ok(publish_ack.sequence) @@ -569,14 +570,10 @@ impl Store { return Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid key")); } - let mut subject = String::new(); - subject.push_str(&self.prefix); - subject.push_str(key); - let mut headers = HeaderMap::default(); headers.insert(KV_OPERATION, KV_OPERATION_DELETE.to_string()); - let message = Message::new(&subject, None, b"", Some(headers)); + let message = Message::new(self.key_subject(key), None, Vec::new(), Some(headers)); self.context.publish_message(&message)?; Ok(()) @@ -609,15 +606,11 @@ impl Store { return Err(io::Error::new(io::ErrorKind::InvalidInput, "invalid key")); } - let mut subject = String::new(); - subject.push_str(&self.prefix); - subject.push_str(key); - let mut headers = HeaderMap::default(); headers.insert(KV_OPERATION, KV_OPERATION_PURGE.to_string()); headers.insert(NATS_ROLLUP, ROLLUP_SUBJECT.to_string()); - let message = Message::new(&subject, None, b"", Some(headers)); + let message = Message::new(self.key_subject(key), None, Vec::new(), Some(headers)); self.context.publish_message(&message)?; Ok(()) @@ -651,12 +644,8 @@ impl Store { /// # } /// ``` pub fn keys(&self) -> io::Result { - let mut subject = String::new(); - subject.push_str(&self.prefix); - subject.push_str(ALL_KEYS); - let subscription = self.context.subscribe_with_options( - &subject, + &self.key_subject(ALL_KEYS), &SubscribeOptions::ordered() .headers_only() .deliver_last_per_subject(), @@ -704,12 +693,8 @@ impl Store { /// # } /// ``` pub fn history(&self, key: &str) -> io::Result { - let mut subject = String::new(); - subject.push_str(&self.prefix); - subject.push_str(key); - let subscription = self.context.subscribe_with_options( - &subject, + &self.key_subject(key), &SubscribeOptions::ordered() .deliver_all() .enable_flow_control() @@ -731,9 +716,8 @@ impl Store { /// Returns an iterator which iterates over each entry for specific key pattern as they happen. pub fn watch>(&self, key: T) -> io::Result { - let subject = format!("{}{}", self.prefix, key.as_ref()); let subscription = self.context.subscribe_with_options( - subject.as_str(), + &self.key_subject(key.as_ref()), &SubscribeOptions::ordered() .deliver_last_per_subject() .enable_flow_control() @@ -751,6 +735,14 @@ impl Store { pub fn bucket(&self) -> &String { &self.name } + + /// Subject from prefix and key. + fn key_subject(&self, key: &str) -> SubjectBuf { + let mut subject = String::with_capacity(self.prefix.len() + key.len()); + subject.push_str(&self.prefix); + subject.push_str(key); + SubjectBuf::new_unchecked(subject) + } } /// An iterator used to iterate through the keys of a bucket. diff --git a/src/lib.rs b/src/lib.rs index e0506805f..b79f369a7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -201,6 +201,7 @@ mod message; mod options; mod proto; mod secure_wipe; +mod subject; mod subscription; /// Header constants and types. @@ -257,6 +258,7 @@ pub use connector::{IntoServerList, ServerAddress}; pub use jetstream::JetStreamOptions; pub use message::Message; pub use options::Options; +pub use subject::{AsSubject, Error as SubjectError, Subject, SubjectBuf, Tokens}; pub use subscription::{Handler, Subscription}; /// A re-export of the `rustls` crate used in this crate, @@ -431,41 +433,63 @@ impl Connection { /// /// # Example /// ``` - /// # fn main() -> std::io::Result<()> { /// # let nc = nats::connect("demo.nats.io")?; /// let sub = nc.subscribe("foo")?; - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` - pub fn subscribe(&self, subject: &str) -> io::Result { - self.do_subscribe(subject, None) + pub fn subscribe(&self, subject: &Sub) -> io::Result + where + Sub: AsSubject + ?Sized, + { + self.do_subscribe(subject.as_subject()?, None) } /// Create a queue subscription for the given NATS connection. /// /// # Example /// ``` - /// # fn main() -> std::io::Result<()> { /// # let nc = nats::connect("demo.nats.io")?; /// let sub = nc.queue_subscribe("foo", "production")?; - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` - pub fn queue_subscribe(&self, subject: &str, queue: &str) -> io::Result { - self.do_subscribe(subject, Some(queue)) + pub fn queue_subscribe(&self, subject: &Sub, queue: &Que) -> io::Result + where + Sub: AsSubject + ?Sized, + Que: AsSubject + ?Sized, + { + self.do_subscribe(subject.as_subject()?, Some(queue.as_subject()?)) } /// Publish a message on the given subject. /// + /// It is required that the subject does not contain any wildcards else an error is returned. + /// /// # Example /// ``` - /// # fn main() -> std::io::Result<()> { /// # let nc = nats::connect("demo.nats.io")?; /// nc.publish("foo", "Hello World!")?; - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) + /// ``` + /// + /// # Publish in loop + /// + /// When `publish()` is called in a loop you should consider turning the subject into a + /// [`SubjectBuf`] or [`Subject`]. This way the subject is only validated once. + /// + /// ``` + /// use nats::SubjectBuf; + /// + /// # let nc = nats::connect("demo.nats.io")?; + /// let subject: SubjectBuf = "foo".parse()?; + /// for _ in 0..5 { + /// nc.publish(&subject, "Hello World!")?; + /// } + /// # Ok::<(), std::io::Error>(()) /// ``` - pub fn publish(&self, subject: &str, msg: impl AsRef<[u8]>) -> io::Result<()> { + pub fn publish(&self, subject: &Sub, msg: impl AsRef<[u8]>) -> io::Result<()> + where + Sub: AsSubject + ?Sized, + { self.publish_with_reply_or_headers(subject, None, None, msg) } @@ -474,38 +498,42 @@ impl Connection { /// /// # Example /// ``` - /// # fn main() -> std::io::Result<()> { /// # let nc = nats::connect("demo.nats.io")?; /// let reply = nc.new_inbox(); /// let rsub = nc.subscribe(&reply)?; /// nc.publish_request("foo", &reply, "Help me!")?; - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` - pub fn publish_request( + pub fn publish_request( &self, - subject: &str, - reply: &str, + subject: &Sub, + reply: &Rep, msg: impl AsRef<[u8]>, - ) -> io::Result<()> { - self.0 - .client - .publish(subject, Some(reply), None, msg.as_ref()) + ) -> io::Result<()> + where + Sub: AsSubject + ?Sized, + Rep: AsSubject + ?Sized, + { + self.0.client.publish( + subject.as_subject()?, + Some(reply.as_subject()?), + None, + msg.as_ref(), + ) } /// Create a new globally unique inbox which can be used for replies. /// /// # Example /// ``` - /// # fn main() -> std::io::Result<()> { /// # let nc = nats::connect("demo.nats.io")?; /// let reply = nc.new_inbox(); /// let rsub = nc.subscribe(&reply)?; - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` - pub fn new_inbox(&self) -> String { - format!("_INBOX.{}", nuid::next()) + pub fn new_inbox(&self) -> SubjectBuf { + // Always produces a valid subject + SubjectBuf::new_unchecked(format!("_INBOX.{}", nuid::next())) } /// Publish a message on the given subject as a request and receive the @@ -513,15 +541,16 @@ impl Connection { /// /// # Example /// ``` - /// # fn main() -> std::io::Result<()> { /// # let nc = nats::connect("demo.nats.io")?; /// # nc.subscribe("foo")?.with_handler(move |m| { m.respond("ans=42")?; Ok(()) }); /// let resp = nc.request("foo", "Help me?")?; - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` - pub fn request(&self, subject: &str, msg: impl AsRef<[u8]>) -> io::Result { - self.request_with_headers_or_timeout(subject, None, None, msg) + pub fn request(&self, subject: &Sub, msg: impl AsRef<[u8]>) -> io::Result + where + Sub: AsSubject + ?Sized, + { + self.request_with_headers_or_timeout(subject.as_subject()?, None, None, msg) } /// Publish a message on the given subject as a request and receive the @@ -530,25 +559,26 @@ impl Connection { /// /// # Example /// ``` - /// # fn main() -> std::io::Result<()> { /// # let nc = nats::connect("demo.nats.io")?; /// # nc.subscribe("foo")?.with_handler(move |m| { m.respond("ans=42")?; Ok(()) }); /// let resp = nc.request_timeout("foo", "Help me?", std::time::Duration::from_secs(2))?; - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` - pub fn request_timeout( + pub fn request_timeout( &self, - subject: &str, + subject: &Sub, msg: impl AsRef<[u8]>, timeout: Duration, - ) -> io::Result { - self.request_with_headers_or_timeout(subject, None, Some(timeout), msg) + ) -> io::Result + where + Sub: AsSubject + ?Sized, + { + self.request_with_headers_or_timeout(subject.as_subject()?, None, Some(timeout), msg) } fn request_with_headers_or_timeout( &self, - subject: &str, + subject: &Subject, maybe_headers: Option<&HeaderMap>, maybe_timeout: Option, msg: impl AsRef<[u8]>, @@ -556,7 +586,7 @@ impl Connection { // Publish a request. let reply = self.new_inbox(); let sub = self.subscribe(&reply)?; - self.publish_with_reply_or_headers(subject, Some(reply.as_str()), maybe_headers, msg)?; + self.publish_with_reply_or_headers(subject, Some(&reply), maybe_headers, msg)?; // Wait for the response let result = if let Some(timeout) = maybe_timeout { @@ -582,18 +612,23 @@ impl Connection { /// /// # Example /// ``` - /// # fn main() -> std::io::Result<()> { /// # let nc = nats::connect("demo.nats.io")?; /// # nc.subscribe("foo")?.with_handler(move |m| { m.respond("ans=42")?; Ok(()) }); /// for msg in nc.request_multi("foo", "Help")?.iter().take(1) {} - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` - pub fn request_multi(&self, subject: &str, msg: impl AsRef<[u8]>) -> io::Result { + pub fn request_multi( + &self, + subject: &Sub, + msg: impl AsRef<[u8]>, + ) -> io::Result + where + Sub: AsSubject + ?Sized, + { // Publish a request. let reply = self.new_inbox(); let sub = self.subscribe(&reply)?; - self.publish_with_reply_or_headers(subject, Some(reply.as_str()), None, msg)?; + self.publish_with_reply_or_headers(subject, Some(&reply), None, msg)?; // Return the subscription. Ok(sub) @@ -607,11 +642,9 @@ impl Connection { /// /// # Example /// ``` - /// # fn main() -> std::io::Result<()> { /// # let nc = nats::connect("demo.nats.io")?; /// nc.flush()?; - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` pub fn flush(&self) -> io::Result<()> { self.flush_timeout(DEFAULT_FLUSH_TIMEOUT) @@ -625,11 +658,9 @@ impl Connection { /// /// # Example /// ``` - /// # fn main() -> std::io::Result<()> { /// # let nc = nats::connect("demo.nats.io")?; /// nc.flush()?; - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` pub fn flush_timeout(&self, duration: Duration) -> io::Result<()> { self.0.client.flush(duration) @@ -646,11 +677,9 @@ impl Connection { /// /// # Example /// ``` - /// # fn main() -> std::io::Result<()> { /// # let nc = nats::connect("demo.nats.io")?; /// nc.close(); - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` pub fn close(self) { self.0.client.flush(DEFAULT_FLUSH_TIMEOUT).ok(); @@ -663,11 +692,9 @@ impl Connection { /// /// # Example /// ``` - /// # fn main() -> std::io::Result<()> { /// # let nc = nats::connect("demo.nats.io")?; /// println!("server rtt: {:?}", nc.rtt()); - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` pub fn rtt(&self) -> io::Result { let start = Instant::now(); @@ -708,11 +735,9 @@ impl Connection { /// Supported as of server version 2.1.6. /// # Example /// ``` - /// # fn main() -> std::io::Result<()> { /// # let nc = nats::connect("demo.nats.io")?; /// println!("ip: {:?}", nc.client_ip()); - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` pub fn client_ip(&self) -> io::Result { let info = self.0.client.server_info(); @@ -745,11 +770,9 @@ impl Connection { /// /// # Example /// ``` - /// # fn main() -> std::io::Result<()> { /// # let nc = nats::connect("demo.nats.io")?; /// println!("ip: {:?}", nc.client_id()); - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` pub fn client_id(&self) -> u64 { self.0.client.server_info().client_id @@ -770,25 +793,25 @@ impl Connection { /// # Example /// ``` /// # use std::sync::{Arc, atomic::{AtomicBool, Ordering::SeqCst}}; - /// # fn main() -> std::io::Result<()> { + /// # use nats::Subject; /// # let nc = nats::connect("demo.nats.io")?; /// let received = Arc::new(AtomicBool::new(false)); /// let received_2 = received.clone(); + /// let subject = Subject::new("test.drain")?; /// - /// nc.subscribe("test.drain")?.with_handler(move |m| { + /// nc.subscribe(subject)?.with_handler(move |m| { /// received_2.store(true, SeqCst); /// Ok(()) /// }); /// - /// nc.publish("test.drain", "message")?; + /// nc.publish(subject, "message")?; /// nc.drain()?; /// /// # std::thread::sleep(std::time::Duration::from_secs(1)); /// /// assert!(received.load(SeqCst)); /// - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` pub fn drain(&self) -> io::Result<()> { self.0.client.flush(DEFAULT_FLUSH_TIMEOUT)?; @@ -800,27 +823,34 @@ impl Connection { /// /// # Example /// ```no_run - /// # fn main() -> std::io::Result<()> { + /// # use nats::Subject; /// # let nc = nats::connect("demo.nats.io")?; - /// let sub = nc.subscribe("foo.headers")?; + /// let subject = Subject::new("foo.headers")?; + /// let sub = nc.subscribe(subject)?; /// let headers = [("header1", "value1"), /// ("header2", "value2")].iter().collect(); - /// let reply_to = None; - /// nc.publish_with_reply_or_headers("foo.headers", reply_to, Some(&headers), "Hello World!")?; + /// nc.publish_with_reply_or_headers(subject, None, Some(&headers), "Hello World!")?; /// nc.flush()?; /// let message = sub.next_timeout(std::time::Duration::from_secs(2)).unwrap(); /// assert_eq!(message.headers.unwrap().len(), 2); - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) /// ``` - pub fn publish_with_reply_or_headers( + pub fn publish_with_reply_or_headers( &self, - subject: &str, - reply: Option<&str>, + subject: &Sub, + reply: Option<&Subject>, headers: Option<&HeaderMap>, msg: impl AsRef<[u8]>, - ) -> io::Result<()> { - self.0.client.publish(subject, reply, headers, msg.as_ref()) + ) -> io::Result<()> + where + Sub: AsSubject + ?Sized, + { + self.0.client.publish( + subject.as_subject()?, + reply.map(|sub| sub.as_subject()).transpose()?, + headers, + msg.as_ref(), + ) } /// Returns the maximum payload size the most recently @@ -828,16 +858,14 @@ impl Connection { /// /// # Example /// ``` - /// # fn main() -> std::io::Result<()> { /// let nc = nats::connect("demo.nats.io")?; /// println!("max payload: {:?}", nc.max_payload()); - /// # Ok(()) - /// # } + /// # Ok::<(), std::io::Error>(()) pub fn max_payload(&self) -> usize { self.0.client.server_info.lock().max_payload } - fn do_subscribe(&self, subject: &str, queue: Option<&str>) -> io::Result { + fn do_subscribe(&self, subject: &Subject, queue: Option<&Subject>) -> io::Result { let (sid, receiver) = self.0.client.subscribe(subject, queue)?; Ok(Subscription::new( sid, @@ -848,16 +876,21 @@ impl Connection { } /// Attempts to publish a message without blocking. + /// + /// If not possible an error with [`io::ErrorKind::WouldBlock`] is returned. #[doc(hidden)] - pub fn try_publish_with_reply_or_headers( + pub fn try_publish_with_reply_or_headers( &self, - subject: &str, - reply: Option<&str>, + subject: &Sub, + reply: Option<&Subject>, headers: Option<&HeaderMap>, msg: impl AsRef<[u8]>, - ) -> Option> { + ) -> io::Result<()> + where + Sub: AsSubject + ?Sized, + { self.0 .client - .try_publish(subject, reply, headers, msg.as_ref()) + .try_publish(subject.as_subject()?, reply, headers, msg.as_ref()) } } diff --git a/src/message.rs b/src/message.rs index f8bb196f4..8daa9fca2 100644 --- a/src/message.rs +++ b/src/message.rs @@ -13,6 +13,7 @@ use std::{ fmt, io, + str::FromStr, sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -22,6 +23,7 @@ use std::{ use crate::{ client::Client, header::{self, HeaderMap}, + SubjectBuf, }; use time::OffsetDateTime; @@ -32,11 +34,11 @@ pub(crate) const MESSAGE_NOT_BOUND: &str = "message not bound to a connection"; #[derive(Clone)] pub struct Message { /// The subject this message came from. - pub subject: String, + pub subject: SubjectBuf, /// Optional reply subject that may be used for sending a response to this /// message. - pub reply: Option, + pub reply: Option, /// The message contents. pub data: Vec, @@ -73,17 +75,18 @@ impl Message { /// but cannot be used on it's own for associated methods as those require `Client` injected into `Message` /// and will error without it. pub fn new( - subject: &str, - reply: Option<&str>, - data: impl AsRef<[u8]>, + subject: SubjectBuf, + reply: Option, + data: Vec, headers: Option, ) -> Message { Message { - subject: subject.to_string(), - reply: reply.map(String::from), - data: data.as_ref().to_vec(), + subject, + reply, + data, headers, - ..Default::default() + client: None, + double_acked: Arc::new(AtomicBool::new(false)), } } @@ -96,7 +99,7 @@ impl Message { .client .as_ref() .ok_or_else(|| io::Error::new(io::ErrorKind::NotConnected, MESSAGE_NOT_BOUND))?; - client.publish(reply.as_str(), None, None, msg.as_ref())?; + client.publish(reply, None, None, msg.as_ref())?; Ok(()) } @@ -231,7 +234,7 @@ impl Message { if retries == 2 { log::warn!("double_ack is retrying until the server connection is reestablished"); } - let ack_reply = format!("_INBOX.{}", nuid::next()); + let ack_reply = SubjectBuf::new_unchecked(format!("_INBOX.{}", nuid::next())); let sub_ret = client.subscribe(&ack_reply, None); if sub_ret.is_err() { std::thread::sleep(std::time::Duration::from_millis(100)); @@ -264,139 +267,96 @@ impl Message { #[allow(clippy::eval_order_dependence)] pub fn jetstream_message_info(&self) -> Option> { const PREFIX: &str = "$JS.ACK."; - const SKIP: usize = PREFIX.len(); - let mut reply: &str = self.reply.as_ref()?; + let reply = self.reply.as_deref()?; + let reply_str = reply.as_str(); if !reply.starts_with(PREFIX) { return None; } - reply = &reply[SKIP..]; - - let mut split = reply.split('.'); - - // we should avoid allocating to prevent - // large performance degradations in - // parsing this. - let mut tokens: [Option<&str>; 10] = [None; 10]; - let mut n_tokens = 0; - for each_token in &mut tokens { - if let Some(token) = split.next() { - *each_token = Some(token); - n_tokens += 1; - } - } - - let mut token_index = 0; - - macro_rules! try_parse { - () => { - match str::parse(try_parse!(str)) { - Ok(parsed) => parsed, - Err(e) => { - log::error!( - "failed to parse jetstream reply \ - subject: {}, error: {:?}. Is your \ - nats-server up to date?", - reply, - e - ); - return None; - } - } - }; - (str) => { - if let Some(next) = tokens[token_index].take() { - #[allow(unused)] - { - // this isn't actually unused, but it's - // difficult for the compiler to infer this. - token_index += 1; - } - next - } else { - log::error!( - "unexpectedly few tokens while parsing \ - jetstream reply subject: {}. Is your \ - nats-server up to date?", - reply - ); - return None; - } - }; - } + // The first two tokens `$JS.ACK` are not considered + let n_tokens = reply.tokens().count(); + let mut tokens = reply.tokens().skip(2); - // now we can try to parse the tokens to - // individual types. We use an if-else - // chain instead of a match because it - // produces more optimal code usually, - // and we want to try the 9 (11 - the first 2) - // case first because we expect it to - // be the most common. We use >= to be - // future-proof. - if n_tokens >= 9 { + // now we can try to parse the tokens to individual types. We use an if-else chain instead + // of a match because it produces more optimal code usually, and we want to try the 11 case + // first because we expect it to be the most common. We use >= to be future-proof. + if n_tokens >= 11 { Some(crate::jetstream::JetStreamMessageInfo { domain: { - let domain: &str = try_parse!(str); + let domain: &str = tokens.next()?; if domain == "_" { None } else { Some(domain) } }, - acc_hash: Some(try_parse!(str)), - stream: try_parse!(str), - consumer: try_parse!(str), - delivered: try_parse!(), - stream_seq: try_parse!(), - consumer_seq: try_parse!(), + acc_hash: Some(tokens.next()?), + stream: tokens.next()?, + consumer: tokens.next()?, + delivered: parse_next_token(&mut tokens, reply_str)?, + stream_seq: parse_next_token(&mut tokens, reply_str)?, + consumer_seq: parse_next_token(&mut tokens, reply_str)?, published: { - let nanos: i128 = try_parse!(); + let nanos: i128 = parse_next_token(&mut tokens, reply_str)?; OffsetDateTime::from_unix_timestamp_nanos(nanos).ok()? }, - pending: try_parse!(), - token: if n_tokens >= 9 { - Some(try_parse!(str)) + pending: parse_next_token(&mut tokens, reply_str)?, + token: if n_tokens >= 11 { + Some(tokens.next()?) } else { None }, }) - } else if n_tokens == 7 { + } else if n_tokens == 9 { // we expect this to be increasingly rare, as older // servers are phased out. Some(crate::jetstream::JetStreamMessageInfo { domain: None, acc_hash: None, - stream: try_parse!(str), - consumer: try_parse!(str), - delivered: try_parse!(), - stream_seq: try_parse!(), - consumer_seq: try_parse!(), + stream: tokens.next()?, + consumer: tokens.next()?, + delivered: parse_next_token(&mut tokens, reply_str)?, + stream_seq: parse_next_token(&mut tokens, reply_str)?, + consumer_seq: parse_next_token(&mut tokens, reply_str)?, published: { - let nanos: i128 = try_parse!(); + let nanos: i128 = parse_next_token(&mut tokens, reply_str)?; OffsetDateTime::from_unix_timestamp_nanos(nanos).ok()? }, - pending: try_parse!(), + pending: parse_next_token(&mut tokens, reply_str)?, token: None, }) } else { + log::error!( + "unexpectedly few tokens while parsing \ + jetstream reply subject: {}. Is your \ + nats-server up to date?", + reply + ); None } } } -impl Default for Message { - fn default() -> Message { - Message { - subject: String::from(""), - reply: None, - data: Vec::new(), - headers: None, - client: None, - double_acked: Arc::new(AtomicBool::new(false)), - } - } +fn parse_next_token<'i, 's, T, E>( + iter: &'i mut impl Iterator, + reply: &'s str, +) -> Option +where + T: FromStr, + E: fmt::Display, +{ + iter.next()? + .parse() + .map_err(|e| { + log::error!( + "failed to parse jetstream reply subject: {}, error: {}. Is your nats-server up to date?", + reply, + e + ); + }) + .ok() } impl fmt::Debug for Message { diff --git a/src/object_store.rs b/src/object_store.rs index ab06e9f83..83d57374a 100644 --- a/src/object_store.rs +++ b/src/object_store.rs @@ -14,6 +14,8 @@ //! Support for Object Store. //! This feature is experimental and the API may change. +mod api; + use crate::header::HeaderMap; use crate::jetstream::{ DateTime, DiscardPolicy, JetStream, PushSubscription, StorageType, StreamConfig, @@ -51,7 +53,7 @@ fn is_valid_object_name(object_name: &str) -> bool { } fn sanitize_object_name(object_name: &str) -> String { - object_name.replace(".", "_").replace(" ", "_") + object_name.replace('.', "_").replace(' ', "_") } /// Configuration values for object store buckets. @@ -106,8 +108,8 @@ impl JetStream { let bucket_name = config.bucket.clone(); let stream_name = format!("OBJ_{}", bucket_name); - let chunk_subject = format!("$O.{}.C.>", bucket_name); - let meta_subject = format!("$O.{}.M.>", bucket_name); + let chunk_subject = api::object_all_chunks(&bucket_name)?; + let meta_subject = api::object_all_meta(&bucket_name)?; self.add_stream(&StreamConfig { name: stream_name, @@ -355,7 +357,7 @@ impl ObjectStore { // Grab last meta value we have. let stream_name = format!("OBJ_{}", &self.name); - let subject = format!("$O.{}.M.{}", &self.name, &object_name); + let subject = api::object_meta(&self.name, &object_name)?; let message = self.context.get_last_message(&stream_name, &subject)?; let object_info = serde_json::from_slice::(&message.data)?; @@ -413,14 +415,14 @@ impl ObjectStore { )); } - // Fetch any existing object info, if ther is any for later use. + // Fetch any existing object info, if their is any for later use. let maybe_existing_object_info = match self.info(&object_name) { Ok(object_info) => Some(object_info), Err(_) => None, }; let object_nuid = nuid::next(); - let chunk_subject = format!("$O.{}.C.{}", &self.name, &object_nuid); + let chunk_subject = api::object_chunk(&self.name, &object_nuid)?; let mut object_chunks = 0; let mut object_size = 0; @@ -440,7 +442,7 @@ impl ObjectStore { } // Create a random subject prefixed with the object stream name. - let subject = format!("$O.{}.M.{}", &self.name, &object_name); + let subject = api::object_meta(&self.name, &object_name)?; let object_info = ObjectInfo { name: object_name, description: object_meta.description, @@ -458,7 +460,7 @@ impl ObjectStore { let mut headers = HeaderMap::default(); headers.insert(NATS_ROLLUP, ROLLUP_SUBJECT.to_string()); - let message = Message::new(&subject, None, data, Some(headers)); + let message = Message::new(subject, None, data, Some(headers)); // Publish metadata self.context.publish_message(&message)?; @@ -466,7 +468,7 @@ impl ObjectStore { // Purge any old chunks. if let Some(existing_object_info) = maybe_existing_object_info { let stream_name = format!("OBJ_{}", self.name); - let chunk_subject = format!("$O.{}.C.{}", &self.name, &existing_object_info.nuid); + let chunk_subject = api::object_chunk(&self.name, &existing_object_info.nuid)?; self.context .purge_stream_subject(&stream_name, &chunk_subject)?; @@ -507,7 +509,7 @@ impl ObjectStore { return self.get(&link.name); } - let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid); + let chunk_subject = api::object_chunk(&self.name, &object_info.nuid)?; let subscription = self .context .subscribe_with_options(&chunk_subject, &SubscribeOptions::ordered())?; @@ -556,13 +558,13 @@ impl ObjectStore { let mut headers = HeaderMap::default(); headers.insert(NATS_ROLLUP, ROLLUP_SUBJECT.to_string()); - let subject = format!("$O.{}.M.{}", &self.name, &object_name); - let message = Message::new(&subject, None, data, Some(headers)); + let subject = api::object_meta(&self.name, object_name)?; + let message = Message::new(subject, None, data, Some(headers)); self.context.publish_message(&message)?; let stream_name = format!("OBJ_{}", self.name); - let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid); + let chunk_subject = api::object_chunk(&self.name, &object_info.nuid)?; self.context .purge_stream_subject(&stream_name, &chunk_subject)?; @@ -607,7 +609,7 @@ impl ObjectStore { /// # } /// ``` pub fn watch(&self) -> io::Result { - let subject = format!("$O.{}.M.>", &self.name); + let subject = api::object_all_meta(&self.name)?; let subscription = self.context.subscribe_with_options( &subject, &SubscribeOptions::ordered().deliver_last_per_subject(), diff --git a/src/object_store/api.rs b/src/object_store/api.rs new file mode 100644 index 000000000..219546c37 --- /dev/null +++ b/src/object_store/api.rs @@ -0,0 +1,25 @@ +//! Subjects representing the object store API. + +use std::io; + +use crate::SubjectBuf; + +/// Subject for requests to create a new stream. +pub fn object_meta(store: &str, object: &str) -> io::Result { + SubjectBuf::new(format!("$O.{}.M.{}", store, object)).map_err(Into::into) +} + +/// Subject for requests to create a new stream. +pub fn object_all_meta(store: &str) -> io::Result { + SubjectBuf::new(format!("$O.{}.M.>", store)).map_err(Into::into) +} + +/// Subject for requests to create a new stream. +pub fn object_chunk(store: &str, object: &str) -> io::Result { + SubjectBuf::new(format!("$O.{}.C.{}", store, object)).map_err(Into::into) +} + +/// Subject for requests to create a new stream. +pub fn object_all_chunks(store: &str) -> io::Result { + SubjectBuf::new(format!("$O.{}.C.>", store)).map_err(Into::into) +} diff --git a/src/proto.rs b/src/proto.rs index e4161e936..07e43b390 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -18,6 +18,7 @@ use std::str::{self, FromStr}; use crate::connect::ConnectInfo; use crate::{header::HeaderMap, inject_io_failure, ServerInfo}; +use crate::{Subject, SubjectBuf}; /// A protocol operation sent by the server. #[derive(Debug)] @@ -27,19 +28,19 @@ pub(crate) enum ServerOp { /// `MSG [reply-to] <#bytes>\r\n[payload]\r\n` Msg { - subject: String, + subject: SubjectBuf, sid: u64, - reply_to: Option, + reply_to: Option, payload: Vec, }, /// `HMSG [reply-to] <# header bytes> <# total /// bytes>\r\n\r\n[headers]\r\n\r\n[payload]\r\n` Hmsg { - subject: String, + subject: SubjectBuf, headers: HeaderMap, sid: u64, - reply_to: Option, + reply_to: Option, payload: Vec, }, @@ -153,8 +154,8 @@ pub(crate) fn decode(mut stream: impl BufRead) -> io::Result> { } }; - // Convert the slice into an owned string. - let subject = subject.to_string(); + // Convert the slice into an owned subject. Subjects from servers are always valid. + let subject = SubjectBuf::new_unchecked(subject.to_string()); // Parse the subject ID. let sid = u64::from_str(sid).map_err(|_| { @@ -164,8 +165,8 @@ pub(crate) fn decode(mut stream: impl BufRead) -> io::Result> { ) })?; - // Convert the slice into an owned string. - let reply_to = reply_to.map(ToString::to_string); + // Convert the slice into an owned subject. Subjects from servers are always valid. + let reply_to = reply_to.map(|sub| SubjectBuf::new_unchecked(sub.to_string())); // Parse the number of payload bytes. let num_bytes = u32::from_str(num_bytes).map_err(|_| { @@ -215,8 +216,8 @@ pub(crate) fn decode(mut stream: impl BufRead) -> io::Result> { } }; - // Convert the slice into an owned string. - let subject = subject.to_string(); + // Convert the slice into an owned string. Subjects from servers are always valid. + let subject = SubjectBuf::new_unchecked(subject.to_string()); // Parse the subject ID. let sid = u64::from_str(sid).map_err(|_| { @@ -226,8 +227,8 @@ pub(crate) fn decode(mut stream: impl BufRead) -> io::Result> { ) })?; - // Convert the slice into an owned string. - let reply_to = reply_to.map(ToString::to_string); + // Convert the slice into an owned string. Subjects from servers are always valid. + let reply_to = reply_to.map(|sub| SubjectBuf::new_unchecked(sub.to_string())); // Parse the number of payload bytes. let num_header_bytes = u32::from_str(num_header_bytes).map_err(|_| { @@ -301,23 +302,23 @@ pub(crate) enum ClientOp<'a> { /// `PUB [reply-to] <#bytes>\r\n[payload]\r\n` Pub { - subject: &'a str, - reply_to: Option<&'a str>, + subject: &'a Subject, + reply_to: Option<&'a Subject>, payload: &'a [u8], }, /// `HPUB [reply-to] <#bytes>\r\n[payload]\r\n` Hpub { - subject: &'a str, - reply_to: Option<&'a str>, + subject: &'a Subject, + reply_to: Option<&'a Subject>, headers: &'a HeaderMap, payload: &'a [u8], }, /// `SUB [queue group] \r\n` Sub { - subject: &'a str, - queue_group: Option<&'a str>, + subject: &'a Subject, + queue_group: Option<&'a Subject>, sid: u64, }, diff --git a/src/subject.rs b/src/subject.rs new file mode 100644 index 000000000..d73d2eb9d --- /dev/null +++ b/src/subject.rs @@ -0,0 +1,484 @@ +//! Typed implementation of a NATS subject. + +use std::{ + borrow::Borrow, + convert::TryFrom, + fmt, + hash::{Hash, Hasher}, + io, + ops::Deref, + str::FromStr, +}; + +use serde::{Deserialize, Serialize}; + +/// Wildcard matching a single token. +pub const SINGLE_WILDCARD: &str = "*"; + +/// Wildcard matching all following tokens. +/// +/// Only valid as last token of a [`Subject`]. +pub const MULTI_WILDCARD: &str = ">"; + +/// The character marking a multi wildcard +pub const MULTI_WILDCARD_CHAR: char = '>'; + +/// Separator of [`Token`]s. +pub const TOKEN_SEPARATOR: char = '.'; + +/// Errors validating a NATS subject. +#[derive(Debug, Copy, Clone, thiserror::Error)] +pub enum Error { + /// One of the [`Subject`]'s token is invalid. + #[error("NATS subjects's tokens are not allowed to be empty or to contain spaces or dots")] + InvalidToken, + /// The multi-wildcard token `>` is used within or at the beginning of a [`Subject`]. + #[error("The multi wildcard '>' is only allowed at the end of a subject")] + MultiWildcardInMiddle, + /// The [`Subject`] started or ended with a `.`. + #[error("The separator '.' is not allowed at the end or beginning of a subject")] + SeparatorAtEndOrBeginning, + /// Can not join [`Subject`] as it ends with a multi-wildcard as this would result in an invalid + /// [`Subject`]. + #[error("Could not join on a subject ending with the multi wildcard")] + CanNotJoin, +} + +impl From for io::Error { + fn from(err: Error) -> Self { + io::Error::new(io::ErrorKind::InvalidInput, err) + } +} + +/// Convert something into a [`Subject`]. +pub trait AsSubject { + /// Try to represent as a [`Subject`]. + fn as_subject(&self) -> Result<&Subject, io::Error>; +} + +/// A valid NATS subject. +#[repr(transparent)] +#[derive(Debug, Eq)] +pub struct Subject(str); + +/// An owned, valid NATS subject. +#[derive(Debug, Clone, Eq, Serialize, Deserialize)] +#[serde(try_from = "String")] +#[serde(into = "String")] +pub struct SubjectBuf(String); + +/// Iterator over a [`Subject`]'s tokens. +#[derive(Debug, Clone)] +pub struct Tokens<'s> { + remaining_subject: &'s str, +} + +impl Subject { + /// Constructor for a subject. + /// + /// # WARNING + /// + /// An invalid subject may brake assumptions of the [`Subject`] type. Reassure, that this call + /// definitely constructs a valid subject. + pub fn new_unchecked(sub: &str) -> &Self { + // Safety: Subject is #[repr(transparent)] therefore this is okay + #[allow(unsafe_code)] + #[allow(trivial_casts)] + unsafe { + let ptr = sub as *const _ as *const Self; + &*ptr + } + } + /// Create a new, validated NATS subject. + pub fn new(subject: &str) -> Result<&Self, Error> { + match subject.as_bytes() { + b"" => Err(Error::InvalidToken), + [b'.', ..] | [.., b'.'] => Err(Error::SeparatorAtEndOrBeginning), + s if s.starts_with(b">.") || s.windows(3).any(|win| win == b".>.") => { + Err(Error::MultiWildcardInMiddle) + } + s if s.windows(2).any(|win| win == b"..") => Err(Error::InvalidToken), + s if s.iter().any(|b| b" \t\n\r".contains(b)) => Err(Error::InvalidToken), + _ => Ok(()), + }?; + + Ok(Self::new_unchecked(subject)) + } + /// The subject as `&str`. + pub fn as_str(&self) -> &str { + self.deref() + } + /// Iterate over the subject's [`Token`]s. + pub fn tokens(&self) -> Tokens<'_> { + self.into_iter() + } + /// Check if two subjects match, considering wildcards. + pub fn matches(&self, other: &Subject) -> bool { + let mut s_tokens = self.tokens(); + let mut o_tokens = other.tokens(); + + loop { + match (s_tokens.next(), o_tokens.next()) { + (Some(MULTI_WILDCARD), Some(_)) + | (Some(_), Some(MULTI_WILDCARD)) + | (None, None) => break true, + (Some(s_t), Some(o_t)) => { + if token_match(s_t, o_t) { + continue; + } else { + break false; + } + } + (None, Some(_)) | (Some(_), None) => break false, + } + } + } + /// Check if the subjects ends with a multi wildcard. + pub fn ends_with_multi_wildcard(&self) -> bool { + self.ends_with(MULTI_WILDCARD_CHAR) + } + /// Check if the subject contains any wildcards. + /// + /// _Note:_ You can't publish to a subject that contains a wildcard. + pub fn contains_wildcards(&self) -> bool { + self.tokens() + .any(|t| t == SINGLE_WILDCARD || t == MULTI_WILDCARD) + } + /// Get the nth token of the subject. + /// + /// Returns `None` if there are not enough tokens. + pub fn get_token(&self, idx: usize) -> Option<&str> { + self.tokens().nth(idx) + } +} + +impl AsRef for Subject { + fn as_ref(&self) -> &str { + self.deref() + } +} + +impl<'s> IntoIterator for &'s Subject { + type Item = &'s str; + type IntoIter = Tokens<'s>; + + fn into_iter(self) -> Self::IntoIter { + Tokens { + remaining_subject: &self.0, + } + } +} + +impl PartialEq for Subject { + fn eq(&self, other: &str) -> bool { + self.as_str() == other + } +} + +impl fmt::Display for Subject { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +impl Deref for Subject { + type Target = str; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl PartialEq for Subject { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } +} + +impl Hash for Subject { + fn hash(&self, state: &mut H) { + self.as_str().hash(state); + } +} + +impl ToOwned for Subject { + type Owned = SubjectBuf; + + fn to_owned(&self) -> Self::Owned { + SubjectBuf(self.0.to_owned()) + } +} + +impl AsSubject for Subject { + fn as_subject(&self) -> Result<&Subject, io::Error> { + Ok(self) + } +} + +impl<'s> AsSubject for &'s Subject { + fn as_subject(&self) -> Result<&Subject, io::Error> { + Ok(self) + } +} + +impl SubjectBuf { + /// Create a new, owned and validated NATS subject. + pub fn new(subject: String) -> Result { + Subject::new(&subject)?; + Ok(Self(subject)) + } + /// Const constructor for a subject buffer without validation. + /// + /// # WARNING + /// + /// An invalid subject may brake assumptions of the [`SubjectBuf`] type. Reassure, that this call + /// definitely constructs a valid subject buffer. + pub const fn new_unchecked(subject: String) -> Self { + Self(subject) + } + /// Convert the subject buffer into the inner string. + pub fn into_inner(self) -> String { + self.0 + } + /// Append a token. + pub fn join(mut self, token: &str) -> Result { + if !valid_token(token) { + Err(Error::InvalidToken) + } else if self.0.ends_with(MULTI_WILDCARD_CHAR) { + Err(Error::CanNotJoin) + } else { + self.0.reserve(token.len() + 1); + self.0.push(TOKEN_SEPARATOR); + self.0.push_str(token); + Ok(self) + } + } +} + +impl FromStr for SubjectBuf { + type Err = Error; + + fn from_str(s: &str) -> Result { + Subject::new(s)?; + Ok(SubjectBuf(s.to_owned())) + } +} + +impl From for String { + fn from(sub: SubjectBuf) -> Self { + sub.0 + } +} + +impl TryFrom for SubjectBuf { + type Error = Error; + + fn try_from(value: String) -> Result { + Self::new(value) + } +} + +impl PartialEq for SubjectBuf { + fn eq(&self, other: &str) -> bool { + self.as_str() == other + } +} + +impl<'s> PartialEq<&'s str> for SubjectBuf { + fn eq(&self, other: &&'s str) -> bool { + self.as_str() == *other + } +} + +impl fmt::Display for SubjectBuf { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_ref()) + } +} + +impl Deref for SubjectBuf { + type Target = Subject; + + fn deref(&self) -> &Self::Target { + Subject::new_unchecked(&self.0) + } +} + +impl AsRef for SubjectBuf { + fn as_ref(&self) -> &Subject { + self.deref() + } +} + +impl PartialEq for SubjectBuf { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } +} + +impl Hash for SubjectBuf { + fn hash(&self, state: &mut H) { + self.as_str().hash(state); + } +} + +impl Borrow for SubjectBuf { + fn borrow(&self) -> &Subject { + self.deref() + } +} + +impl AsSubject for SubjectBuf { + fn as_subject(&self) -> Result<&Subject, io::Error> { + Ok(self.deref()) + } +} + +impl<'sb> AsSubject for &'sb SubjectBuf { + fn as_subject(&self) -> Result<&Subject, io::Error> { + Ok(self.deref()) + } +} + +impl<'s> Iterator for Tokens<'s> { + type Item = &'s str; + + fn next(&mut self) -> Option { + if self.remaining_subject.is_empty() { + None + } else if let Some((token, rest)) = self.remaining_subject.split_once(TOKEN_SEPARATOR) { + self.remaining_subject = rest; + Some(token) + } else { + let last = std::mem::take(&mut self.remaining_subject); + Some(last) + } + } +} + +impl AsSubject for str { + fn as_subject(&self) -> Result<&Subject, io::Error> { + Subject::new(self).map_err(Into::into) + } +} + +impl<'s> AsSubject for &'s str { + fn as_subject(&self) -> Result<&Subject, io::Error> { + (*self).as_subject() + } +} + +impl AsSubject for String { + fn as_subject(&self) -> Result<&Subject, io::Error> { + self.as_str().as_subject() + } +} + +impl<'s> AsSubject for &'s String { + fn as_subject(&self) -> Result<&Subject, io::Error> { + (*self).as_subject() + } +} + +fn valid_token(token: &str) -> bool { + !token.is_empty() && !token.contains(['.', ' ', '\n', '\t', '\r']) +} + +fn token_match(lt: &str, rt: &str) -> bool { + lt == rt + || lt == SINGLE_WILDCARD + || rt == SINGLE_WILDCARD + || lt == MULTI_WILDCARD + || rt == MULTI_WILDCARD +} + +#[cfg(test)] +mod test { + use super::*; + + use test_case::test_case; + + #[test_case("" => false ; "empty")] + #[test_case("*" => true ; "single wildcard")] + #[test_case(">" => true ; "multi wildcard")] + #[test_case(">>" => true ; "double multi wildcard")] + #[test_case("!" => true ; "special char")] + #[test_case("á" => true ; "non ascii")] + #[test_case("probe" => true ; "valid name")] + #[test_case("pröbe" => true ; "non alphanumeric")] + #[test_case("$SYS" => true ; "system account")] + #[test_case("ab.cd" => false ; "contains dot")] + #[test_case("ab cd" => false ; "contains space")] + fn validate_token(token: &str) -> bool { + valid_token(token) + } + + #[test_case("" => false ; "empty")] + #[test_case("*" => true ; "single wildcard")] + #[test_case(">" => true ; "wire tap")] + #[test_case("abc.12345.cda.>" => true ; "end with multi")] + #[test_case("uu.12345" => true ; "plain")] + #[test_case("fAN.*.sdb.*" => true ; "multiple single wildcards")] + #[test_case("zzz.>.cdc" => false ; "middle multi wildcard")] + #[test_case("zzz.*." => false ; "ending dot")] + #[test_case(".dot" => false ; "starting dot")] + #[test_case("dot..dot" => false ; "empty token")] + #[test_case(">>" => true ; "double multi wildcard")] + #[test_case("hi.**.no" => true ; "double single wildcard")] + fn validate_subject(subject: &str) -> bool { + Subject::new(subject).is_ok() + } + + #[test_case("*", "abc" => true ; "single wildcard")] + #[test_case("cba", "*" => true ; "single wildcard reverse")] + #[test_case(">", "abc" => true ; "multi wildcard")] + #[test_case("cba", ">" => true ; "multi wildcard reverse")] + #[test_case("*", ">" => true ; "mixed wildcards")] + #[test_case("cba", "abc" => false ; "unequal tokens")] + fn match_tokens(l: &str, r: &str) -> bool { + token_match(l, r) + } + + #[test_case("cba", "abc" => false ; "unequal subjects")] + #[test_case("cba.*", "cba.abc" => true ; "single wildcard")] + #[test_case("cba.*.zzz", "cba.abc.zzz" => true ; "single wildcard middle")] + #[test_case("ab.cd.ef", "ab.cd" => false ; "longer")] + #[test_case("ab.cd", "ab.cd.ef" => false ; "longer reverse")] + #[test_case(">", "cba.abc.zzz" => true ; "wire tap")] + #[test_case(">", "cba.*.zzz" => true ; "wire tap against single wildcard")] + #[test_case("cba.>", "cba.abc.zzz" => true ; "multi wildcard")] + #[test_case("*.>", "cba.abc.zzz" => true ; "both wildcards")] + #[test_case("cba.*.zzz", "cba.abc.yyy" => false ; "not matching")] + fn match_subjects(l: &str, r: &str) -> bool { + let l = Subject::new(l).unwrap(); + let r = Subject::new(r).unwrap(); + l.matches(r) + } + + #[test_case("abc", &["def"], "abc.def" ; "single token")] + #[test_case("abc", &["def", "ghi", "012"], "abc.def.ghi.012" ; "more tokens")] + #[test_case(">", &["abc"], "" => panics ; "wire tap")] + #[test_case("abc.def.>", &["abc"], "" => panics ; "join on multi wildcard")] + #[test_case("abc.def", &["*"], "abc.def.*" ; "single wildcard")] + #[test_case("abc.def", &["*", "fed"], "abc.def.*.fed" ; "single wildcard and more")] + #[test_case("abc", &[">"], "abc.>" ; "multi wildcard")] + #[test_case("abc", &[">", "cba"], "" => panics ; "multi wildcard and more")] + fn join_subject(base: &str, appends: &[&str], expect: &str) { + let mut base = SubjectBuf::new(base.to_owned()).unwrap(); + for append in appends { + base = base.join(append).unwrap(); + } + + assert_eq!(base, expect); + } + + #[test] + fn same_hash() -> Result<(), Error> { + let sub = Subject::new("foo.bar")?; + let buf = sub.to_owned(); + let mut map = std::collections::HashSet::new(); + map.insert(buf); + assert!(map.get(sub).is_some()); + Ok(()) + } +} diff --git a/tests/drop.rs b/tests/drop.rs index b136b29f8..f437b695b 100644 --- a/tests/drop.rs +++ b/tests/drop.rs @@ -11,7 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use nats::jetstream; use smol::future::FutureExt; use std::{ io, @@ -123,7 +122,7 @@ fn close_responsiveness_regression_jetstream() { js.add_stream(nats::jetstream::StreamConfig { name: "TEST".to_string(), - subjects: vec!["subject".to_string()], + subjects: vec!["subject".parse().unwrap()], ..Default::default() }) .unwrap(); @@ -147,7 +146,7 @@ fn close_responsiveness_regression_jetstream_complex() { jetstream .add_stream(nats::jetstream::StreamConfig { name: "stream10".to_string(), - subjects: vec!["subject11".to_string(), "subject12".to_string()], + subjects: vec!["subject11".parse().unwrap(), "subject12".parse().unwrap()], max_msgs_per_subject: 1, storage: nats::jetstream::StorageType::Memory, ..Default::default() @@ -172,7 +171,7 @@ fn close_responsiveness_regression_jetstream_complex() { crossbeam_channel::Sender, crossbeam_channel::Receiver, ) = crossbeam_channel::bounded(32); - sub.clone().with_process_handler(move |msg| { + sub.clone().with_process_handler(move |_| { result_tx .send(1) .expect("failed to report back over channel"); diff --git a/tests/jetstream.rs b/tests/jetstream.rs index 53187446e..782d3de12 100644 --- a/tests/jetstream.rs +++ b/tests/jetstream.rs @@ -64,7 +64,11 @@ fn jetstream_publish() { // Create the stream using our client API. js.add_stream(StreamConfig { name: "TEST".to_string(), - subjects: vec!["test".to_string(), "foo".to_string(), "bar".to_string()], + subjects: vec![ + "test".parse().unwrap(), + "foo".parse().unwrap(), + "bar".parse().unwrap(), + ], ..Default::default() }) .unwrap(); @@ -261,10 +265,10 @@ fn jetstream_subscribe() { js.add_stream(&StreamConfig { name: "TEST".to_string(), subjects: vec![ - "foo".to_string(), - "bar".to_string(), - "baz".to_string(), - "foo.*".to_string(), + "foo".parse().unwrap(), + "bar".parse().unwrap(), + "baz".parse().unwrap(), + "foo.*".parse().unwrap(), ], ..Default::default() }) @@ -312,10 +316,10 @@ fn jetstream_subscribe_durable() { js.add_stream(&StreamConfig { name: "TEST".to_string(), subjects: vec![ - "foo".to_string(), - "bar".to_string(), - "baz".to_string(), - "foo.*".to_string(), + "foo".parse().unwrap(), + "bar".parse().unwrap(), + "baz".parse().unwrap(), + "foo.*".parse().unwrap(), ], ..Default::default() }) @@ -374,10 +378,10 @@ fn jetstream_queue_subscribe() { js.add_stream(&StreamConfig { name: "TEST".to_string(), subjects: vec![ - "foo".to_string(), - "bar".to_string(), - "baz".to_string(), - "foo.*".to_string(), + "foo".parse().unwrap(), + "bar".parse().unwrap(), + "baz".parse().unwrap(), + "foo.*".parse().unwrap(), ], ..Default::default() }) @@ -440,33 +444,32 @@ fn jetstream_queue_subscribe_no_mismatch_handle() { jsm.add_stream(StreamConfig { name: "jobs_stream".to_string(), discard: DiscardPolicy::Old, - subjects: vec!["waiting_jobs".to_string()], + subjects: vec!["waiting_jobs".parse().unwrap()], retention: RetentionPolicy::WorkQueue, storage: StorageType::File, ..Default::default() }) .unwrap(); - let c = jsm - .add_consumer( - "jobs_stream", - ConsumerConfig { - deliver_group: Some("dg".to_string()), - durable_name: Some("durable".to_string()), - deliver_policy: DeliverPolicy::All, - ack_policy: AckPolicy::Explicit, - deliver_subject: Some("deliver_subject".to_string()), - ..Default::default() - }, - ) - .unwrap(); + jsm.add_consumer( + "jobs_stream", + ConsumerConfig { + deliver_group: Some("dg".parse().unwrap()), + durable_name: Some("durable".to_string()), + deliver_policy: DeliverPolicy::All, + ack_policy: AckPolicy::Explicit, + deliver_subject: Some("deliver_subject".parse().unwrap()), + ..Default::default() + }, + ) + .unwrap(); let job_sub = jsm .queue_subscribe_with_options( "waiting_jobs", "dg", &SubscribeOptions::bind("jobs_stream".to_string(), "durable".to_string()) - .deliver_subject("deliver_subject".to_string()) + .deliver_subject("deliver_subject".parse().unwrap()) .ack_explicit() .deliver_all() .replay_instant(), @@ -486,7 +489,7 @@ fn jetstream_queue_subscribe_no_mismatch_handle() { "waiting_jobs", "dg", &SubscribeOptions::bind("jobs_stream".to_string(), "durable".to_string()) - .deliver_subject("deliver_subject".to_string()) + .deliver_subject("deliver_subject".parse().unwrap()) .ack_explicit() .deliver_all() .replay_instant(), @@ -512,10 +515,10 @@ fn jetstream_flow_control() { js.add_stream(&StreamConfig { name: "TEST".to_string(), subjects: vec![ - "foo".to_string(), - "bar".to_string(), - "baz".to_string(), - "foo.*".to_string(), + "foo".parse().unwrap(), + "bar".parse().unwrap(), + "baz".parse().unwrap(), + "foo.*".parse().unwrap(), ], ..Default::default() }) @@ -528,7 +531,7 @@ fn jetstream_flow_control() { "foo", &SubscribeOptions::new() .durable_name("foo".to_string()) - .deliver_subject("fs".to_string()) + .deliver_subject("fs".parse().unwrap()) .idle_heartbeat(Duration::from_millis(300)) .enable_flow_control(), ) @@ -562,10 +565,10 @@ fn jetstream_ordered() { js.add_stream(&StreamConfig { name: "TEST".to_string(), subjects: vec![ - "foo".to_string(), - "bar".to_string(), - "baz".to_string(), - "foo.*".to_string(), + "foo".parse().unwrap(), + "bar".parse().unwrap(), + "baz".parse().unwrap(), + "foo.*".parse().unwrap(), ], ..Default::default() }) @@ -601,7 +604,7 @@ fn jetstream_pull_subscribe_fetch() { js.add_stream(&StreamConfig { name: "TEST".to_string(), - subjects: vec!["foo".to_string()], + subjects: vec!["foo".parse().unwrap()], ..Default::default() }) .unwrap(); @@ -653,7 +656,7 @@ fn jetstream_pull_subscribe_timeout_fetch() { js.add_stream(&StreamConfig { name: "TEST".to_string(), - subjects: vec!["foo".to_string()], + subjects: vec!["foo".parse().unwrap()], ..Default::default() }) .unwrap(); @@ -711,7 +714,7 @@ fn jetstream_pull_subscribe_fetch_with_handler() { js.add_stream(&StreamConfig { name: "TEST".to_string(), - subjects: vec!["foo".to_string()], + subjects: vec!["foo".parse().unwrap()], ..Default::default() }) .unwrap(); @@ -762,7 +765,7 @@ fn jetstream_pull_subscribe_ephemeral() { js.add_stream(&StreamConfig { name: "TEST".to_string(), - subjects: vec!["foo".to_string()], + subjects: vec!["foo".parse().unwrap()], ..Default::default() }) .unwrap(); diff --git a/tests/kv.rs b/tests/kv.rs index 108f8cbda..324c186fa 100644 --- a/tests/kv.rs +++ b/tests/kv.rs @@ -239,7 +239,7 @@ fn key_value_bind() { context .add_stream(&StreamConfig { name: "KV_TEST".to_string(), - subjects: vec!["foo".to_string()], + subjects: vec!["foo".parse().unwrap()], ..Default::default() }) .unwrap(); diff --git a/tests/no_messages.rs b/tests/no_messages.rs index 708e96965..bc5c1bd5f 100644 --- a/tests/no_messages.rs +++ b/tests/no_messages.rs @@ -23,7 +23,7 @@ fn no_messages() { js.add_stream(&StreamConfig { name: "TEST".to_string(), - subjects: vec!["foo".to_string()], + subjects: vec!["foo".parse().unwrap()], ..Default::default() }) .unwrap(); diff --git a/tests/request_timeout.rs b/tests/request_timeout.rs index 7235a2801..643b2981e 100644 --- a/tests/request_timeout.rs +++ b/tests/request_timeout.rs @@ -23,7 +23,7 @@ fn request_timeout() { js.add_stream(&StreamConfig { name: "TEST".to_string(), - subjects: vec!["foo".to_string()], + subjects: vec!["foo".parse().unwrap()], ..Default::default() }) .unwrap();