diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 3288e2a1a..5d2a77212 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -137,6 +137,7 @@ use std::slice; use std::str::{self, FromStr}; use std::task::{Context, Poll}; use tokio::io::ErrorKind; +use tokio::time::sleep; use tokio::time::{interval, Duration, Interval, MissedTickBehavior}; use url::{Host, Url}; @@ -322,7 +323,7 @@ pub(crate) struct ConnectionHandler { pending_pings: usize, info_sender: tokio::sync::watch::Sender, ping_interval: Interval, - flush_interval: Interval, + flush_period: Duration, } impl ConnectionHandler { @@ -336,9 +337,6 @@ impl ConnectionHandler { let mut ping_interval = interval(ping_period); ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - let mut flush_interval = interval(flush_period); - flush_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - ConnectionHandler { connection, connector, @@ -346,7 +344,7 @@ impl ConnectionHandler { pending_pings: 0, info_sender, ping_interval, - flush_interval, + flush_period, } } @@ -354,6 +352,9 @@ impl ConnectionHandler { &mut self, mut receiver: mpsc::Receiver, ) -> Result<(), io::Error> { + let flush_timeout = sleep(self.flush_period).fuse(); + futures::pin_mut!(flush_timeout); + loop { select! { _ = self.ping_interval.tick().fuse() => { @@ -374,21 +375,23 @@ impl ConnectionHandler { self.handle_flush().await?; }, - _ = self.flush_interval.tick().fuse() => { + _ = flush_timeout => { if let Err(_err) = self.handle_flush().await { self.handle_disconnect().await?; } }, - maybe_command = receiver.recv().fuse() => { - match maybe_command { - Some(command) => if let Err(err) = self.handle_command(command).await { + maybe_command = receiver.recv().fuse() => match maybe_command { + Some(command) => { + flush_timeout.set(sleep(self.flush_period).fuse()); + + if let Err(err) = self.handle_command(command).await { error!("error handling command {}", err); } - None => { - break; - } } - } + None => { + break; + } + }, maybe_op_result = self.connection.read_op().fuse() => { match maybe_op_result { @@ -506,7 +509,6 @@ impl ConnectionHandler { async fn handle_flush(&mut self) -> Result<(), io::Error> { self.connection.flush().await?; - self.flush_interval.reset(); Ok(()) }