Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a subject type #222

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ serde_json = "1.0.64"
serde_nanos = "0.1.1"
serde_repr = "0.1.7"
memchr = "2.4.0"
thiserror = "1.0"
url = "2.2.2"
time = { version = "0.3.6", features = ["parsing", "formatting", "serde", "serde-well-known"] }

Expand All @@ -73,12 +74,17 @@ nats_test_server = { path = "nats_test_server" }
quicli = "0.4.0"
smol = "1.2.5"
structopt = "0.3.21"
test-case = "1.2"
nats_016 = { package = "nats", version = "0.16.0" }

[[bench]]
name = "nats_bench"
harness = false

[[bench]]
name = "subject_bench"
harness = false

[[example]]
name = "nats-box"
path = "examples/nats-box/main.rs"
Expand Down
25 changes: 25 additions & 0 deletions benches/subject_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use criterion::{criterion_group, criterion_main, Criterion};
use nats::Subject;

const WIRETAP: &str = ">";
const MULTIPLE_SINGLE_WILDCARDS: &str = "fAN.*.sdb.*";
const SHORT_SUBJECT: &str = "$SYS.REQ.SERVER.PING";
const SUBJECT_MULTI_WILDCARD: &str =
"id.00112233445566778899aabbccddeeff.token1.token2.token3.token4.>";
const LONG_SUBJECT: &str = "id.00112233445566778899aabbccddeeff.info.token1.*.tokenabzdd.!§$toke.*.kjadslfha.iiiiiäöü.--__--.*.%%&jjkkll.>";

fn validate(c: &mut Criterion) {
let mut group = c.benchmark_group("validate");
group.bench_function("wiretap", |b| b.iter(|| Subject::new(WIRETAP).is_ok()));
group.bench_function("short mixed", |b| {
b.iter(|| Subject::new(MULTIPLE_SINGLE_WILDCARDS).is_ok())
});
group.bench_function("short", |b| b.iter(|| Subject::new(SHORT_SUBJECT).is_ok()));
group.bench_function("mw", |b| {
b.iter(|| Subject::new(SUBJECT_MULTI_WILDCARD).is_ok())
});
group.bench_function("long", |b| b.iter(|| Subject::new(LONG_SUBJECT).is_ok()));
}

criterion_group!(benches, validate);
criterion_main!(benches);
161 changes: 86 additions & 75 deletions src/asynk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ use std::time::Duration;
use blocking::unblock;
use crossbeam_channel::{Receiver, Sender};

use crate::header::HeaderMap;
use crate::IntoServerList;
use crate::{header::HeaderMap, AsSubject, IntoServerList, Subject, SubjectBuf};

/// Connect to a NATS server at the given url.
///
Expand Down Expand Up @@ -141,39 +140,46 @@ impl Connection {
}

/// Publishes a message.
pub async fn publish(&self, subject: &str, msg: impl AsRef<[u8]>) -> io::Result<()> {
self.publish_with_reply_or_headers(subject, None, None, msg)
pub async fn publish(&self, subject: impl AsSubject, msg: impl AsRef<[u8]>) -> io::Result<()> {
self.publish_with_reply_or_headers(subject.as_subject()?, None, None, msg)
.await
}

/// Publishes a message with a reply subject.
pub async fn publish_request(
&self,
subject: &str,
reply: &str,
subject: impl AsSubject,
reply: impl AsSubject,
msg: impl AsRef<[u8]>,
) -> io::Result<()> {
if let Some(res) =
self.inner
.try_publish_with_reply_or_headers(subject, Some(reply), None, &msg)
{
return res;
let subject = subject.as_subject()?.to_owned();
let reply = reply.as_subject()?.to_owned();

let res = self
.inner
.try_publish_with_reply_or_headers(&subject, Some(&reply), None, &msg);
match res {
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
unblock(move || inner.publish_request(&subject, &reply, msg)).await
}
_ => Ok(()),
}
let subject = subject.to_string();
let reply = reply.to_string();
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
unblock(move || inner.publish_request(&subject, &reply, msg)).await
}

/// Creates a new unique subject for receiving replies.
pub fn new_inbox(&self) -> String {
pub fn new_inbox(&self) -> SubjectBuf {
self.inner.new_inbox()
}

/// Publishes a message and waits for the response.
pub async fn request(&self, subject: &str, msg: impl AsRef<[u8]>) -> io::Result<Message> {
let subject = subject.to_string();
pub async fn request(
&self,
subject: impl AsSubject,
msg: impl AsRef<[u8]>,
) -> io::Result<Message> {
let subject = subject.as_subject()?.to_owned();
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
let msg = unblock(move || inner.request(&subject, msg)).await?;
Expand All @@ -184,11 +190,11 @@ impl Connection {
/// timeout duration is reached
pub async fn request_timeout(
&self,
subject: &str,
subject: impl AsSubject,
msg: impl AsRef<[u8]>,
timeout: Duration,
) -> io::Result<Message> {
let subject = subject.to_string();
let subject = subject.as_subject()?.to_owned();
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
let msg = unblock(move || inner.request_timeout(&subject, msg, timeout)).await?;
Expand All @@ -199,10 +205,10 @@ impl Connection {
/// response.
pub async fn request_multi(
&self,
subject: &str,
subject: impl AsSubject,
msg: impl AsRef<[u8]>,
) -> io::Result<Subscription> {
let subject = subject.to_string();
let subject = subject.as_subject()?.to_owned();
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
let sub = unblock(move || inner.request_multi(&subject, msg)).await?;
Expand All @@ -215,8 +221,8 @@ impl Connection {
}

/// Creates a subscription.
pub async fn subscribe(&self, subject: &str) -> io::Result<Subscription> {
let subject = subject.to_string();
pub async fn subscribe(&self, subject: impl AsSubject) -> io::Result<Subscription> {
let subject = subject.as_subject()?.to_owned();
let inner = self.inner.clone();
let inner = unblock(move || inner.subscribe(&subject)).await?;
let (_closer_tx, closer_rx) = crossbeam_channel::bounded(0);
Expand All @@ -228,9 +234,13 @@ impl Connection {
}

/// Creates a queue subscription.
pub async fn queue_subscribe(&self, subject: &str, queue: &str) -> io::Result<Subscription> {
let subject = subject.to_string();
let queue = queue.to_string();
pub async fn queue_subscribe(
&self,
subject: impl AsSubject,
queue: impl AsSubject,
) -> io::Result<Subscription> {
let subject = subject.as_subject()?.to_owned();
let queue = queue.as_subject()?.to_owned();
let inner = self.inner.clone();
let inner = unblock(move || inner.queue_subscribe(&subject, &queue)).await?;
let (_closer_tx, closer_rx) = crossbeam_channel::bounded(0);
Expand Down Expand Up @@ -288,28 +298,39 @@ impl Connection {
}

/// Publish a message which may have a reply subject or headers set.
pub async fn publish_with_reply_or_headers(
pub async fn publish_with_reply_or_headers<'s>(
&self,
subject: &str,
reply: Option<&str>,
headers: Option<&HeaderMap>,
subject: &Subject,
reply: Option<&Subject>,
headers: Option<&'s HeaderMap>,
msg: impl AsRef<[u8]>,
) -> io::Result<()> {
if let Some(res) = self
.inner
.try_publish_with_reply_or_headers(subject, reply, headers, &msg)
{
return res;
}
let subject = subject.to_string();
let reply = reply.map(str::to_owned);
let subject = subject.to_owned();
let reply = reply.map(ToOwned::to_owned);
let headers = headers.map(HeaderMap::clone);
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
unblock(move || {
inner.publish_with_reply_or_headers(&subject, reply.as_deref(), headers.as_ref(), msg)
})
.await

let res = self.inner.try_publish_with_reply_or_headers(
&subject,
reply.as_deref(),
headers.clone().as_ref(),
&msg,
);
match res {
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
let msg = msg.as_ref().to_vec();
let inner = self.inner.clone();
unblock(move || {
inner.publish_with_reply_or_headers(
&subject,
reply.as_deref(),
headers.as_ref(),
msg,
)
})
.await
}
_ => Ok(()),
}
}
}

Expand Down Expand Up @@ -383,11 +404,11 @@ impl Subscription {
#[derive(Clone)]
pub struct Message {
/// The subject this message came from.
pub subject: String,
pub subject: SubjectBuf,

/// Optional reply subject that may be used for sending a response to this
/// message.
pub reply: Option<String>,
pub reply: Option<SubjectBuf>,

/// The message contents.
pub data: Vec<u8>,
Expand Down Expand Up @@ -424,17 +445,18 @@ impl Message {
/// but cannot be used on it's own for associated methods as those require `Client` injected into `Message`
/// and will error without it.
pub fn new(
subject: &str,
reply: Option<&str>,
data: impl AsRef<[u8]>,
subject: SubjectBuf,
reply: Option<SubjectBuf>,
data: Vec<u8>,
headers: Option<HeaderMap>,
) -> Message {
Message {
subject: subject.to_string(),
reply: reply.map(String::from),
data: data.as_ref().to_vec(),
subject,
reply,
data,
headers,
..Default::default()
client: None,
double_acked: Arc::new(AtomicBool::new(false)),
}
}

Expand All @@ -449,26 +471,15 @@ impl Message {
crate::message::MESSAGE_NOT_BOUND,
)
})?;
if let Some(res) = client.try_publish(reply.as_str(), None, None, msg.as_ref()) {
return res;
}
// clone only if we have to move the data to the thread
let client = client.clone();
let reply = reply.to_owned();
let msg = msg.as_ref().to_vec();
unblock(move || client.publish(&reply, None, None, msg.as_ref())).await
}
}

impl Default for Message {
fn default() -> Message {
Message {
subject: String::from(""),
reply: None,
data: Vec::new(),
headers: None,
client: None,
double_acked: Arc::new(AtomicBool::new(false)),
let res = client.try_publish(reply, None, None, msg.as_ref());
match res {
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
let client = client.clone();
let reply = reply.to_owned();
let msg = msg.as_ref().to_vec();
unblock(move || client.publish(&reply, None, None, msg.as_ref())).await
}
_ => Ok(()),
}
}
}
Expand Down
Loading