diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 2c6965ec6..b4f7a65de 100755 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -38,12 +38,12 @@ mod jetstream { }; use async_nats::jetstream::context::{GetStreamByNameErrorKind, Publish, PublishErrorKind}; use async_nats::jetstream::response::Response; + #[cfg(feature = "server_2_10")] + use async_nats::jetstream::stream::ConsumerLimits; use async_nats::jetstream::stream::{ self, ConsumerCreateStrictErrorKind, ConsumerUpdateErrorKind, DirectGetErrorKind, DiscardPolicy, StorageType, }; - #[cfg(feature = "server_2_10")] - use async_nats::jetstream::stream::{Compression, ConsumerLimits, Source, SubjectTransform}; use async_nats::jetstream::AckKind; use async_nats::ConnectOptions; use futures::stream::{StreamExt, TryStreamExt}; @@ -3260,7 +3260,9 @@ mod jetstream { .await .unwrap(); - assert_eq!(stream.info().await.unwrap().config.metadata, metadata); + let info = stream.info().await.unwrap(); + assert_eq!(info.config.metadata.get("key"), metadata.get("key")); + assert_eq!(info.config.metadata.get("other"), metadata.get("other")); let mut consumer = stream .create_consumer(async_nats::jetstream::consumer::pull::Config { @@ -3271,7 +3273,9 @@ mod jetstream { .await .unwrap(); - assert_eq!(consumer.info().await.unwrap().config.metadata, metadata); + let info = consumer.info().await.unwrap(); + assert_eq!(info.config.metadata.get("key"), metadata.get("key")); + assert_eq!(info.config.metadata.get("other"), metadata.get("other")); } #[tokio::test] @@ -3533,80 +3537,6 @@ mod jetstream { .unwrap(); } - #[cfg(feature = "server_2_10")] - #[tokio::test] - async fn stream_config() { - let server = nats_server::run_server("tests/configs/jetstream.conf"); - let client = async_nats::connect(server.client_url()).await.unwrap(); - - let jetstream = async_nats::jetstream::new(client); - - let config = async_nats::jetstream::stream::Config { - name: "EVENTS".to_string(), - max_bytes: 1024 * 1024, - max_messages: 1_000_000, - max_messages_per_subject: 100, - discard: DiscardPolicy::New, - discard_new_per_subject: true, - subjects: vec!["events.>".to_string()], - retention: stream::RetentionPolicy::WorkQueue, - max_consumers: 10, - max_age: Duration::from_secs(900), - max_message_size: 1024 * 1024, - storage: StorageType::Memory, - num_replicas: 1, - no_ack: true, - duplicate_window: Duration::from_secs(90), - template_owner: "".to_string(), - sealed: false, - description: Some("A Stream".to_string()), - allow_rollup: true, - deny_delete: false, - deny_purge: false, - republish: Some(stream::Republish { - source: "data.>".to_string(), - destination: "dest.>".to_string(), - headers_only: true, - }), - allow_direct: true, - mirror_direct: false, - mirror: None, - sources: Some(vec![Source { - name: "source_one_of_many".to_string(), - start_sequence: Some(5), - start_time: Some(OffsetDateTime::now_utc()), - filter_subject: Some("filter".to_string()), - external: Some(stream::External { - api_prefix: "API.PREFIX".to_string(), - delivery_prefix: Some("delivery_prefix".to_string()), - }), - domain: None, - subject_transforms: vec![SubjectTransform { - source: "source".to_string(), - destination: "dest".to_string(), - }], - }]), - metadata: HashMap::from([("key".to_string(), "value".to_string())]), - subject_transform: Some(SubjectTransform { - source: "source".to_string(), - destination: "dest".to_string(), - }), - compression: Some(Compression::S2), - consumer_limits: Some(ConsumerLimits { - inactive_threshold: Duration::from_secs(120), - max_ack_pending: 150, - }), - first_sequence: Some(505), - placement: Some(stream::Placement { - cluster: Some("CLUSTER".to_string()), - tags: vec!["tag".to_string()], - }), - }; - - let mut stream = jetstream.create_stream(config.clone()).await.unwrap(); - assert_eq!(config, stream.info().await.unwrap().config); - } - #[tokio::test] async fn limits() { let server = nats_server::run_server("tests/configs/jetstream.conf");