Skip to content

Commit

Permalink
Use Duration in a few more places
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini authored and Jarema committed Jul 28, 2023
1 parent 9677baf commit eb4d492
Showing 1 changed file with 28 additions and 22 deletions.
50 changes: 28 additions & 22 deletions async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
// limitations under the License.

use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt, StreamExt, TryFutureExt};
use futures::{
future::{BoxFuture, Either},
FutureExt, StreamExt, TryFutureExt,
};

#[cfg(feature = "server_2_10")]
use std::collections::HashMap;
Expand Down Expand Up @@ -82,7 +85,7 @@ impl Consumer<Config> {
Stream::stream(
BatchConfig {
batch: 200,
expires: Some(Duration::from_secs(30).as_nanos().try_into().unwrap()),
expires: Some(Duration::from_secs(30)),
no_wait: false,
max_bytes: 0,
idle_heartbeat: Duration::from_secs(15),
Expand Down Expand Up @@ -315,7 +318,7 @@ impl Consumer<Config> {

let request = serde_json::to_vec(&BatchConfig {
batch,
expires: Some(Duration::from_secs(60).as_nanos().try_into().unwrap()),
expires: Some(Duration::from_secs(60)),
..Default::default()
})
.map(Bytes::from)
Expand Down Expand Up @@ -345,9 +348,9 @@ impl<'a> Batch {
let subscription = consumer.context.client.subscribe(inbox.clone()).await?;
consumer.request_batch(batch, inbox.clone()).await?;

let sleep = batch.expires.map(|e| {
let sleep = batch.expires.map(|expires| {
Box::pin(tokio::time::sleep(
Duration::from_nanos(e).saturating_add(Duration::from_secs(5)),
expires.saturating_add(Duration::from_secs(5)),
))
});

Expand Down Expand Up @@ -556,7 +559,7 @@ impl<'a> Consumer<OrderedConfig> {
let stream = Stream::stream(
BatchConfig {
batch: 500,
expires: Some(Duration::from_secs(30).as_nanos().try_into().unwrap()),
expires: Some(Duration::from_secs(30)),
no_wait: false,
max_bytes: 0,
idle_heartbeat: Duration::from_secs(15),
Expand Down Expand Up @@ -870,13 +873,16 @@ impl Stream {
// this is just in edge case of missing response for some reason.
let expires = batch_config
.expires
.map(|expires| match expires {
0 => futures::future::Either::Left(future::pending()),
t => futures::future::Either::Right(tokio::time::sleep(
Duration::from_nanos(t).saturating_add(Duration::from_secs(5)),
)),
.map(|expires| {
if expires.is_zero() {
Either::Left(future::pending())
} else {
Either::Right(tokio::time::sleep(
expires.saturating_add(Duration::from_secs(5)),
))
}
})
.unwrap_or_else(|| futures::future::Either::Left(future::pending()));
.unwrap_or_else(|| Either::Left(future::pending()));
// Need to check previous state, as `changed` will always fire on first
// call.
let prev_state = context.client.state.borrow().to_owned();
Expand Down Expand Up @@ -1234,7 +1240,7 @@ pub struct StreamBuilder<'a> {
batch: usize,
max_bytes: usize,
heartbeat: Duration,
expires: u64,
expires: Duration,
consumer: &'a Consumer<Config>,
}

Expand All @@ -1244,7 +1250,7 @@ impl<'a> StreamBuilder<'a> {
consumer,
batch: 200,
max_bytes: 0,
expires: Duration::from_secs(30).as_nanos().try_into().unwrap(),
expires: Duration::from_secs(30),
heartbeat: Duration::default(),
}
}
Expand Down Expand Up @@ -1406,7 +1412,7 @@ impl<'a> StreamBuilder<'a> {
/// # }
/// ```
pub fn expires(mut self, expires: Duration) -> Self {
self.expires = expires.as_nanos().try_into().unwrap();
self.expires = expires;
self
}

Expand Down Expand Up @@ -1494,7 +1500,7 @@ pub struct FetchBuilder<'a> {
batch: usize,
max_bytes: usize,
heartbeat: Duration,
expires: Option<u64>,
expires: Option<Duration>,
consumer: &'a Consumer<Config>,
}

Expand Down Expand Up @@ -1657,7 +1663,7 @@ impl<'a> FetchBuilder<'a> {
/// # }
/// ```
pub fn expires(mut self, expires: Duration) -> Self {
self.expires = Some(expires.as_nanos().try_into().unwrap());
self.expires = Some(expires);
self
}

Expand Down Expand Up @@ -1741,7 +1747,7 @@ pub struct BatchBuilder<'a> {
batch: usize,
max_bytes: usize,
heartbeat: Duration,
expires: u64,
expires: Duration,
consumer: &'a Consumer<Config>,
}

Expand All @@ -1751,7 +1757,7 @@ impl<'a> BatchBuilder<'a> {
consumer,
batch: 200,
max_bytes: 0,
expires: 0,
expires: Duration::ZERO,
heartbeat: Duration::default(),
}
}
Expand Down Expand Up @@ -1905,7 +1911,7 @@ impl<'a> BatchBuilder<'a> {
/// # }
/// ```
pub fn expires(mut self, expires: Duration) -> Self {
self.expires = expires.as_nanos().try_into().unwrap();
self.expires = expires;
self
}

Expand Down Expand Up @@ -1959,8 +1965,8 @@ pub struct BatchConfig {
pub batch: usize,
/// The optional number of nanoseconds that the server will store this next request for
/// before forgetting about the pending batch size.
#[serde(skip_serializing_if = "Option::is_none")]
pub expires: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none", with = "serde_nanos")]
pub expires: Option<Duration>,
/// This optionally causes the server not to store this pending request at all, but when there are no
/// messages to deliver will send a nil bytes message with a Status header of 404, this way you
/// can know when you reached the end of the stream for example. A 409 is returned if the
Expand Down

0 comments on commit eb4d492

Please sign in to comment.