diff --git a/.config/nats.dic b/.config/nats.dic index b3adc9f98..e2e2b0174 100644 --- a/.config/nats.dic +++ b/.config/nats.dic @@ -140,3 +140,4 @@ filter_subject filter_subjects rollup IoT +ObjectMeta diff --git a/.github/ISSUE_TEMPLATE/blank_issue.md b/.github/ISSUE_TEMPLATE/blank_issue.md deleted file mode 100644 index a635f616f..000000000 --- a/.github/ISSUE_TEMPLATE/blank_issue.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -name: Blank Issue -about: Create an issue with a blank template. -labels: 'needs triage' ---- diff --git a/.github/ISSUE_TEMPLATE/bugs.yml b/.github/ISSUE_TEMPLATE/bugs.yml deleted file mode 100644 index efe66dcc8..000000000 --- a/.github/ISSUE_TEMPLATE/bugs.yml +++ /dev/null @@ -1,58 +0,0 @@ -name: Bug Report -description: Report a bug found in the NATS Rust client -labels: ["bug", "needs triage"] -body: - - type: markdown - attributes: - value: | - Make sure you fill all the information in this form and include a [Minimal, Complete, and Verifiable example](https://stackoverflow.com/help/mcve) - - type: input - id: nats_version - attributes: - label: NATS version - description: What is the version of NATS Rust client that you're using? - placeholder: | - Run: grep 'name = "nats"' Cargo.lock -A 1 - validations: - required: true - - type: input - id: rusts_version - attributes: - label: rustc version - description: What is the version of rustc that you're using? - placeholder: | - Run: rustc --version (we support Rust 1.41 and up) - validations: - required: true - - type: input - id: os_container_env - attributes: - label: OS/Container environment - description: What is the OS or container environment you're running the NATS Rust client on? - placeholder: ex. Debian 11.6 - validations: - required: true - - type: textarea - id: steps_to_reproduce - attributes: - label: Steps or code to reproduce the issue - description: How can we reproduce the issue? - placeholder: Your steps/code to reproduce the issue - validations: - requred: true - - type: textarea - id: expected_result - attributes: - label: Expected result - description: What is the expected result? - placeholder: Your expected result - validations: - required: true - - type: textarea - id: actual_result - attributes: - label: Actual result - description: What is the actual result? - placeholder: Your actual result - validations: - required: true diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml index e989d6e87..b98b0060f 100644 --- a/.github/ISSUE_TEMPLATE/config.yml +++ b/.github/ISSUE_TEMPLATE/config.yml @@ -1,5 +1,8 @@ -blank_issues_enabled: true +blank_issues_enabled: false contact_links: - - name: NATS Slack + - name: Discussion + url: https://github.com/nats-io/nats.rs/discussions + about: Ideal for ideas, feedback, or longer form questions. + - name: Chat url: https://slack.nats.io - about: Please ask and answer questions in the rust channel here. + about: Ideal for short, one-off questions, general conversation, and meeting other NATS users! diff --git a/.github/ISSUE_TEMPLATE/defect.yml b/.github/ISSUE_TEMPLATE/defect.yml new file mode 100644 index 000000000..99b4800a8 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/defect.yml @@ -0,0 +1,41 @@ +--- +name: Defect +description: Report a defect, such as a bug or regression. +labels: + - defect +body: + - type: textarea + id: versions + attributes: + label: What version were you using? + description: Include the server version (`nats-server --version`) and any client versions when observing the issue. 
+ validations: + required: true + - type: textarea + id: environment + attributes: + label: What environment was the server running in? + description: This pertains to the operating system, CPU architecture, and/or Docker image that was used. + validations: + required: true + - type: textarea + id: steps + attributes: + label: Is this defect reproducible? + description: Provide best-effort steps to showcase the defect. + validations: + required: true + - type: textarea + id: expected + attributes: + label: Given the capability you are leveraging, what is your expectation? + description: This may be the expected behavior or performance characteristics. + validations: + required: true + - type: textarea + id: actual + attributes: + label: Given the expectation, what is the defect you are observing? + description: This may be an unexpected behavior or regression in performance. + validations: + required: true diff --git a/.github/ISSUE_TEMPLATE/feature_request.yml b/.github/ISSUE_TEMPLATE/feature_request.yml deleted file mode 100644 index a420112df..000000000 --- a/.github/ISSUE_TEMPLATE/feature_request.yml +++ /dev/null @@ -1,36 +0,0 @@ -name: Feature Request -description: Request a feature for the NATS Rust client -labels: ["enhancement", "needs triage"] -body: - - type: textarea - id: use_case - attributes: - label: Use case - description: What is the use case for this feature? - placeholder: Info about the use case with details... - validations: - required: true - - type: textarea - id: proposed_change - attributes: - label: Proposed change - description: What is the change that you propose? - placeholder: Details about the proposed change... - validations: - required: true - - type: input - id: who_benefits - attributes: - label: Who benefits from the change(s)? - description: Who will this be useful to? - placeholder: Beneficiaries... - validations: - required: true - - type: textarea - id: alt_approaches - attributes: - label: Alternative Approaches - description: Are there any alternative approaches? - placeholder: Alt approaches if any... - validations: - required: false diff --git a/.github/ISSUE_TEMPLATE/proposal.yml b/.github/ISSUE_TEMPLATE/proposal.yml new file mode 100644 index 000000000..d7da0ca49 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/proposal.yml @@ -0,0 +1,34 @@ +--- +name: Proposal +description: Propose an enhancement or new feature. +labels: + - proposal +body: + - type: textarea + id: usecase + attributes: + label: What motivated this proposal? + description: Describe the use case justifying this request. + validations: + required: true + - type: textarea + id: change + attributes: + label: What is the proposed change? + description: This could be a behavior change, enhanced API, or a brand new feature. + validations: + required: true + - type: textarea + id: benefits + attributes: + label: Who benefits from this change? + description: Describe how this benefits others, not only you. + validations: + required: false + - type: textarea + id: alternates + attributes: + label: What alternatives have you evaluated? + description: This could be using existing features or relying on an external dependency.
+ validations: + required: false diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4071d5a6c..cb5d75e00 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -183,8 +183,8 @@ jobs: run: | set -eo pipefail echo "msrv check" - rustup install 1.65.0 - cargo +1.65.0 check + rustup install 1.67.0 + cargo +1.67.0 check check_examples: name: check (examples) diff --git a/README.md b/README.md index 7ae9766de..d9a803996 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ There are two clients available in two separate crates: [![License Apache 2](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0) [![Crates.io](https://img.shields.io/crates/v/async-nats.svg)](https://crates.io/crates/async-nats) [![Documentation](https://docs.rs/async-nats/badge.svg)](https://docs.rs/async-nats/) -[![Build Status](https://github.com/nats-io/nats.rs/workflows/Rust/badge.svg)](https://github.com/nats-io/nats.rs/actions) +[![Build Status](https://github.com/nats-io/nats.rs/actions/workflows/test.yml/badge.svg?branch=main)](https://github.com/nats-io/nats.rs/actions) New async Tokio-based NATS client. @@ -48,7 +48,7 @@ Any feedback related to this client is welcomed. [![License Apache 2](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0) [![Crates.io](https://img.shields.io/crates/v/nats.svg)](https://crates.io/crates/nats) [![Documentation](https://docs.rs/nats/badge.svg)](https://docs.rs/nats/) -[![Build Status](https://github.com/nats-io/nats.rs/workflows/Rust/badge.svg)](https://github.com/nats-io/nats.rs/actions) +[![Build Status](https://github.com/nats-io/nats.rs/actions/workflows/test.yml/badge.svg?branch=main)](https://github.com/nats-io/nats.rs/actions) Legacy *synchronous* client that supports: diff --git a/async-nats/Cargo.toml b/async-nats/Cargo.toml index 8e55cd322..32679f76b 100644 --- a/async-nats/Cargo.toml +++ b/async-nats/Cargo.toml @@ -3,7 +3,7 @@ name = "async-nats" authors = ["Tomasz Pietrek ", "Casper Beyer "] version = "0.31.0" edition = "2021" -rust = "1.64.0" +rust = "1.67.0" description = "A async Rust NATS client" license = "Apache-2.0" documentation = "https://docs.rs/async-nats" @@ -16,22 +16,22 @@ categories = ["network-programming", "api-bindings"] [dependencies] memchr = "2.4" bytes = { version = "1.4.0", features = ["serde"] } -futures = { version = "0.3.26", default-features = false, features = ["std", "async-await"] } +futures = { version = "0.3.28", default-features = false, features = ["std", "async-await"] } nkeys = "0.3.0" -once_cell = "1.17.1" -regex = "1.7.1" -serde = { version = "1.0.152", features = ["derive"] } -serde_json = "1.0.93" -serde_repr = "0.1.10" +once_cell = "1.18.0" +regex = "1.9.1" +serde = { version = "1.0.184", features = ["derive"] } +serde_json = "1.0.104" +serde_repr = "0.1.16" http = "0.2.9" -tokio = { version = "1.25.0", features = ["macros", "rt", "fs", "net", "sync", "time", "io-util"] } +tokio = { version = "1.29.0", features = ["macros", "rt", "fs", "net", "sync", "time", "io-util"] } itoa = "1" url = { version = "2"} tokio-rustls = "0.24" rustls-pemfile = "1.0.2" -nuid = "0.4.1" +nuid = "0.5" serde_nanos = "0.1.3" -time = { version = "0.3.20", features = ["parsing", "formatting", "serde", "serde-well-known"] } +time = { version = "0.3.24", features = ["parsing", "formatting", "serde", "serde-well-known"] } rustls-native-certs = "0.6" tracing = "0.1" thiserror = "1.0" @@ -39,7 +39,7 @@ base64 = "0.21" 
tokio-retry = "0.3" ring = "0.16" rand = "0.8" -webpki = { package = "rustls-webpki", version = "0.101.1", features = ["alloc", "std"] } +webpki = { package = "rustls-webpki", version = "0.101.2", features = ["alloc", "std"] } [dev-dependencies] criterion = { version = "0.5", features = ["async_tokio"]} @@ -58,6 +58,6 @@ slow_tests = [] [[bench]] -name = "core_nats" +name = "main" harness = false lto = true diff --git a/async-nats/benches/core_nats.rs b/async-nats/benches/core_nats.rs index ca3a275e0..f9dc9f99a 100644 --- a/async-nats/benches/core_nats.rs +++ b/async-nats/benches/core_nats.rs @@ -1,43 +1,51 @@ -use criterion::{criterion_group, criterion_main, Criterion}; +use std::time::Duration; + +use bytes::Bytes; +use criterion::{criterion_group, Criterion}; use futures::stream::StreamExt; +static MSG: &[u8] = &[22; 32768]; + pub fn publish(c: &mut Criterion) { + let messages_per_iter = 500_000; let server = nats_server::run_basic_server(); - let mut throughput_group = c.benchmark_group("async-nats: publish throughput"); - throughput_group.sample_size(30); + let mut throughput_group = c.benchmark_group("nats::publish_throughput"); + throughput_group.sample_size(10); throughput_group.warm_up_time(std::time::Duration::from_secs(1)); - let bmsg: Vec = (0..32768).map(|_| 22).collect(); - for size in [32, 1024, 8192].iter() { - throughput_group.throughput(criterion::Throughput::Bytes(*size as u64 * 100)); + for &size in [32, 1024, 8192].iter() { + throughput_group.throughput(criterion::Throughput::Bytes( + size as u64 * messages_per_iter, + )); throughput_group.bench_with_input( criterion::BenchmarkId::from_parameter(size), - size, + &size, |b, _| { let rt = tokio::runtime::Runtime::new().unwrap(); let nc = rt.block_on(async { async_nats::connect(server.client_url()).await.unwrap() }); - let msg = &bmsg[0..*size]; - b.to_async(rt).iter(move || { + b.to_async(rt).iter_with_large_drop(move || { let nc = nc.clone(); - async move { publish_messages(nc, msg, 100).await } + async move { + publish_messages(nc, Bytes::from_static(&MSG[..size]), messages_per_iter) + .await + } }); }, ); } throughput_group.finish(); - let mut messages_group = c.benchmark_group("async-nats: publish messages amount"); - messages_group.sample_size(30); + let mut messages_group = c.benchmark_group("nats::publish_amount"); + messages_group.sample_size(10); messages_group.warm_up_time(std::time::Duration::from_secs(1)); - let bmsg: Vec = (0..32768).map(|_| 22).collect(); - for size in [32, 1024, 8192].iter() { - messages_group.throughput(criterion::Throughput::Elements(100)); + for &size in [32, 1024, 8192].iter() { + messages_group.throughput(criterion::Throughput::Elements(messages_per_iter)); messages_group.bench_with_input( criterion::BenchmarkId::from_parameter(size), - size, + &size, |b, _| { let rt = tokio::runtime::Runtime::new().unwrap(); let nc = rt.block_on(async { @@ -46,11 +54,13 @@ pub fn publish(c: &mut Criterion) { nc.flush().await.unwrap(); nc }); - let msg = &bmsg[0..*size]; - b.to_async(rt).iter(move || { + b.to_async(rt).iter_with_large_drop(move || { let nc = nc.clone(); - async move { publish_messages(nc, msg, 100).await } + async move { + publish_messages(nc, Bytes::from_static(&MSG[..size]), messages_per_iter) + .await + } }); }, ); @@ -60,64 +70,134 @@ pub fn publish(c: &mut Criterion) { pub fn subscribe(c: &mut Criterion) { let server = nats_server::run_basic_server(); + let messages_per_iter = 500_000; - let mut subscribe_amount_group = c.benchmark_group("subscribe amount"); - 
subscribe_amount_group.sample_size(30); - subscribe_amount_group.warm_up_time(std::time::Duration::from_secs(1)); + let mut subscribe_amount_group = c.benchmark_group("nats::subscribe_amount"); + subscribe_amount_group.sample_size(10); - for size in [32, 1024, 8192].iter() { - subscribe_amount_group.throughput(criterion::Throughput::Elements(100)); + for &size in [32, 1024, 8192].iter() { + let url = server.client_url(); + subscribe_amount_group.throughput(criterion::Throughput::Elements(messages_per_iter)); subscribe_amount_group.bench_with_input( criterion::BenchmarkId::from_parameter(size), - size, - |b, _| { + &size, + move |b, _| { let rt = tokio::runtime::Runtime::new().unwrap(); - let nc = rt.block_on(async { - let nc = async_nats::connect(server.client_url()).await.unwrap(); - + let url = url.clone(); + let nc = rt.block_on(async move { + let nc = async_nats::ConnectOptions::new() + .connect(url.clone()) + .await + .unwrap(); + let (started, ready) = tokio::sync::oneshot::channel(); tokio::task::spawn({ - let nc = nc.clone(); async move { - let bmsg: Vec = (0..32768).map(|_| 22).collect(); - let msg = &bmsg[0..*size].to_vec(); + let client = async_nats::ConnectOptions::new() + .connect(url) + .await + .unwrap(); + started.send(()).unwrap(); loop { - nc.publish("bench".to_string(), msg.clone().into()) + client + .publish("bench".to_string(), Bytes::from_static(&MSG[..size])) .await - .unwrap(); + .unwrap() } } }); nc.publish("data".to_string(), "data".into()).await.unwrap(); nc.flush().await.unwrap(); + ready.await.unwrap(); nc }); - b.to_async(rt).iter(move || { + b.to_async(rt).iter_with_large_drop(move || { + let nc = nc.clone(); + async move { subscribe_messages(nc, messages_per_iter).await } + }); + }, + ); + } + subscribe_amount_group.finish(); +} + +pub fn request(c: &mut Criterion) { + let server = nats_server::run_basic_server(); + let messages_per_iter = 10_000; + + let mut subscribe_amount_group = c.benchmark_group("nats::request_amount"); + subscribe_amount_group.sample_size(10); + + for &size in [32, 1024, 8192].iter() { + let url = server.client_url(); + subscribe_amount_group.throughput(criterion::Throughput::Elements(messages_per_iter)); + subscribe_amount_group.bench_with_input( + criterion::BenchmarkId::from_parameter(size), + &size, + move |b, _| { + let rt = tokio::runtime::Runtime::new().unwrap(); + let url = url.clone(); + let nc = rt.block_on(async move { + let nc = async_nats::ConnectOptions::new() + .connect(url.clone()) + .await + .unwrap(); + let (started, ready) = tokio::sync::oneshot::channel(); + tokio::task::spawn({ + async move { + let client = async_nats::ConnectOptions::new() + .connect(url) + .await + .unwrap(); + + let mut subscription = client.subscribe("bench".into()).await.unwrap(); + tokio::time::sleep(Duration::from_secs(3)).await; + started.send(()).unwrap(); + + while let Some(request) = subscription.next().await { + client + .publish(request.reply.unwrap(), "".into()) + .await + .unwrap(); + client.flush().await.unwrap(); + } + } + }); + nc.flush().await.unwrap(); + ready.await.unwrap(); + nc + }); + b.to_async(rt).iter_with_large_drop(move || { let nc = nc.clone(); - async move { subscribe_messages(nc, 100).await } + async move { + requests(nc, Bytes::from_static(&MSG[..size]), messages_per_iter).await + } }); }, ); } subscribe_amount_group.finish(); } -async fn publish_messages(nc: async_nats::Client, msg: &'_ [u8], amount: usize) { - let msg = msg.to_vec(); + +async fn requests(nc: async_nats::Client, msg: Bytes, amount: u64) { + for _i 
in 0..amount { + nc.request("bench".into(), msg.clone()).await.unwrap(); + } +} + +async fn publish_messages(nc: async_nats::Client, msg: Bytes, amount: u64) { for _i in 0..amount { - nc.publish("bench".into(), msg.clone().into()) - .await - .unwrap(); + nc.publish("bench".into(), msg.clone()).await.unwrap(); } nc.flush().await.unwrap(); } -async fn subscribe_messages(nc: async_nats::Client, amount: usize) { +async fn subscribe_messages(nc: async_nats::Client, amount: u64) { let mut sub = nc.subscribe("bench".into()).await.unwrap(); for _ in 0..amount { sub.next().await.unwrap(); } } -criterion_group!(benches, publish, subscribe); -criterion_main!(benches); +criterion_group!(core_nats, publish, subscribe, request); diff --git a/async-nats/benches/jetstream.rs b/async-nats/benches/jetstream.rs new file mode 100644 index 000000000..fc5fea139 --- /dev/null +++ b/async-nats/benches/jetstream.rs @@ -0,0 +1,216 @@ +use std::future::IntoFuture; + +use async_nats::jetstream::stream; +use bytes::Bytes; +use criterion::{criterion_group, Criterion}; + +static MSG: &[u8] = &[22; 32768]; + +pub fn jetstream_publish_sync(c: &mut Criterion) { + let messages_per_iter = 50_000; + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let mut throughput_group = c.benchmark_group("jetstream::sync_publish_throughput"); + throughput_group.sample_size(10); + throughput_group.warm_up_time(std::time::Duration::from_secs(1)); + + for &size in [32, 1024, 8192].iter() { + throughput_group.throughput(criterion::Throughput::Bytes( + size as u64 * messages_per_iter, + )); + throughput_group.bench_with_input( + criterion::BenchmarkId::from_parameter(size), + &size, + |b, _| { + let rt = tokio::runtime::Runtime::new().unwrap(); + let context = rt.block_on(async { + let context = async_nats::jetstream::new( + async_nats::connect(server.client_url()).await.unwrap(), + ); + + let stream = context + .create_stream(stream::Config { + name: "bench".to_owned(), + subjects: vec!["bench".to_string()], + ..Default::default() + }) + .await + .unwrap(); + stream.purge().await.unwrap(); + context + }); + + b.to_async(rt).iter_with_large_drop(move || { + let nc = context.clone(); + async move { + publish_sync_batch(nc, Bytes::from_static(&MSG[..size]), messages_per_iter) + .await + } + }); + }, + ); + } + throughput_group.finish(); + + let mut messages_group = c.benchmark_group("jetstream sync publish messages amount"); + messages_group.sample_size(10); + messages_group.warm_up_time(std::time::Duration::from_secs(1)); + + for &size in [32, 1024, 8192].iter() { + messages_group.throughput(criterion::Throughput::Elements(messages_per_iter)); + messages_group.bench_with_input( + criterion::BenchmarkId::from_parameter(size), + &size, + |b, _| { + let rt = tokio::runtime::Runtime::new().unwrap(); + let context = rt.block_on(async { + let context = async_nats::jetstream::new( + async_nats::connect(server.client_url()).await.unwrap(), + ); + + let stream = context + .create_stream(stream::Config { + name: "bench".to_owned(), + subjects: vec!["bench".to_string()], + ..Default::default() + }) + .await + .unwrap(); + stream.purge().await.unwrap(); + context + }); + + b.to_async(rt).iter_with_large_drop(move || { + let context = context.clone(); + async move { + publish_sync_batch( + context, + Bytes::from_static(&MSG[..size]), + messages_per_iter, + ) + .await + } + }); + }, + ); + } + messages_group.finish(); +} + +pub fn jetstream_publish_async(c: &mut Criterion) { + let messages_per_iter = 50_000; + let server = 
nats_server::run_server("tests/configs/jetstream.conf"); + let mut throughput_group = c.benchmark_group("jetstream async publish throughput"); + throughput_group.sample_size(10); + throughput_group.warm_up_time(std::time::Duration::from_secs(1)); + + for &size in [32, 1024, 8192].iter() { + throughput_group.throughput(criterion::Throughput::Bytes( + size as u64 * messages_per_iter, + )); + throughput_group.bench_with_input( + criterion::BenchmarkId::from_parameter(size), + &size, + |b, _| { + let rt = tokio::runtime::Runtime::new().unwrap(); + let context = rt.block_on(async { + let context = async_nats::jetstream::new( + async_nats::connect(server.client_url()).await.unwrap(), + ); + + let stream = context + .create_stream(stream::Config { + name: "bench".to_owned(), + subjects: vec!["bench".to_string()], + ..Default::default() + }) + .await + .unwrap(); + stream.purge().await.unwrap(); + context + }); + + b.to_async(rt).iter_with_large_drop(move || { + let nc = context.clone(); + async move { + publish_async_batch(nc, Bytes::from_static(&MSG[..size]), messages_per_iter) + .await + } + }); + }, + ); + } + throughput_group.finish(); + + let mut messages_group = c.benchmark_group("jetstream::async_publish_messages_amount"); + + messages_group.sample_size(10); + messages_group.warm_up_time(std::time::Duration::from_secs(1)); + + for &size in [32, 1024, 8192].iter() { + messages_group.throughput(criterion::Throughput::Elements(messages_per_iter)); + messages_group.bench_with_input( + criterion::BenchmarkId::from_parameter(size), + &size, + |b, _| { + let rt = tokio::runtime::Runtime::new().unwrap(); + let context = rt.block_on(async { + let context = async_nats::jetstream::new( + async_nats::connect(server.client_url()).await.unwrap(), + ); + + let stream = context + .create_stream(stream::Config { + name: "bench".to_owned(), + subjects: vec!["bench".to_string()], + ..Default::default() + }) + .await + .unwrap(); + stream.purge().await.unwrap(); + context + }); + + b.to_async(rt).iter_with_large_drop(move || { + let context = context.clone(); + async move { + publish_async_batch( + context, + Bytes::from_static(&MSG[..size]), + messages_per_iter, + ) + .await + } + }); + }, + ); + } + messages_group.finish(); +} +async fn publish_sync_batch(context: async_nats::jetstream::Context, msg: Bytes, amount: u64) { + for _i in 0..amount { + context + .publish("bench".into(), msg.clone()) + .await + .unwrap() + .await + .unwrap(); + } +} + +async fn publish_async_batch(context: async_nats::jetstream::Context, msg: Bytes, amount: u64) { + // This acts as a semaphore that does not allow for more than 10 publish acks awaiting. 
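+ // Sending blocks once ten un-drained acks are queued, and the spawned task drains them as they arrive, bounding how far publishes can run ahead of their acknowledgments.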
+ let (tx, mut rx) = tokio::sync::mpsc::channel(10); + + let handle = tokio::task::spawn(async move { + for _ in 0..amount { + rx.recv().await.unwrap(); + } + }); + for _ in 0..amount { + let ack = context.publish("bench".into(), msg.clone()).await.unwrap(); + tx.send(ack.into_future()).await.unwrap(); + } + handle.await.unwrap(); +} + +criterion_group!(jetstream, jetstream_publish_sync, jetstream_publish_async); diff --git a/async-nats/benches/main.rs b/async-nats/benches/main.rs new file mode 100644 index 000000000..18fba05d3 --- /dev/null +++ b/async-nats/benches/main.rs @@ -0,0 +1,7 @@ +use criterion::criterion_main; + +// Import the benchmark groups from both files +mod core_nats; +mod jetstream; + +criterion_main!(core_nats::core_nats, jetstream::jetstream); diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index 882f54e40..94a15edf9 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -18,7 +18,7 @@ use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber} use crate::error::Error; use bytes::Bytes; use futures::future::TryFutureExt; -use futures::stream::StreamExt; +use futures::StreamExt; use once_cell::sync::Lazy; use regex::Regex; use std::fmt::Display; @@ -26,11 +26,11 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; use thiserror::Error; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use tracing::trace; static VERSION_RE: Lazy = - Lazy::new(|| Regex::new(r#"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?"#).unwrap()); + Lazy::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap()); /// An error returned from the [`Client::publish`], [`Client::publish_with_headers`], /// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions. @@ -71,7 +71,7 @@ impl Client { info, state, sender, - next_subscription_id: Arc::new(AtomicU64::new(0)), + next_subscription_id: Arc::new(AtomicU64::new(1)), subscription_capacity: capacity, inbox_prefix, request_timeout, @@ -335,42 +335,83 @@ impl Client { subject: String, request: Request, ) -> Result { - let inbox = request.inbox.unwrap_or_else(|| self.new_inbox()); - let timeout = request.timeout.unwrap_or(self.request_timeout); - let mut sub = self.subscribe(inbox.clone()).await?; - let payload: Bytes = request.payload.unwrap_or_else(Bytes::new); - match request.headers { - Some(headers) => { - self.publish_with_reply_and_headers(subject, inbox, headers, payload) - .await? + if let Some(inbox) = request.inbox { + let timeout = request.timeout.unwrap_or(self.request_timeout); + let mut sub = self.subscribe(inbox.clone()).await?; + let payload: Bytes = request.payload.unwrap_or_else(Bytes::new); + match request.headers { + Some(headers) => { + self.publish_with_reply_and_headers(subject, inbox, headers, payload) + .await? + } + None => self.publish_with_reply(subject, inbox, payload).await?, } - None => self.publish_with_reply(subject, inbox, payload).await?, - } - self.flush() - .await - .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?; - let request = match timeout { - Some(timeout) => { - tokio::time::timeout(timeout, sub.next()) - .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err)) - .await? 
+ self.flush() + .await + .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?; + let request = match timeout { + Some(timeout) => { + tokio::time::timeout(timeout, sub.next()) + .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err)) + .await? + } + None => sub.next().await, + }; + match request { + Some(message) => { + if message.status == Some(StatusCode::NO_RESPONDERS) { + return Err(RequestError::with_source( + RequestErrorKind::NoResponders, + "no responders", + )); + } + Ok(message) + } + None => Err(RequestError::with_source( + RequestErrorKind::Other, + "broken pipe", + )), } - None => sub.next().await, - }; - match request { - Some(message) => { - if message.status == Some(StatusCode::NO_RESPONDERS) { - return Err(RequestError::with_source( - RequestErrorKind::NoResponders, - "no responders", - )); + } else { + let (sender, receiver) = oneshot::channel(); + + let payload = request.payload.unwrap_or_else(Bytes::new); + let respond = self.new_inbox(); + let headers = request.headers; + + self.sender + .send(Command::Request { + subject, + payload, + respond, + headers, + sender, + }) + .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err)) + .await?; + + let timeout = request.timeout.unwrap_or(self.request_timeout); + let request = match timeout { + Some(timeout) => { + tokio::time::timeout(timeout, receiver) + .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err)) + .await? + } + None => receiver.await, + }; + + match request { + Ok(message) => { + if message.status == Some(StatusCode::NO_RESPONDERS) { + return Err(RequestError::with_source( + RequestErrorKind::NoResponders, + "no responders", + )); + } + Ok(message) } - Ok(message) + Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)), } - None => Err(RequestError::with_source( - RequestErrorKind::Other, - "broken pipe", - )), } } diff --git a/async-nats/src/connection.rs b/async-nats/src/connection.rs index e1104d781..2790de2a1 100644 --- a/async-nats/src/connection.rs +++ b/async-nats/src/connection.rs @@ -22,7 +22,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite}; use bytes::{Buf, BytesMut}; use tokio::io; -use crate::header::{HeaderMap, HeaderName}; +use crate::header::{HeaderMap, HeaderName, IntoHeaderValue}; use crate::status::StatusCode; use crate::{ClientOp, ServerError, ServerOp}; @@ -304,7 +304,7 @@ impl Connection { } value.truncate(value.trim_end().len()); - headers.append(name, value); + headers.append(name, value.into_header_value()); } return Ok(Some(ServerOp::Message { diff --git a/async-nats/src/error/mod.rs b/async-nats/src/error.rs similarity index 87% rename from async-nats/src/error/mod.rs rename to async-nats/src/error.rs index 0bbf12e14..13cd633af 100644 --- a/async-nats/src/error/mod.rs +++ b/async-nats/src/error.rs @@ -1,3 +1,16 @@ +// Copyright 2020-2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ use std::fmt::{Debug, Display}; /// The error type for the NATS client, generic by the kind of error. diff --git a/async-nats/src/header.rs b/async-nats/src/header.rs index d0aa63906..fcc7d3ba7 100644 --- a/async-nats/src/header.rs +++ b/async-nats/src/header.rs @@ -20,7 +20,7 @@ //! NATS [Message][crate::Message] headers, modeled loosely after the [http::header] crate. -use std::{collections::HashMap, fmt, slice, str::FromStr}; +use std::{collections::HashMap, fmt, slice::Iter, str::FromStr}; use bytes::Bytes; use serde::{Deserialize, Serialize}; @@ -46,7 +46,7 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, PartialEq, Eq, Debug, Default, Deserialize, Serialize)] pub struct HeaderMap { - inner: HashMap, + inner: HashMap>, } impl FromIterator<(HeaderName, HeaderValue)> for HeaderMap { @@ -60,11 +60,23 @@ impl FromIterator<(HeaderName, HeaderValue)> for HeaderMap { } impl HeaderMap { - pub fn iter(&self) -> std::collections::hash_map::Iter<'_, HeaderName, HeaderValue> { + pub fn iter(&self) -> std::collections::hash_map::Iter<'_, HeaderName, Vec> { self.inner.iter() } } +pub struct GetAll<'a, T> { + inner: Iter<'a, T>, +} + +impl<'a, T> Iterator for GetAll<'a, T> { + type Item = &'a T; + + fn next(&mut self) -> Option { + self.inner.next() + } +} + impl HeaderMap { /// Create an empty `HeaderMap`. /// @@ -113,7 +125,7 @@ impl HeaderMap { /// ``` pub fn insert(&mut self, name: K, value: V) { self.inner - .insert(name.into_header_name(), value.into_header_value()); + .insert(name.into_header_name(), vec![value.into_header_value()]); } /// Appends a new value to the list of values to a given key. @@ -127,16 +139,15 @@ impl HeaderMap { /// let mut headers = HeaderMap::new(); /// headers.append("Key", "Value"); /// headers.append("Key", "Another"); - /// ``` - pub fn append(&mut self, name: K, value: V) { + pub fn append(&mut self, name: K, value: V) { let key = name.into_header_name(); let v = self.inner.get_mut(&key); match v { Some(v) => { - v.value.push(value.to_string()); + v.push(value.into_header_value()); } None => { - self.insert(key, value.to_string().into_header_value()); + self.insert(key, value.into_header_value()); } } } @@ -150,10 +161,36 @@ impl HeaderMap { /// /// let mut headers = HeaderMap::new(); /// headers.append("Key", "Value"); - /// let key = headers.get("Key").unwrap(); + /// let values = headers.get("Key").unwrap(); /// ``` - pub fn get(&self, name: T) -> Option<&HeaderValue> { - self.inner.get(&name.into_header_name()) + pub fn get(&self, key: K) -> Option<&HeaderValue> { + self.inner + .get(&key.into_header_name()) + .and_then(|x| x.first()) + } + + /// Gets an iterator to the values for a given key. 
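+ /// An empty iterator is returned when the key is not present.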
+ /// + /// # Examples + /// + /// ``` + /// # use async_nats::HeaderMap; + /// + /// let mut headers = HeaderMap::new(); + /// headers.append("Key", "Value1"); + /// headers.append("Key", "Value2"); + /// let mut values = headers.get_all("Key"); + /// let value1 = values.next(); + /// let value2 = values.next(); + /// ``` + pub fn get_all(&self, key: K) -> GetAll { + let inner = self + .inner + .get(&key.into_header_name()) + .map(|x| x.iter()) + .unwrap_or([].iter()); + + GetAll { inner } } pub(crate) fn to_bytes(&self) -> Vec { @@ -163,7 +200,7 @@ impl HeaderMap { for v in vs.iter() { buf.extend_from_slice(k.as_str().as_bytes()); buf.extend_from_slice(b": "); - buf.extend_from_slice(v.as_bytes()); + buf.extend_from_slice(v.inner.as_bytes()); buf.extend_from_slice(b"\r\n"); } } @@ -172,8 +209,7 @@ impl HeaderMap { } } -/// A struct representing value of a given header. -/// Can contain one or more elements. +/// Represents NATS header field value. /// /// # Examples /// @@ -186,89 +222,120 @@ impl HeaderMap { /// ``` #[derive(Clone, PartialEq, Eq, Debug, Default, Serialize, Deserialize)] pub struct HeaderValue { - value: Vec, + inner: String, } -impl ToString for HeaderValue { - fn to_string(&self) -> String { - self.iter() - .next() - .cloned() - .unwrap_or_else(|| String::from("")) +impl fmt::Display for HeaderValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.as_str(), f) } } -impl From for String { - fn from(header: HeaderValue) -> Self { - header.to_string() +impl AsRef<[u8]> for HeaderValue { + fn as_ref(&self) -> &[u8] { + self.inner.as_ref() } } -impl From<&HeaderValue> for String { - fn from(header: &HeaderValue) -> Self { - header.to_string() + +impl AsRef for HeaderValue { + fn as_ref(&self) -> &str { + self.as_str() } } -impl<'a> From<&'a HeaderValue> for &'a str { - fn from(header: &'a HeaderValue) -> Self { - header - .iter() - .next() - .map(|v| v.as_str()) - .unwrap_or_else(|| "") +impl From for HeaderValue { + fn from(v: i16) -> Self { + Self { + inner: v.to_string(), + } } } -impl FromStr for HeaderValue { - type Err = ParseHeaderValueError; +impl From for HeaderValue { + fn from(v: i32) -> Self { + Self { + inner: v.to_string(), + } + } +} - fn from_str(s: &str) -> Result { - if s.contains(['\r', '\n']) { - return Err(ParseHeaderValueError); +impl From for HeaderValue { + fn from(v: i64) -> Self { + Self { + inner: v.to_string(), } + } +} - let mut set = HeaderValue::new(); - set.value.push(s.to_string()); - Ok(set) +impl From for HeaderValue { + fn from(v: isize) -> Self { + Self { + inner: v.to_string(), + } + } +} + +impl From for HeaderValue { + fn from(v: u16) -> Self { + Self { + inner: v.to_string(), + } + } +} + +impl From for HeaderValue { + fn from(v: u32) -> Self { + Self { + inner: v.to_string(), + } } } impl From for HeaderValue { fn from(v: u64) -> Self { - let mut set = HeaderValue::new(); - set.value.push(v.to_string()); - set + Self { + inner: v.to_string(), + } } } -impl From<&str> for HeaderValue { - fn from(v: &str) -> Self { - let mut set = HeaderValue::new(); - set.value.push(v.to_string()); - set + +impl From for HeaderValue { + fn from(v: usize) -> Self { + Self { + inner: v.to_string(), + } } } -impl IntoIterator for HeaderValue { - type Item = String; +impl FromStr for HeaderValue { + type Err = ParseHeaderValueError; + + fn from_str(s: &str) -> Result { + if s.contains(['\r', '\n']) { + return Err(ParseHeaderValueError); + } - type IntoIter = std::vec::IntoIter; + Ok(HeaderValue { + inner: 
s.to_string(), + }) + } +} - fn into_iter(self) -> Self::IntoIter { - self.value.into_iter() +impl From<&str> for HeaderValue { + fn from(v: &str) -> Self { + Self { + inner: v.to_string(), + } } } impl HeaderValue { - pub fn new() -> HeaderValue { + pub fn new() -> Self { HeaderValue::default() } - pub fn iter(&self) -> slice::Iter { - self.value.iter() - } - pub fn as_str(&self) -> &str { - self.into() + self.inner.as_str() } } @@ -307,11 +374,12 @@ impl IntoHeaderName for HeaderName { pub trait IntoHeaderValue { fn into_header_value(self) -> HeaderValue; } + impl IntoHeaderValue for &str { fn into_header_value(self) -> HeaderValue { - let mut set = HeaderValue::new(); - set.value.push(self.to_string()); - set + HeaderValue { + inner: self.to_string(), + } } } @@ -561,10 +629,19 @@ mod tests { headers.append("Key", "value"); headers.append("Key", "second_value"); + let mut result = headers.get_all("Key"); + assert_eq!( - headers.get("Key").unwrap().value, - Vec::from_iter(["value".to_string(), "second_value".to_string()]) + result.next().unwrap(), + &HeaderValue::from_str("value").unwrap() ); + + assert_eq!( + result.next().unwrap(), + &HeaderValue::from_str("second_value").unwrap() + ); + + assert_eq!(result.next(), None); } #[test] @@ -575,10 +652,10 @@ mod tests { assert_eq!(headers.get("Key").unwrap().to_string(), "value"); - let key: String = headers.get("Key").unwrap().into(); + let key: String = headers.get("Key").unwrap().as_str().into(); assert_eq!(key, "value".to_string()); - let key: String = headers.get("Key").unwrap().to_owned().into(); + let key: String = headers.get("Key").unwrap().as_str().to_owned(); assert_eq!(key, "value".to_string()); assert_eq!(headers.get("Key").unwrap().as_str(), "value"); @@ -589,10 +666,13 @@ mod tests { let mut headers = HeaderMap::new(); headers.insert("Key", "Value"); + let mut result = headers.get_all("Key"); + assert_eq!( - headers.get("Key").unwrap().value, - Vec::from_iter(["Value".to_string()]) + result.next().unwrap(), + &HeaderValue::from_str("Value").unwrap() ); + assert_eq!(result.next(), None); } #[test] diff --git a/async-nats/src/jetstream/consumer/pull.rs b/async-nats/src/jetstream/consumer/pull.rs index 48c308eb6..c6a711304 100644 --- a/async-nats/src/jetstream/consumer/pull.rs +++ b/async-nats/src/jetstream/consumer/pull.rs @@ -398,7 +398,7 @@ impl futures::Stream for Batch { Poll::Ready(maybe_message) => match maybe_message { Some(message) => match message.status.unwrap_or(StatusCode::OK) { StatusCode::TIMEOUT => { - debug!("recived timeout. Iterator done."); + debug!("received timeout. Iterator done."); self.terminated = true; Poll::Ready(None) } @@ -1128,24 +1128,20 @@ impl futures::Stream for Stream { .headers .as_ref() .and_then(|headers| headers.get("Nats-Pending-Messages")) - .map(|h| h.iter()) - .and_then(|mut i| i.next()) - .map(|e| e.parse::()) - .unwrap_or(Ok(self.batch_config.batch)) + .map_or(Ok(self.batch_config.batch), |x| x.as_str().parse()) .map_err(|err| { MessagesError::with_source(MessagesErrorKind::Other, err) })?; + let pending_bytes = message .headers .as_ref() .and_then(|headers| headers.get("Nats-Pending-Bytes")) - .map(|h| h.iter()) - .and_then(|mut i| i.next()) - .map(|e| e.parse::()) - .unwrap_or(Ok(self.batch_config.max_bytes)) + .map_or(Ok(self.batch_config.max_bytes), |x| x.as_str().parse()) .map_err(|err| { MessagesError::with_source(MessagesErrorKind::Other, err) })?; + debug!( "timeout reached. 
remaining messages: {}, bytes {}", pending_messages, pending_bytes diff --git a/async-nats/src/jetstream/consumer/push.rs b/async-nats/src/jetstream/consumer/push.rs index 10363a4f5..87a59a03f 100644 --- a/async-nats/src/jetstream/consumer/push.rs +++ b/async-nats/src/jetstream/consumer/push.rs @@ -657,14 +657,12 @@ impl<'a> futures::Stream for Ordered<'a> { headers.get(crate::header::NATS_LAST_CONSUMER) { let sequence: u64 = - sequence.iter().next().unwrap().parse().map_err( - |err| { - OrderedError::with_source( - OrderedErrorKind::Other, - err, - ) - }, - )?; + sequence.as_str().parse().map_err(|err| { + OrderedError::with_source( + OrderedErrorKind::Other, + err, + ) + })?; let last_sequence = self.consumer_sequence.load(Ordering::Relaxed); diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index 57fe3d7e4..f30db892d 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -56,16 +56,19 @@ fn kv_operation_from_message(message: &Message) -> Result .as_ref() .ok_or_else(|| EntryError::with_source(EntryErrorKind::Other, "missing headers"))?; - headers - .get(KV_OPERATION) - .map(|x| x.iter().next().unwrap().as_str()) - .unwrap_or(KV_OPERATION_PUT) - .parse() - .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err)) + if let Some(op) = headers.get(KV_OPERATION) { + Operation::from_str(op.as_str()) + .map_err(|err| EntryError::with_source(EntryErrorKind::Other, err)) + } else { + Err(EntryError::with_source( + EntryErrorKind::Other, + "missing operation", + )) + } } -static VALID_BUCKET_RE: Lazy = Lazy::new(|| Regex::new(r#"\A[a-zA-Z0-9_-]+\z"#).unwrap()); -static VALID_KEY_RE: Lazy = Lazy::new(|| Regex::new(r#"\A[-/_=\.a-zA-Z0-9]+\z"#).unwrap()); +static VALID_BUCKET_RE: Lazy = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap()); +static VALID_KEY_RE: Lazy = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap()); pub(crate) const MAX_HISTORY: i64 = 64; const ALL_KEYS: &str = ">"; @@ -304,14 +307,7 @@ impl Store { "missing sequence headers", ) })? - .iter() - .next() - .ok_or_else(|| { - EntryError::with_source( - EntryErrorKind::Other, - "did not found sequence header value", - ) - })? + .as_str() .parse() .map_err(|err| { EntryError::with_source( @@ -319,6 +315,7 @@ impl Store { format!("failed to parse headers sequence value: {}", err), ) })?; + let created = headers .get(header::NATS_TIME_STAMP) .ok_or_else(|| { @@ -326,17 +323,9 @@ impl Store { EntryErrorKind::Other, "did not found timestamp header", ) - })? 
- .iter() - .next() - .ok_or_else(|| { - EntryError::with_source( - EntryErrorKind::Other, - "did not found timestamp header value", - ) }) .and_then(|created| { - OffsetDateTime::parse(created, &Rfc3339).map_err(|err| { + OffsetDateTime::parse(created.as_str(), &Rfc3339).map_err(|err| { EntryError::with_source( EntryErrorKind::Other, format!( diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index b5f4be998..219bfce0b 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -20,6 +20,7 @@ use crate::{HeaderMap, HeaderValue}; use base64::engine::general_purpose::{STANDARD, URL_SAFE}; use base64::engine::Engine; use bytes::BytesMut; +use futures::future::BoxFuture; use once_cell::sync::Lazy; use ring::digest::SHA256; use tokio::io::AsyncReadExt; @@ -29,10 +30,10 @@ use regex::Regex; use serde::{Deserialize, Serialize}; use tracing::{debug, trace}; -use super::consumer::push::OrderedError; -use super::consumer::{StreamError, StreamErrorKind}; +use super::consumer::push::{OrderedConfig, OrderedError}; +use super::consumer::{DeliverPolicy, StreamError, StreamErrorKind}; use super::context::{PublishError, PublishErrorKind}; -use super::stream::{ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind}; +use super::stream::{self, ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind}; use super::{consumer::push::Ordered, stream::StorageType}; use crate::error::Error; use time::{serde::rfc3339, OffsetDateTime}; @@ -41,8 +42,8 @@ const DEFAULT_CHUNK_SIZE: usize = 128 * 1024; const NATS_ROLLUP: &str = "Nats-Rollup"; const ROLLUP_SUBJECT: &str = "sub"; -static BUCKET_NAME_RE: Lazy = Lazy::new(|| Regex::new(r#"\A[a-zA-Z0-9_-]+\z"#).unwrap()); -static OBJECT_NAME_RE: Lazy = Lazy::new(|| Regex::new(r#"\A[-/_=\.a-zA-Z0-9]+\z"#).unwrap()); +static BUCKET_NAME_RE: Lazy = Lazy::new(|| Regex::new(r"\A[a-zA-Z0-9_-]+\z").unwrap()); +static OBJECT_NAME_RE: Lazy = Lazy::new(|| Regex::new(r"\A[-/_=\.a-zA-Z0-9]+\z").unwrap()); pub(crate) fn is_valid_bucket_name(bucket_name: &str) -> bool { BUCKET_NAME_RE.is_match(bucket_name) @@ -107,26 +108,39 @@ impl ObjectStore { /// # Ok(()) /// # } /// ``` - pub async fn get>(&self, object_name: T) -> Result, GetError> { - let object_info = self.info(object_name).await?; - // if let Some(link) = object_info.link { - // return self.get(link.name).await; - // } - - let chunk_subject = format!("$O.{}.C.{}", self.name, object_info.nuid); - - let subscription = self - .stream - .create_consumer(crate::jetstream::consumer::push::OrderedConfig { - filter_subject: chunk_subject, - deliver_subject: self.stream.context.client.new_inbox(), - ..Default::default() - }) - .await? 
- .messages() - .await?; - - Ok(Object::new(subscription, object_info)) + pub fn get<'bucket, 'object, 'future, T>( + &'bucket self, + object_name: T, + ) -> BoxFuture<'future, Result, GetError>> + where + T: AsRef + Send + 'future, + 'bucket: 'future, + { + Box::pin(async move { + let object_info = self.info(object_name).await?; + if let Some(link) = object_info.link.as_ref() { + if let Some(link_name) = link.name.as_ref() { + let link_name = link_name.clone(); + debug!("getting object via link"); + if link.bucket == self.name { + return self.get(link_name).await; + } else { + let bucket = self + .stream + .context + .get_object_store(&link.bucket) + .await + .map_err(|err| GetError::with_source(GetErrorKind::Other, err))?; + let object = bucket.get(&link_name).await?; + return Ok(object); + } + } else { + return Err(GetError::new(GetErrorKind::BucketLink)); + } + } + debug!("not a link. Getting the object"); + Ok(Object::new(object_info, self.stream.clone())) + }) } /// Gets an [Object] from the [ObjectStore]. @@ -217,7 +231,7 @@ impl ObjectStore { .get_last_raw_message_by_subject(subject.as_str()) .await .map_err(|err| match err.kind() { - super::stream::LastRawMessageErrorKind::NoMessageFound => { + stream::LastRawMessageErrorKind::NoMessageFound => { InfoError::new(InfoErrorKind::NotFound) } _ => InfoError::with_source(InfoErrorKind::Other, err), @@ -321,9 +335,9 @@ impl ObjectStore { let object_info = ObjectInfo { name: object_meta.name, description: object_meta.description, - link: object_meta.link, + link: None, bucket: self.name.clone(), - nuid: object_nuid, + nuid: object_nuid.to_string(), chunks: object_chunks, size: object_size, digest: Some(format!("SHA-256={}", URL_SAFE.encode(digest))), @@ -401,11 +415,25 @@ impl ObjectStore { /// # } /// ``` pub async fn watch(&self) -> Result, WatchError> { + self.watch_with_deliver_policy(DeliverPolicy::New).await + } + + /// Creates a [Watch] stream over changes in the [ObjectStore] which yields a value whenever + /// an object changes, as well as the last value for each object. + pub async fn watch_with_history(&self) -> Result, WatchError> { + self.watch_with_deliver_policy(DeliverPolicy::LastPerSubject) + .await + } + + async fn watch_with_deliver_policy( + &self, + deliver_policy: DeliverPolicy, + ) -> Result, WatchError> { let subject = format!("$O.{}.M.>", self.name); let ordered = self .stream .create_consumer(crate::jetstream::consumer::push::OrderedConfig { - deliver_policy: super::consumer::DeliverPolicy::New, + deliver_policy, deliver_subject: self.stream.context.client.new_inbox(), description: Some("object store watcher".to_string()), filter_subject: subject, @@ -486,6 +514,295 @@ impl ObjectStore { .await?; Ok(()) } + + /// Updates [Object] [ObjectMeta].
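+ /// If the new name differs from the current one, it must not already be in use; + /// on success, the metadata stored under the old name is purged.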
+ /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use async_nats::jetstream::object_store; + /// let client = async_nats::connect("demo.nats.io").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// + /// let mut bucket = jetstream.get_object_store("store").await?; + /// bucket + /// .update_metadata( + /// "object", + /// object_store::ObjectMeta { + /// name: "new_name".to_string(), + /// description: Some("a new description".to_string()), + /// }, + /// ) + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn update_metadata>( + &self, + object: A, + metadata: ObjectMeta, + ) -> Result { + let mut info = self.info(object.as_ref()).await?; + + // If the name is being updated, we need to check whether metadata under the new name already exists. + // If it does, error. Otherwise, purge the metadata stored under the old name. + if metadata.name != info.name { + tracing::info!("new metadata name is different than the old one"); + if !is_valid_object_name(&metadata.name) { + return Err(UpdateMetadataError::new( + UpdateMetadataErrorKind::InvalidName, + )); + } + match self.info(&metadata.name).await { + Ok(_) => { + return Err(UpdateMetadataError::new( + UpdateMetadataErrorKind::NameAlreadyInUse, + )) + } + Err(err) => match err.kind() { + InfoErrorKind::NotFound => { + tracing::info!("purging old metadata: {}", info.name); + self.stream + .purge() + .filter(format!( + "$O.{}.M.{}", + self.name, + encode_object_name(&info.name) + )) + .await + .map_err(|err| { + UpdateMetadataError::with_source( + UpdateMetadataErrorKind::Purge, + err, + ) + })?; + } + _ => { + return Err(UpdateMetadataError::with_source( + UpdateMetadataErrorKind::Other, + err, + )) + } + }, + } + } + + info.name = metadata.name; + info.description = metadata.description; + + let name = encode_object_name(&info.name); + let subject = format!("$O.{}.M.{}", &self.name, &name); + + let mut headers = HeaderMap::new(); + headers.insert( + NATS_ROLLUP, + ROLLUP_SUBJECT.parse::().map_err(|err| { + UpdateMetadataError::with_source( + UpdateMetadataErrorKind::Other, + format!("failed parsing header: {}", err), + ) + })?, + ); + let data = serde_json::to_vec(&info).map_err(|err| { + UpdateMetadataError::with_source( + UpdateMetadataErrorKind::Other, + format!("failed serializing object info: {}", err), + ) + })?; + + // publish meta. + self.stream + .context + .publish_with_headers(subject, headers, data.into()) + .await + .map_err(|err| { + UpdateMetadataError::with_source( + UpdateMetadataErrorKind::PublishMetadata, + format!("failed publishing metadata: {}", err), + ) + })? + .await + .map_err(|err| { + UpdateMetadataError::with_source( + UpdateMetadataErrorKind::PublishMetadata, + format!("failed ack from metadata publish: {}", err), + ) + })?; + + Ok(info) + } + + /// Adds a link to an [Object]. + /// It creates a new [Object] in the [ObjectStore] that points to another [Object] + /// and does not have any contents on its own. + /// Links are automatically followed (one level deep) when calling [ObjectStore::get].
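+ /// Linking to a deleted [Object], linking to another link, or reusing the name of + /// an existing non-link [Object] returns an [AddLinkError].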
+ /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use async_nats::jetstream::object_store; + /// let client = async_nats::connect("demo.nats.io").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// let bucket = jetstream.get_object_store("bucket").await?; + /// let object = bucket.get("object").await?; + /// bucket.add_link("link_to_object", &object).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn add_link<'a, T, O>(&self, name: T, object: O) -> Result + where + T: ToString, + O: AsObjectInfo, + { + let object = object.as_info(); + let name = name.to_string(); + if name.is_empty() { + return Err(AddLinkError::new(AddLinkErrorKind::EmptyName)); + } + if object.name.is_empty() { + return Err(AddLinkError::new(AddLinkErrorKind::ObjectRequired)); + } + if object.deleted { + return Err(AddLinkError::new(AddLinkErrorKind::Deleted)); + } + if object.link.is_some() { + return Err(AddLinkError::new(AddLinkErrorKind::LinkToLink)); + } + + match self.info(&name).await { + Ok(info) => { + if info.link.is_none() { + return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists)); + } + } + Err(err) if err.kind() != InfoErrorKind::NotFound => { + return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err)) + } + _ => (), + } + + let info = ObjectInfo { + name, + description: None, + link: Some(ObjectLink { + name: Some(object.name.clone()), + bucket: object.bucket.clone(), + }), + bucket: self.name.clone(), + nuid: nuid::next().to_string(), + size: 0, + chunks: 0, + modified: OffsetDateTime::now_utc(), + digest: None, + deleted: false, + }; + publish_meta(self, &info).await?; + Ok(info) + } + + /// Adds a link to another [ObjectStore] bucket by creating a new [Object] + /// in the current [ObjectStore] that points to another [ObjectStore] and + /// does not contain any data. 
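+ /// The stored [ObjectLink] keeps `name` as `None`, which marks the entry as a + /// bucket link rather than a link to a single object.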
+ /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use async_nats::jetstream::object_store; + /// let client = async_nats::connect("demo.nats.io").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// let bucket = jetstream.get_object_store("bucket").await?; + /// bucket + /// .add_bucket_link("link_to_object", "another_bucket") + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn add_bucket_link( + &self, + name: T, + bucket: U, + ) -> Result { + let name = name.to_string(); + let bucket = bucket.to_string(); + if name.is_empty() { + return Err(AddLinkError::new(AddLinkErrorKind::EmptyName)); + } + + match self.info(&name).await { + Ok(info) => { + if info.link.is_none() { + return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists)); + } + } + Err(err) if err.kind() != InfoErrorKind::NotFound => { + return Err(AddLinkError::with_source(AddLinkErrorKind::Other, err)) + } + _ => (), + } + + let info = ObjectInfo { + name: name.clone(), + description: None, + link: Some(ObjectLink { name: None, bucket }), + bucket: self.name.clone(), + nuid: nuid::next().to_string(), + size: 0, + chunks: 0, + modified: OffsetDateTime::now_utc(), + digest: None, + deleted: false, + }; + publish_meta(self, &info).await?; + Ok(info) + } +} + +async fn publish_meta(store: &ObjectStore, info: &ObjectInfo) -> Result<(), PublishMetadataError> { + let encoded_object_name = encode_object_name(&info.name); + let subject = format!("$O.{}.M.{}", &store.name, &encoded_object_name); + + let mut headers = HeaderMap::new(); + headers.insert( + NATS_ROLLUP, + ROLLUP_SUBJECT.parse::().map_err(|err| { + PublishMetadataError::with_source( + PublishMetadataErrorKind::Other, + format!("failed parsing header: {}", err), + ) + })?, + ); + let data = serde_json::to_vec(&info).map_err(|err| { + PublishMetadataError::with_source( + PublishMetadataErrorKind::Other, + format!("failed serializing object info: {}", err), + ) + })?; + + store + .stream + .context + .publish_with_headers(subject, headers, data.into()) + .await + .map_err(|err| { + PublishMetadataError::with_source( + PublishMetadataErrorKind::PublishMetadata, + format!("failed publishing metadata: {}", err), + ) + })? 
+ .await + .map_err(|err| { + PublishMetadataError::with_source( + PublishMetadataErrorKind::PublishMetadata, + format!("failed ack from metadata publish: {}", err), + ) + })?; + Ok(()) } pub struct Watch<'a> { @@ -579,16 +896,20 @@ pub struct Object<'a> { has_pending_messages: bool, digest: Option, subscription: Option>, + subscription_future: Option, StreamError>>>, + stream: crate::jetstream::stream::Stream, } impl<'a> Object<'a> { - pub(crate) fn new(subscription: Ordered<'a>, info: ObjectInfo) -> Self { + pub(crate) fn new(info: ObjectInfo, stream: stream::Stream) -> Self { Object { - subscription: Some(subscription), + subscription: None, info, remaining_bytes: VecDeque::new(), has_pending_messages: true, digest: Some(ring::digest::Context::new(&SHA256)), + subscription_future: None, + stream, } } @@ -613,6 +934,34 @@ impl tokio::io::AsyncRead for Object<'_> { } if self.has_pending_messages { + if self.subscription.is_none() { + let future = match self.subscription_future.as_mut() { + Some(future) => future, + None => { + let stream = self.stream.clone(); + let bucket = self.info.bucket.clone(); + let nuid = self.info.nuid.clone(); + self.subscription_future.insert(Box::pin(async move { + stream + .create_consumer(OrderedConfig { + deliver_subject: stream.context.client.new_inbox(), + filter_subject: format!("$O.{}.C.{}", bucket, nuid), + ..Default::default() + }) + .await + .unwrap() + .messages() + .await + })) + } + }; + match future.as_mut().poll(cx) { + Poll::Ready(subscription) => { + self.subscription = Some(subscription.unwrap()); + } + Poll::Pending => (), + } + } if let Some(subscription) = self.subscription.as_mut() { match subscription.poll_next_unpin(cx) { Poll::Ready(message) => match message { @@ -673,7 +1022,7 @@ impl tokio::io::AsyncRead for Object<'_> { Poll::Pending => Poll::Pending, } } else { - Poll::Ready(Ok(())) + Poll::Pending } } else { Poll::Ready(Ok(())) @@ -717,9 +1066,9 @@ fn is_default(t: &T) -> bool { #[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct ObjectLink { /// Name of the object - pub name: String, + pub name: Option, /// Name of the bucket the object is stored in. - pub bucket: Option, + pub bucket: String, } /// Meta information about an object. @@ -729,8 +1078,6 @@ pub struct ObjectMeta { pub name: String, /// A short human readable description of the object. pub description: Option, - /// Link this object points to, if any. 
 
 pub struct Watch<'a> {
@@ -579,16 +896,20 @@ pub struct Object<'a> {
     has_pending_messages: bool,
     digest: Option<ring::digest::Context>,
     subscription: Option<Ordered<'a>>,
+    subscription_future: Option<BoxFuture<'a, Result<Ordered<'a>, StreamError>>>,
+    stream: crate::jetstream::stream::Stream,
 }
 
 impl<'a> Object<'a> {
-    pub(crate) fn new(subscription: Ordered<'a>, info: ObjectInfo) -> Self {
+    pub(crate) fn new(info: ObjectInfo, stream: stream::Stream) -> Self {
         Object {
-            subscription: Some(subscription),
+            subscription: None,
             info,
             remaining_bytes: VecDeque::new(),
             has_pending_messages: true,
             digest: Some(ring::digest::Context::new(&SHA256)),
+            subscription_future: None,
+            stream,
         }
     }
 
@@ -613,6 +934,34 @@ impl tokio::io::AsyncRead for Object<'_> {
         }
 
         if self.has_pending_messages {
+            if self.subscription.is_none() {
+                let future = match self.subscription_future.as_mut() {
+                    Some(future) => future,
+                    None => {
+                        let stream = self.stream.clone();
+                        let bucket = self.info.bucket.clone();
+                        let nuid = self.info.nuid.clone();
+                        self.subscription_future.insert(Box::pin(async move {
+                            stream
+                                .create_consumer(OrderedConfig {
+                                    deliver_subject: stream.context.client.new_inbox(),
+                                    filter_subject: format!("$O.{}.C.{}", bucket, nuid),
+                                    ..Default::default()
+                                })
+                                .await
+                                .unwrap()
+                                .messages()
+                                .await
+                        }))
+                    }
+                };
+                match future.as_mut().poll(cx) {
+                    Poll::Ready(subscription) => {
+                        self.subscription = Some(subscription.unwrap());
+                    }
+                    Poll::Pending => (),
+                }
+            }
             if let Some(subscription) = self.subscription.as_mut() {
                 match subscription.poll_next_unpin(cx) {
                     Poll::Ready(message) => match message {
@@ -673,7 +1022,7 @@ impl tokio::io::AsyncRead for Object<'_> {
                     Poll::Pending => Poll::Pending,
                 }
             } else {
-                Poll::Ready(Ok(()))
+                Poll::Pending
             }
         } else {
             Poll::Ready(Ok(()))
@@ -717,9 +1066,9 @@ fn is_default<T: Default + Eq>(t: &T) -> bool {
 #[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
 pub struct ObjectLink {
     /// Name of the object
-    pub name: String,
+    pub name: Option<String>,
     /// Name of the bucket the object is stored in.
-    pub bucket: Option<String>,
+    pub bucket: String,
 }
 
 /// Meta information about an object.
@@ -729,8 +1078,6 @@ pub struct ObjectMeta {
     pub name: String,
     /// A short human readable description of the object.
     pub description: Option<String>,
-    /// Link this object points to, if any.
-    pub link: Option<ObjectLink>,
 }
 
 impl From<&str> for ObjectMeta {
@@ -742,6 +1089,76 @@ impl From<&str> for ObjectMeta {
     }
 }
 
+pub trait AsObjectInfo {
+    fn as_info(&self) -> &ObjectInfo;
+}
+
+impl AsObjectInfo for &Object<'_> {
+    fn as_info(&self) -> &ObjectInfo {
+        &self.info
+    }
+}
+impl AsObjectInfo for &ObjectInfo {
+    fn as_info(&self) -> &ObjectInfo {
+        self
+    }
+}
+
+impl From<ObjectInfo> for ObjectMeta {
+    fn from(info: ObjectInfo) -> Self {
+        ObjectMeta {
+            name: info.name,
+            description: info.description,
+        }
+    }
+}
+
+#[derive(Debug, PartialEq, Clone)]
+pub enum UpdateMetadataErrorKind {
+    InvalidName,
+    NotFound,
+    TimedOut,
+    Other,
+    PublishMetadata,
+    NameAlreadyInUse,
+    Purge,
+}
+
+impl Display for UpdateMetadataErrorKind {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::InvalidName => write!(f, "invalid object name"),
+            Self::NotFound => write!(f, "object not found"),
+            Self::TimedOut => write!(f, "timed out"),
+            Self::Other => write!(f, "error"),
+            Self::PublishMetadata => {
+                write!(f, "failed publishing metadata")
+            }
+            Self::NameAlreadyInUse => {
+                write!(f, "object with updated name already exists")
+            }
+            Self::Purge => write!(f, "failed purging old name metadata"),
+        }
+    }
+}
+
+impl From<InfoError> for UpdateMetadataError {
+    fn from(error: InfoError) -> Self {
+        match error.kind() {
+            InfoErrorKind::InvalidName => {
+                UpdateMetadataError::new(UpdateMetadataErrorKind::InvalidName)
+            }
+            InfoErrorKind::NotFound => UpdateMetadataError::new(UpdateMetadataErrorKind::NotFound),
+            InfoErrorKind::Other => {
+                UpdateMetadataError::with_source(UpdateMetadataErrorKind::Other, error)
+            }
+            InfoErrorKind::TimedOut => UpdateMetadataError::new(UpdateMetadataErrorKind::TimedOut),
+        }
+    }
+}
+
+pub type UpdateMetadataError = Error<UpdateMetadataErrorKind>;
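A short sketch of the rename flow this error type backs, mirroring the `update_metadata` test added further down (a hypothetical helper; bucket and object names are illustrative):

```rust
use async_nats::jetstream::object_store::{ObjectMeta, ObjectStore};

async fn rename(bucket: &ObjectStore) -> Result<(), async_nats::Error> {
    // Rename "old_object" and replace its description; the old metadata
    // subject is purged so only the new name resolves afterwards.
    bucket
        .update_metadata(
            "old_object",
            ObjectMeta {
                name: "new_object".to_string(),
                description: Some("description".to_string()),
            },
        )
        .await?;
    Ok(())
}
```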
 
 #[derive(Clone, Copy, Debug, PartialEq)]
 pub enum InfoErrorKind {
     InvalidName,
@@ -768,6 +1185,7 @@ pub enum GetErrorKind {
     InvalidName,
     ConsumerCreate,
     NotFound,
+    BucketLink,
     Other,
     TimedOut,
 }
@@ -780,6 +1198,7 @@ impl Display for GetErrorKind {
             Self::NotFound => write!(f, "object not found"),
             Self::TimedOut => write!(f, "timed out"),
             Self::InvalidName => write!(f, "invalid object name"),
+            Self::BucketLink => write!(f, "object is a link to a bucket"),
         }
     }
 }
@@ -866,6 +1285,73 @@ impl Display for PutErrorKind {
 
 pub type PutError = Error<PutErrorKind>;
 
+pub type AddLinkError = Error<AddLinkErrorKind>;
+
+#[derive(Clone, Copy, Debug, PartialEq)]
+pub enum AddLinkErrorKind {
+    EmptyName,
+    ObjectRequired,
+    Deleted,
+    LinkToLink,
+    PublishMetadata,
+    AlreadyExists,
+    Other,
+}
+
+impl Display for AddLinkErrorKind {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            AddLinkErrorKind::ObjectRequired => write!(f, "cannot link to empty Object"),
+            AddLinkErrorKind::Deleted => write!(f, "cannot link a deleted Object"),
+            AddLinkErrorKind::LinkToLink => write!(f, "cannot link to another link"),
+            AddLinkErrorKind::EmptyName => write!(f, "link name cannot be empty"),
+            AddLinkErrorKind::PublishMetadata => write!(f, "failed publishing link metadata"),
+            AddLinkErrorKind::Other => write!(f, "error"),
+            AddLinkErrorKind::AlreadyExists => write!(f, "object already exists"),
+        }
+    }
+}
+
+type PublishMetadataError = Error<PublishMetadataErrorKind>;
+
+#[derive(Clone, Copy, Debug, PartialEq)]
+enum PublishMetadataErrorKind {
+    PublishMetadata,
+    Other,
+}
+
+impl Display for PublishMetadataErrorKind {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            PublishMetadataErrorKind::PublishMetadata => write!(f, "failed to publish metadata"),
+            PublishMetadataErrorKind::Other => write!(f, "error"),
+        }
+    }
+}
+
+impl From<PublishMetadataError> for AddLinkError {
+    fn from(error: PublishMetadataError) -> Self {
+        match error.kind() {
+            PublishMetadataErrorKind::PublishMetadata => {
+                AddLinkError::new(AddLinkErrorKind::PublishMetadata)
+            }
+            PublishMetadataErrorKind::Other => {
+                AddLinkError::with_source(AddLinkErrorKind::Other, error)
+            }
+        }
+    }
+}
+impl From<PublishMetadataError> for PutError {
+    fn from(error: PublishMetadataError) -> Self {
+        match error.kind() {
+            PublishMetadataErrorKind::PublishMetadata => {
+                PutError::new(PutErrorKind::PublishMetadata)
+            }
+            PublishMetadataErrorKind::Other => PutError::with_source(PutErrorKind::Other, error),
+        }
+    }
+}
+
 #[derive(Clone, Copy, Debug, PartialEq)]
 pub enum WatchErrorKind {
     TimedOut,
diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs
index 200f88f7e..4e3d14e36 100644
--- a/async-nats/src/lib.rs
+++ b/async-nats/src/lib.rs
@@ -152,6 +152,7 @@ pub type Error = Box<dyn std::error::Error + Send + Sync>;
 const VERSION: &str = env!("CARGO_PKG_VERSION");
 const LANG: &str = "rust";
 const MAX_PENDING_PINGS: usize = 2;
+const MULTIPLEXER_SID: u64 = 0;
 
 /// A re-export of the `rustls` crate used in this crate,
 /// for use in cases where manual client configurations
@@ -267,6 +268,13 @@ pub(crate) enum Command {
         respond: Option<String>,
         headers: Option<HeaderMap>,
     },
+    Request {
+        subject: String,
+        payload: Bytes,
+        respond: String,
+        headers: Option<HeaderMap>,
+        sender: oneshot::Sender<Message>,
+    },
     Subscribe {
         sid: u64,
         subject: String,
@@ -315,11 +323,19 @@ struct Subscription {
     max: Option<u64>,
 }
 
+#[derive(Debug)]
+struct Multiplexer {
+    subject: String,
+    prefix: String,
+    senders: HashMap<String, oneshot::Sender<Message>>,
+}
+
 /// A connection handler which facilitates communication from channels to a single shared connection.
 pub(crate) struct ConnectionHandler {
     connection: Connection,
     connector: Connector,
     subscriptions: HashMap<u64, Subscription>,
+    multiplexer: Option<Multiplexer>,
     pending_pings: usize,
     info_sender: tokio::sync::watch::Sender<ServerInfo>,
     ping_interval: Interval,
@@ -344,6 +360,7 @@ impl ConnectionHandler {
             connection,
             connector,
             subscriptions: HashMap::new(),
+            multiplexer: None,
             pending_pings: 0,
             info_sender,
             ping_interval,
@@ -484,6 +501,28 @@ impl ConnectionHandler {
                         self.handle_flush().await?;
                     }
                 }
+            } else if sid == MULTIPLEXER_SID {
+                if let Some(multiplexer) = self.multiplexer.as_mut() {
+                    let maybe_token = subject.strip_prefix(&multiplexer.prefix).to_owned();
+
+                    if let Some(token) = maybe_token {
+                        if let Some(sender) = multiplexer.senders.remove(token) {
+                            let message = Message {
+                                subject,
+                                reply,
+                                payload,
+                                headers,
+                                status,
+                                description,
+                                length,
+                            };
+
+                            sender.send(message).map_err(|_| {
+                                io::Error::new(io::ErrorKind::Other, "request receiver closed")
+                            })?;
+                        }
+                    }
+                }
+            }
         }
         // TODO: we should probably update advertised server list here too.
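The read path above and the `Command::Request` write path below share one piece of bookkeeping: every reply subject is `<prefix>.<token>`, a single wildcard subscription covers the prefix, and the token indexes the pending senders. A standalone sketch of that subject arithmetic (the `_INBOX` values are illustrative):

```rust
fn main() {
    // Writer side: split the reply subject into the shared prefix and the
    // per-request token, as Command::Request does with rsplit_once.
    let respond = "_INBOX.abc123.token42";
    let (prefix, token) = respond.rsplit_once('.').unwrap();
    assert_eq!((prefix, token), ("_INBOX.abc123", "token42"));

    // Reader side: one "<prefix>.*" subscription covers every reply; the
    // token is recovered by stripping "<prefix>." from the subject.
    let wildcard = format!("{}.*", prefix);
    assert_eq!(wildcard, "_INBOX.abc123.*");
    let recovered = respond.strip_prefix(&format!("{}.", prefix)).unwrap();
    assert_eq!(recovered, token);
}
```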
@@ -591,6 +630,58 @@ impl ConnectionHandler {
                     error!("Sending Subscribe failed with {:?}", err);
                 }
             }
+            Command::Request {
+                subject,
+                payload,
+                respond,
+                headers,
+                sender,
+            } => {
+                let (prefix, token) = respond.rsplit_once('.').ok_or_else(|| {
+                    io::Error::new(io::ErrorKind::Other, "malformed request subject")
+                })?;
+
+                let multiplexer = if let Some(multiplexer) = self.multiplexer.as_mut() {
+                    multiplexer
+                } else {
+                    let subject = format!("{}.*", prefix);
+
+                    if let Err(err) = self
+                        .connection
+                        .write_op(&ClientOp::Subscribe {
+                            sid: MULTIPLEXER_SID,
+                            subject: subject.clone(),
+                            queue_group: None,
+                        })
+                        .await
+                    {
+                        error!("Sending Subscribe failed with {:?}", err);
+                    }
+
+                    self.multiplexer.insert(Multiplexer {
+                        subject,
+                        prefix: format!("{}.", prefix),
+                        senders: HashMap::new(),
+                    })
+                };
+
+                multiplexer.senders.insert(token.to_owned(), sender);
+
+                let pub_op = ClientOp::Publish {
+                    subject,
+                    payload,
+                    respond: Some(respond),
+                    headers,
+                };
+
+                while let Err(err) = self.connection.write_op(&pub_op).await {
+                    self.handle_disconnect().await?;
+                    error!("Sending Publish failed with {:?}", err);
+                }
+
+                self.connection.flush().await?;
+            }
             Command::Publish {
                 subject,
                 payload,
@@ -645,6 +736,18 @@ impl ConnectionHandler {
                 .await
                 .unwrap();
         }
+
+        if let Some(multiplexer) = &self.multiplexer {
+            self.connection
+                .write_op(&ClientOp::Subscribe {
+                    sid: MULTIPLEXER_SID,
+                    subject: multiplexer.subject.to_owned(),
+                    queue_group: None,
+                })
+                .await
+                .unwrap();
+        }
+
         self.connector.events_tx.try_send(Event::Connected).ok();
 
         Ok(())
diff --git a/async-nats/src/options.rs b/async-nats/src/options.rs
index fec11816b..93d7629ef 100644
--- a/async-nats/src/options.rs
+++ b/async-nats/src/options.rs
@@ -597,7 +597,7 @@ impl ConnectOptions {
     /// # #[tokio::main]
     /// # async fn main() -> Result<(), async_nats::ConnectError> {
     /// async_nats::ConnectOptions::new()
-    ///     .flush_interval(Duration::from_millis(100))
+    ///     .ping_interval(Duration::from_secs(24))
     ///     .connect("demo.nats.io")
     ///     .await?;
     /// # Ok(())
diff --git a/async-nats/src/service/mod.rs b/async-nats/src/service/mod.rs
index 9bec665a8..31f3b7431 100644
--- a/async-nats/src/service/mod.rs
+++ b/async-nats/src/service/mod.rs
@@ -48,11 +48,11 @@ pub const NATS_SERVICE_ERROR_CODE: &str = "Nats-Service-Error-Code";
 // uses recommended semver validation expression from
 // https://semver.org/#is-there-a-suggested-regular-expression-regex-to-check-a-semver-string
 static SEMVER: Lazy<Regex> = Lazy::new(|| {
-    Regex::new(r#"^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$"#)
+    Regex::new(r"^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$")
         .unwrap()
 });
 // From ADR-33: Name can only have A-Z, a-z, 0-9, dash, underscore.
-static NAME: Lazy<Regex> = Lazy::new(|| Regex::new(r#"^[A-Za-z0-9\-_]+$"#).unwrap());
+static NAME: Lazy<Regex> = Lazy::new(|| Regex::new(r"^[A-Za-z0-9\-_]+$").unwrap());
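A quick sketch of what the tightened literals accept and reject (uses the `regex` and `once_cell` crates already in this file; the sample names are illustrative):

```rust
use once_cell::sync::Lazy;
use regex::Regex;

// Same pattern as the NAME static above.
static NAME: Lazy<Regex> = Lazy::new(|| Regex::new(r"^[A-Za-z0-9\-_]+$").unwrap());

fn main() {
    // Per ADR-33, service names allow only A-Z, a-z, 0-9, dash, underscore.
    assert!(NAME.is_match("my-service_v2"));
    assert!(!NAME.is_match("my service")); // spaces are rejected
    assert!(!NAME.is_match("svc.v1")); // dots are rejected
}
```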
 
 /// Represents state for all endpoints.
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -318,7 +318,7 @@ impl Service {
                 "service name is not a valid string (only A-Z, a-z, 0-9, _, - are allowed)",
             )));
         }
-        let id = nuid::next();
+        let id = nuid::next().to_string();
         let started = time::OffsetDateTime::now_utc();
         let subjects = Arc::new(Mutex::new(Vec::new()));
         let info = Info {
diff --git a/async-nats/src/tls.rs b/async-nats/src/tls.rs
index 6ada7dfd2..c00f37c63 100644
--- a/async-nats/src/tls.rs
+++ b/async-nats/src/tls.rs
@@ -77,7 +77,7 @@ pub(crate) async fn config_tls(options: &ConnectorOptions) -> io::Result<ClientConfig> {
                 .collect::<Vec<Vec<u8>>>()
-                .as_ref(),
+                .as_slice(),
         );
     }
@@ -105,7 +105,7 @@ pub(crate) async fn config_tls(options: &ConnectorOptions) -> io::Result<ClientConfig> {
                 .collect::<Vec<Vec<u8>>>()
-                .as_ref(),
+                .as_slice(),
         );
     }
diff --git a/async-nats/tests/jetstream.rs b/async-nats/tests/jetstream.rs
@@ ... @@ mod jetstream {
             .unwrap()
             .get(header::NATS_SEQUENCE)
             .unwrap()
-            .iter()
-            .next()
-            .unwrap();
+            .as_str();
+
         assert_eq!(sequence.parse::<u64>().unwrap(), publish_ack.sequence);
 
         assert_eq!(payload, message.payload.as_ref());
@@ -754,9 +753,8 @@ mod jetstream {
             .unwrap()
             .get(header::NATS_SEQUENCE)
             .unwrap()
-            .iter()
-            .next()
-            .unwrap();
+            .as_str();
+
         assert_eq!(sequence.parse::<u64>().unwrap(), publish_ack.sequence);
 
         assert_eq!(payload, message.payload.as_ref());
@@ -828,9 +826,8 @@ mod jetstream {
             .unwrap()
             .get(header::NATS_SEQUENCE)
             .unwrap()
-            .iter()
-            .next()
-            .unwrap();
+            .as_str();
+
         assert_eq!(sequence.parse::<u64>().unwrap(), publish_ack.sequence);
 
         assert_eq!(payload, message.payload.as_ref());
@@ -897,9 +894,8 @@ mod jetstream {
             .unwrap()
             .get(header::NATS_SEQUENCE)
             .unwrap()
-            .iter()
-            .next()
-            .unwrap();
+            .as_str();
+
         assert_eq!(sequence.parse::<u64>().unwrap(), publish_ack.sequence);
 
         assert_eq!(payload, message.payload.as_ref());
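These test updates track a `HeaderValue` API change: a header entry now reads as a single value via `as_str()` (or `to_string()`) instead of iterating inner values. A hedged sketch of the new access pattern (`sequence_of` is a hypothetical helper; the header name is the one the tests use):

```rust
use async_nats::HeaderMap;

fn sequence_of(headers: &HeaderMap) -> Option<u64> {
    // get() returns the header value directly; as_str() exposes it for
    // parsing, replacing the old .iter().next().unwrap() dance.
    headers
        .get("Nats-Sequence")
        .and_then(|value| value.as_str().parse::<u64>().ok())
}
```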
diff --git a/async-nats/tests/object_store.rs b/async-nats/tests/object_store.rs
index abbe6995f..ad76211e5 100644
--- a/async-nats/tests/object_store.rs
+++ b/async-nats/tests/object_store.rs
@@ -15,7 +15,10 @@ mod object_store {
     use std::{io, time::Duration};
 
-    use async_nats::jetstream::object_store::ObjectMeta;
+    use async_nats::jetstream::{
+        object_store::{AddLinkErrorKind, ObjectMeta},
+        stream::DirectGetErrorKind,
+    };
     use base64::Engine;
     use futures::StreamExt;
     use rand::RngCore;
@@ -67,6 +70,17 @@ mod object_store {
             object.info.digest
         );
         assert_eq!(result, bytes);
+
+        // Check that following a link works.
+        bucket.add_link("link", &object.info).await.unwrap();
+
+        tracing::info!("getting link");
+        let mut object_link = bucket.get("link").await.unwrap();
+        let mut contents = Vec::new();
+
+        tracing::info!("reading content");
+        object_link.read_to_end(&mut contents).await.unwrap();
+        assert_eq!(contents, result);
     }
 
     #[tokio::test]
@@ -110,6 +124,66 @@ mod object_store {
         assert!(object.deleted);
     }
 
+    #[tokio::test]
+    async fn watch_with_history() {
+        let server = nats_server::run_server("tests/configs/jetstream.conf");
+        let client = async_nats::connect(server.client_url()).await.unwrap();
+
+        let jetstream = async_nats::jetstream::new(client);
+
+        let bucket = jetstream
+            .create_object_store(async_nats::jetstream::object_store::Config {
+                bucket: "bucket".to_string(),
+                ..Default::default()
+            })
+            .await
+            .unwrap();
+
+        bucket
+            .put("FOO", &mut std::io::Cursor::new(vec![1, 2, 3, 4]))
+            .await
+            .unwrap();
+
+        bucket
+            .put("BAR", &mut std::io::Cursor::new(vec![5, 6, 7, 8]))
+            .await
+            .unwrap();
+
+        bucket
+            .put("FOO", &mut std::io::Cursor::new(vec![9, 0, 1, 2]))
+            .await
+            .unwrap();
+
+        let mut watcher = bucket.watch_with_history().await.unwrap();
+
+        tokio::task::spawn({
+            let bucket = bucket.clone();
+            async move {
+                tokio::time::sleep(Duration::from_millis(100)).await;
+                bucket
+                    .put("BAR", &mut io::Cursor::new(vec![2, 3, 4, 5]))
+                    .await
+                    .unwrap();
+                bucket.delete("BAR").await.unwrap();
+            }
+        });
+
+        // Check that we get the values in accordance with the LastPerSubject
+        // deliver policy: we should get `BAR` and only one `FOO`.
+        let object = watcher.next().await.unwrap().unwrap();
+        assert_eq!(object.name, "BAR".to_string());
+
+        let object = watcher.next().await.unwrap().unwrap();
+        assert_eq!(object.name, "FOO".to_string());
+
+        // Make sure we get the rest correctly.
+        let object = watcher.next().await.unwrap().unwrap();
+        assert_eq!(object.name, "BAR".to_string());
+        let object = watcher.next().await.unwrap().unwrap();
+        assert_eq!(object.name, "BAR".to_string());
+        assert!(object.deleted);
+    }
+
     #[tokio::test]
     async fn info() {
         let server = nats_server::run_server("tests/configs/jetstream.conf");
@@ -282,7 +356,6 @@ mod object_store {
                 ObjectMeta {
                     name: "Foo".to_string(),
                     description: Some("foo desc".to_string()),
-                    ..Default::default()
                 },
                 &mut "dadada".as_bytes(),
             )
@@ -343,4 +416,142 @@ mod object_store {
             .await
             .unwrap();
     }
+
+    #[tokio::test]
+    async fn update_metadata() {
+        let server = nats_server::run_server("tests/configs/jetstream.conf");
+        let client = async_nats::connect(server.client_url()).await.unwrap();
+
+        let jetstream = async_nats::jetstream::new(client);
+
+        let bucket = jetstream
+            .create_object_store(async_nats::jetstream::object_store::Config {
+                bucket: "bucket".to_string(),
+                ..Default::default()
+            })
+            .await
+            .unwrap();
+        bucket
+            .put("old_object", &mut "some data".as_bytes())
+            .await
+            .unwrap();
+
+        let given_metadata = ObjectMeta {
+            name: "new_object".to_owned(),
+            description: Some("description".to_string()),
+        };
+
+        bucket
+            .update_metadata("old_object", given_metadata.clone())
+            .await
+            .unwrap();
+
+        let stream = jetstream.get_stream("OBJ_bucket").await.unwrap();
+
+        stream
+            .direct_get_last_for_subject(format!(
+                "$O.bucket.M.{}",
+                base64::engine::general_purpose::URL_SAFE.encode("new_object")
+            ))
+            .await
+            .unwrap();
+
+        let old_meta_subject = stream
+            .direct_get_last_for_subject(format!(
+                "$O.bucket.M.{}",
+                base64::engine::general_purpose::URL_SAFE.encode("old_object")
+            ))
+            .await
+            .unwrap_err();
+
+        assert_eq!(old_meta_subject.kind(), DirectGetErrorKind::NotFound);
+
+        let info = bucket.info("new_object").await.unwrap();
+
+        assert_eq!(info.name, given_metadata.name);
+        assert_eq!(info.description, given_metadata.description);
+    }
+
+    #[tokio::test]
+    async fn add_link() {
+        let server = nats_server::run_server("tests/configs/jetstream.conf");
+        let client = async_nats::connect(server.client_url()).await.unwrap();
+
+        let jetstream = async_nats::jetstream::new(client);
+
+        let bucket = jetstream
+            .create_object_store(async_nats::jetstream::object_store::Config {
+                bucket: "bucket".to_string(),
+                ..Default::default()
+            })
+            .await
+            .unwrap();
+        let object = bucket
+            .put("object", &mut "some data".as_bytes())
+            .await
+            .unwrap();
+
+        let another_object = bucket
+            .put("another_object", &mut "other data".as_bytes())
+            .await
+            .unwrap();
+
+        bucket.add_link("link", &object).await.unwrap();
+
+        let link_info = bucket.info("link").await.unwrap();
+
+        assert_eq!(
+            link_info
+                .link
+                .as_ref()
+                .unwrap()
+                .name
+                .as_ref()
+                .unwrap()
+                .as_str(),
+            "object"
+        );
+        assert_eq!(link_info.link.as_ref().unwrap().bucket.as_str(), "bucket");
+
+        let result = bucket
+            .add_link("object", &another_object)
+            .await
+            .unwrap_err();
+        assert_eq!(result.kind(), AddLinkErrorKind::AlreadyExists);
+
+        let result = bucket.add_link("", &another_object).await.unwrap_err();
+        assert_eq!(result.kind(), AddLinkErrorKind::EmptyName);
+
+        let result = bucket.add_link("new_link", &link_info).await.unwrap_err();
+        assert_eq!(result.kind(), AddLinkErrorKind::LinkToLink);
+    }
+
+    #[tokio::test]
+    async fn add_bucket_link() {
+        let server = nats_server::run_server("tests/configs/jetstream.conf");
+        let client = async_nats::connect(server.client_url()).await.unwrap();
+
+        let jetstream = async_nats::jetstream::new(client);
+
+        jetstream
+            .create_object_store(async_nats::jetstream::object_store::Config {
+                bucket: "another".to_string(),
+                ..Default::default()
+            })
+            .await
+            .unwrap();
+        let bucket = jetstream
+            .create_object_store(async_nats::jetstream::object_store::Config {
+                bucket: "bucket".to_string(),
+                ..Default::default()
+            })
+            .await
+            .unwrap();
+
+        bucket.add_bucket_link("link", "another").await.unwrap();
+
+        let link_info = bucket.info("link").await.unwrap();
+        assert!(link_info.link.as_ref().unwrap().name.is_none());
+        assert_eq!(link_info.link.as_ref().unwrap().bucket.as_str(), "another");
+    }
 }
diff --git a/async-nats/tests/service_tests.rs b/async-nats/tests/service_tests.rs
index 89b992a63..94f0bf417 100644
--- a/async-nats/tests/service_tests.rs
+++ b/async-nats/tests/service_tests.rs
@@ -307,9 +307,7 @@ mod service {
             .unwrap()
             .get(async_nats::service::NATS_SERVICE_ERROR_CODE)
             .unwrap()
-            .iter()
-            .next()
-            .unwrap()
+            .as_str()
             .parse::<u16>()
             .unwrap(),
         503
@@ -320,9 +318,7 @@ mod service {
             .unwrap()
             .get(async_nats::service::NATS_SERVICE_ERROR)
             .unwrap()
-            .iter()
-            .next()
-            .unwrap(),
+            .to_string(),
         "error".to_string()
     );
diff --git a/nats-server/Cargo.toml b/nats-server/Cargo.toml
index 14099dbf4..b665ff67c 100644
--- a/nats-server/Cargo.toml
+++ b/nats-server/Cargo.toml
@@ -11,7 +11,7 @@ lazy_static = "1.4.0"
 regex = { version = "1.7.1", default-features = false, features = ["std", "unicode-perl"] }
 url = "2"
 json = "0.12"
-nuid = "0.4.1"
+nuid = "0.5"
 rand = "0.8"
 tokio-retry = "0.3.0"
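The `nuid` 0.5 bump is why `.to_string()` calls appear throughout this patch: `nuid::next()` now returns a NUID value that renders via `Display` rather than a `String`. A one-line sketch:

```rust
fn main() {
    // nuid::next() no longer yields a String directly; render it explicitly.
    let id: String = nuid::next().to_string();
    assert!(!id.is_empty());
}
```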
diff --git a/nats-server/src/lib.rs b/nats-server/src/lib.rs
index 2a9f4cca4..57ba39f2e 100644
--- a/nats-server/src/lib.rs
+++ b/nats-server/src/lib.rs
@@ -38,7 +38,7 @@ struct Inner {
 lazy_static! {
     static ref SD_RE: Regex = Regex::new(r#".+\sStore Directory:\s+"([^"]+)""#).unwrap();
-    static ref CLIENT_RE: Regex = Regex::new(r#".+\sclient connections on\s+(\S+)"#).unwrap();
+    static ref CLIENT_RE: Regex = Regex::new(r".+\sclient connections on\s+(\S+)").unwrap();
 }
 
 impl Drop for Server {
@@ -179,7 +179,7 @@ impl<'a> IntoConfig<'a> for [&'a str; 3] {
 pub fn run_cluster<'a, C: IntoConfig<'a>>(cfg: C) -> Cluster {
     let cfg = cfg.into_config();
     let port = rand::thread_rng().gen_range(3000..50_000);
-    let ports = vec![port, port + 100, port + 200];
+    let ports = [port, port + 100, port + 200];
 
     let ports = ports
         .iter()
@@ -191,7 +191,7 @@ pub fn run_cluster<'a, C: IntoConfig<'a>>(cfg: C) -> Cluster {
             new_port
         })
         .collect::<Vec<_>>();
-    let cluster = vec![port + 1, port + 101, port + 201];
+    let cluster = [port + 1, port + 101, port + 201];
 
     let s1 = run_cluster_node_with_port(
         cfg.0[0],
@@ -240,7 +240,7 @@ pub fn run_server_with_port(cfg: &str, port: Option<&str>) -> Server {
 }
 
 fn do_run(cfg: &str, port: Option<&str>, id: Option<String>) -> Inner {
-    let id = id.unwrap_or_else(nuid::next);
+    let id = id.unwrap_or_else(|| nuid::next().to_string());
     let logfile = env::temp_dir().join(format!("nats-server-{id}.log"));
     let pidfile = env::temp_dir().join(format!("nats-server-{id}.pid"));
     let store_dir = env::temp_dir().join(format!("store-dir-{id}"));
@@ -268,7 +268,7 @@ fn do_run(cfg: &str, port: Option<&str>, id: Option<String>) -> Inner {
     Inner {
         port: port.map(ToString::to_string),
         cfg: cfg.to_string(),
-        id,
+        id: id.to_string(),
         child,
         logfile,
         pidfile,
@@ -327,7 +327,7 @@ fn run_cluster_node_with_port(
         inner: Inner {
             port: port.map(ToString::to_string),
             cfg: cfg.to_string(),
-            id,
+            id: id.to_string(),
             child,
             logfile,
             pidfile,
diff --git a/nats/src/jetstream/push_subscription.rs b/nats/src/jetstream/push_subscription.rs
index 5b8ede2f5..9f36b6640 100644
--- a/nats/src/jetstream/push_subscription.rs
+++ b/nats/src/jetstream/push_subscription.rs
@@ -342,7 +342,7 @@ impl PushSubscription {
             self.0.stream, self.0.consumer,
         ))
         .spawn(move || {
-            for m in sub.iter() {
+            for m in &sub {
                 if let Err(e) = handler(m) {
                     // TODO(dlc) - Capture for last error?
                     log::error!("Error in callback! {:?}", e);
@@ -394,7 +394,7 @@ impl PushSubscription {
             self.0.consumer, self.0.stream
         ))
         .spawn(move || {
-            for message in sub.iter() {
+            for message in &sub {
                 if let Err(err) = handler(&message) {
                     log::error!("Error in callback! {:?}", err);
                 }
diff --git a/nats/src/subscription.rs b/nats/src/subscription.rs
index c650fcc5d..be8bb3f32 100644
--- a/nats/src/subscription.rs
+++ b/nats/src/subscription.rs
@@ -246,7 +246,7 @@ impl Subscription {
     thread::Builder::new()
         .name(format!("nats_subscriber_{}_{}", self.0.sid, self.0.subject))
         .spawn(move || {
-            for m in sub.iter() {
+            for m in &sub {
                 if let Err(e) = handler(m) {
                     // TODO(dlc) - Capture for last error?
                     log::error!("Error in callback! {:?}", e);
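These loop changes lean on `IntoIterator` being implemented for `&Subscription`, so handlers iterate without an explicit `iter()`. A hedged sketch with the synchronous client (server URL and subject are illustrative; the loop blocks until the subscription closes):

```rust
fn main() -> std::io::Result<()> {
    let nc = nats::connect("demo.nats.io")?;
    let sub = nc.subscribe("events.>")?;

    // Equivalent to `for m in sub.iter()`: borrowing keeps `sub` available
    // for unsubscribe/drain elsewhere.
    for m in &sub {
        println!("received {}", m.subject);
    }
    Ok(())
}
```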