Skip to content

Commit

Permalink
Simplify pull consumer idle heartbeat
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Jul 21, 2023
1 parent d2abf31 commit 45a2391
Showing 1 changed file with 12 additions and 14 deletions.
26 changes: 12 additions & 14 deletions async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1046,22 +1046,20 @@ impl futures::Stream for Stream {
}

if !self.batch_config.idle_heartbeat.is_zero() {
trace!("setting hearbeats");
let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
self.heartbeat_timeout
.get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)));

trace!("checking idle hearbeats");
if let Some(hearbeat) = self.heartbeat_timeout.as_mut() {
match hearbeat.poll_unpin(cx) {
Poll::Ready(_) => {
self.heartbeat_timeout = None;
return Poll::Ready(Some(Err(MessagesError::new(
MessagesErrorKind::MissingHeartbeat,
))));
}
Poll::Pending => (),
let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
match self
.heartbeat_timeout
.get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)))
.poll_unpin(cx)
{
Poll::Ready(_) => {
self.heartbeat_timeout = None;
return Poll::Ready(Some(Err(MessagesError::new(
MessagesErrorKind::MissingHeartbeat,
))));
}
Poll::Pending => (),
}
}

Expand Down

0 comments on commit 45a2391

Please sign in to comment.