From eb4d4922d5e90b155c58382f6a9d97ed529e871c Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Thu, 27 Jul 2023 16:00:23 +0200 Subject: [PATCH] Use Duration in a few more places --- async-nats/src/jetstream/consumer/pull.rs | 50 +++++++++++++---------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/async-nats/src/jetstream/consumer/pull.rs b/async-nats/src/jetstream/consumer/pull.rs index 684d77f2e..c90fd5916 100644 --- a/async-nats/src/jetstream/consumer/pull.rs +++ b/async-nats/src/jetstream/consumer/pull.rs @@ -12,7 +12,10 @@ // limitations under the License. use bytes::Bytes; -use futures::{future::BoxFuture, FutureExt, StreamExt, TryFutureExt}; +use futures::{ + future::{BoxFuture, Either}, + FutureExt, StreamExt, TryFutureExt, +}; #[cfg(feature = "server_2_10")] use std::collections::HashMap; @@ -82,7 +85,7 @@ impl Consumer { Stream::stream( BatchConfig { batch: 200, - expires: Some(Duration::from_secs(30).as_nanos().try_into().unwrap()), + expires: Some(Duration::from_secs(30)), no_wait: false, max_bytes: 0, idle_heartbeat: Duration::from_secs(15), @@ -315,7 +318,7 @@ impl Consumer { let request = serde_json::to_vec(&BatchConfig { batch, - expires: Some(Duration::from_secs(60).as_nanos().try_into().unwrap()), + expires: Some(Duration::from_secs(60)), ..Default::default() }) .map(Bytes::from) @@ -345,9 +348,9 @@ impl<'a> Batch { let subscription = consumer.context.client.subscribe(inbox.clone()).await?; consumer.request_batch(batch, inbox.clone()).await?; - let sleep = batch.expires.map(|e| { + let sleep = batch.expires.map(|expires| { Box::pin(tokio::time::sleep( - Duration::from_nanos(e).saturating_add(Duration::from_secs(5)), + expires.saturating_add(Duration::from_secs(5)), )) }); @@ -556,7 +559,7 @@ impl<'a> Consumer { let stream = Stream::stream( BatchConfig { batch: 500, - expires: Some(Duration::from_secs(30).as_nanos().try_into().unwrap()), + expires: Some(Duration::from_secs(30)), no_wait: false, max_bytes: 0, idle_heartbeat: Duration::from_secs(15), @@ -870,13 +873,16 @@ impl Stream { // this is just in edge case of missing response for some reason. let expires = batch_config .expires - .map(|expires| match expires { - 0 => futures::future::Either::Left(future::pending()), - t => futures::future::Either::Right(tokio::time::sleep( - Duration::from_nanos(t).saturating_add(Duration::from_secs(5)), - )), + .map(|expires| { + if expires.is_zero() { + Either::Left(future::pending()) + } else { + Either::Right(tokio::time::sleep( + expires.saturating_add(Duration::from_secs(5)), + )) + } }) - .unwrap_or_else(|| futures::future::Either::Left(future::pending())); + .unwrap_or_else(|| Either::Left(future::pending())); // Need to check previous state, as `changed` will always fire on first // call. let prev_state = context.client.state.borrow().to_owned(); @@ -1234,7 +1240,7 @@ pub struct StreamBuilder<'a> { batch: usize, max_bytes: usize, heartbeat: Duration, - expires: u64, + expires: Duration, consumer: &'a Consumer, } @@ -1244,7 +1250,7 @@ impl<'a> StreamBuilder<'a> { consumer, batch: 200, max_bytes: 0, - expires: Duration::from_secs(30).as_nanos().try_into().unwrap(), + expires: Duration::from_secs(30), heartbeat: Duration::default(), } } @@ -1406,7 +1412,7 @@ impl<'a> StreamBuilder<'a> { /// # } /// ``` pub fn expires(mut self, expires: Duration) -> Self { - self.expires = expires.as_nanos().try_into().unwrap(); + self.expires = expires; self } @@ -1494,7 +1500,7 @@ pub struct FetchBuilder<'a> { batch: usize, max_bytes: usize, heartbeat: Duration, - expires: Option, + expires: Option, consumer: &'a Consumer, } @@ -1657,7 +1663,7 @@ impl<'a> FetchBuilder<'a> { /// # } /// ``` pub fn expires(mut self, expires: Duration) -> Self { - self.expires = Some(expires.as_nanos().try_into().unwrap()); + self.expires = Some(expires); self } @@ -1741,7 +1747,7 @@ pub struct BatchBuilder<'a> { batch: usize, max_bytes: usize, heartbeat: Duration, - expires: u64, + expires: Duration, consumer: &'a Consumer, } @@ -1751,7 +1757,7 @@ impl<'a> BatchBuilder<'a> { consumer, batch: 200, max_bytes: 0, - expires: 0, + expires: Duration::ZERO, heartbeat: Duration::default(), } } @@ -1905,7 +1911,7 @@ impl<'a> BatchBuilder<'a> { /// # } /// ``` pub fn expires(mut self, expires: Duration) -> Self { - self.expires = expires.as_nanos().try_into().unwrap(); + self.expires = expires; self } @@ -1959,8 +1965,8 @@ pub struct BatchConfig { pub batch: usize, /// The optional number of nanoseconds that the server will store this next request for /// before forgetting about the pending batch size. - #[serde(skip_serializing_if = "Option::is_none")] - pub expires: Option, + #[serde(skip_serializing_if = "Option::is_none", with = "serde_nanos")] + pub expires: Option, /// This optionally causes the server not to store this pending request at all, but when there are no /// messages to deliver will send a nil bytes message with a Status header of 404, this way you /// can know when you reached the end of the stream for example. A 409 is returned if the