Skip to content

Commit

Permalink
Pass subject at call sites bound to an AsSubject trait
Browse files Browse the repository at this point in the history
  • Loading branch information
caspervonb authored Oct 24, 2023
1 parent 4fa004f commit 237d677
Show file tree
Hide file tree
Showing 30 changed files with 388 additions and 481 deletions.
14 changes: 7 additions & 7 deletions async-nats/benches/core_nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub fn publish(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
let nc = rt.block_on(async {
let nc = async_nats::connect(server.client_url()).await.unwrap();
nc.publish("data".into(), "data".into()).await.unwrap();
nc.publish("data", "data".into()).await.unwrap();
nc
});

Expand Down Expand Up @@ -99,13 +99,13 @@ pub fn subscribe(c: &mut Criterion) {
started.send(()).unwrap();
loop {
client
.publish("bench".into(), Bytes::from_static(&MSG[..size]))
.publish("bench", Bytes::from_static(&MSG[..size]))
.await
.unwrap()
}
}
});
nc.publish("data".into(), "data".into()).await.unwrap();
nc.publish("data", "data".into()).await.unwrap();
ready.await.unwrap();
nc
});
Expand Down Expand Up @@ -149,7 +149,7 @@ pub fn request(c: &mut Criterion) {
.await
.unwrap();

let mut subscription = client.subscribe("bench".into()).await.unwrap();
let mut subscription = client.subscribe("bench").await.unwrap();
tokio::time::sleep(Duration::from_secs(3)).await;
started.send(()).unwrap();

Expand Down Expand Up @@ -178,18 +178,18 @@ pub fn request(c: &mut Criterion) {

async fn requests(nc: async_nats::Client, msg: Bytes, amount: u64) {
for _i in 0..amount {
nc.request("bench".into(), msg.clone()).await.unwrap();
nc.request("bench", msg.clone()).await.unwrap();
}
}

async fn publish_messages(nc: async_nats::Client, msg: Bytes, amount: u64) {
for _i in 0..amount {
nc.publish("bench".into(), msg.clone()).await.unwrap();
nc.publish("bench", msg.clone()).await.unwrap();
}
}

async fn subscribe_messages(nc: async_nats::Client, amount: u64) {
let mut sub = nc.subscribe("bench".into()).await.unwrap();
let mut sub = nc.subscribe("bench").await.unwrap();
for _ in 0..amount {
sub.next().await.unwrap();
}
Expand Down
4 changes: 2 additions & 2 deletions async-nats/benches/jetstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ pub fn jetstream_publish_async(c: &mut Criterion) {
async fn publish_sync_batch(context: async_nats::jetstream::Context, msg: Bytes, amount: u64) {
for _i in 0..amount {
context
.publish("bench".into(), msg.clone())
.publish("bench", msg.clone())
.await
.unwrap()
.await
Expand All @@ -207,7 +207,7 @@ async fn publish_async_batch(context: async_nats::jetstream::Context, msg: Bytes
}
});
for _ in 0..amount {
let ack = context.publish("bench".into(), msg.clone()).await.unwrap();
let ack = context.publish("bench", msg.clone()).await.unwrap();
tx.send(ack.into_future()).await.unwrap();
}
handle.await.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions async-nats/examples/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ async fn main() -> Result<(), async_nats::Error> {
// `Subscriber` implements Rust iterator, so we can leverage
// combinators like `take()` to limit the messages intended
// to be consumed for this interaction.
let subscription = client.subscribe("greet.*".into()).await?.take(50);
let subscription = client.subscribe("greet.*").await?.take(50);

// Publish set of messages, each with order identifier.
for i in 0..50 {
client
.publish("greet.joe".into(), format!("hello {i}").into())
.publish("greet.joe", format!("hello {i}").into())
.await?;
}

Expand Down
2 changes: 1 addition & 1 deletion async-nats/examples/jetstream_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn main() -> Result<(), async_nats::Error> {
// Publish a few messages for the example.
for i in 0..10 {
jetstream
.publish(format!("events.{i}").into(), "data".into())
.publish(format!("events.{i}"), "data".into())
// The first `await` sends the publish
.await?
// The second `await` awaits a publish acknowledgement.
Expand Down
2 changes: 1 addition & 1 deletion async-nats/examples/jetstream_push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn main() -> Result<(), async_nats::Error> {
// Publish a few messages for the example.
for i in 0..10 {
jetstream
.publish(format!("events.{i}").into(), "data".into())
.publish(format!("events.{i}"), "data".into())
// The first `await` sends the publish
.await?
// The second `await` awaits a publish acknowledgement.
Expand Down
4 changes: 2 additions & 2 deletions async-nats/examples/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn main() -> Result<(), async_nats::Error> {
let client = async_nats::connect(nats_url).await?;

// Create a subscription that handles one message.
let mut subscriber = client.subscribe("foo".into()).await?.take(1);
let mut subscriber = client.subscribe("foo").await?.take(1);

// Construct a Payload value and serialize it.
let payload = Payload {
Expand All @@ -30,7 +30,7 @@ async fn main() -> Result<(), async_nats::Error> {
let bytes = serde_json::to_vec(&json!(payload))?;

// Publish the serialized payload.
client.publish("foo".into(), bytes.into()).await?;
client.publish("foo", bytes.into()).await?;

while let Some(message) = subscriber.next().await {
// Deserialize the message payload into a Payload value.
Expand Down
8 changes: 4 additions & 4 deletions async-nats/examples/multiple_subs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async fn main() -> Result<(), async_nats::Error> {
tokio::task::spawn({
let client = client.clone();
async move {
let mut subscriber = client.subscribe("foo".into()).await?;
let mut subscriber = client.subscribe("foo").await?;

println!("Awaiting messages on foo");
while let Some(message) = subscriber.next().await {
Expand All @@ -24,7 +24,7 @@ async fn main() -> Result<(), async_nats::Error> {
tokio::task::spawn({
let client = client.clone();
async move {
let mut subscriber = client.subscribe("bar".into()).await?;
let mut subscriber = client.subscribe("bar").await?;

println!("Awaiting messages on bar");
while let Some(message) = subscriber.next().await {
Expand All @@ -42,7 +42,7 @@ async fn main() -> Result<(), async_nats::Error> {
async move {
let now = Instant::now();
for _ in 0..10_000 {
client.publish("foo".into(), "data".into()).await?;
client.publish("foo", "data".into()).await?;
}
Ok::<std::time::Duration, async_nats::Error>(now.elapsed())
}
Expand All @@ -54,7 +54,7 @@ async fn main() -> Result<(), async_nats::Error> {
async move {
let now = Instant::now();
for _ in 0..10_000 {
client.publish("bar".into(), "data".into()).await?;
client.publish("bar", "data".into()).await?;
}
Ok::<std::time::Duration, async_nats::Error>(now.elapsed())
}
Expand Down
2 changes: 1 addition & 1 deletion async-nats/examples/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn main() -> Result<(), async_nats::Error> {
let client = async_nats::connect("nats://localhost:4222").await?;

let now = Instant::now();
let mut subscriber = client.subscribe("foo".into()).await.unwrap();
let mut subscriber = client.subscribe("foo").await.unwrap();

println!("Awaiting messages");
while let Some(message) = subscriber.next().await {
Expand Down
Loading

0 comments on commit 237d677

Please sign in to comment.