Skip to content

Commit

Permalink
Replace flush interval with a fused sleep
Browse files Browse the repository at this point in the history
  • Loading branch information
caspervonb committed Jul 14, 2023
1 parent cb0e15a commit 66806eb
Showing 1 changed file with 16 additions and 14 deletions.
30 changes: 16 additions & 14 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ use std::slice;
use std::str::{self, FromStr};
use std::task::{Context, Poll};
use tokio::io::ErrorKind;
use tokio::time::sleep;
use tokio::time::{interval, Duration, Interval, MissedTickBehavior};
use url::{Host, Url};

Expand Down Expand Up @@ -322,7 +323,7 @@ pub(crate) struct ConnectionHandler {
pending_pings: usize,
info_sender: tokio::sync::watch::Sender<ServerInfo>,
ping_interval: Interval,
flush_interval: Interval,
flush_period: Duration,
}

impl ConnectionHandler {
Expand All @@ -336,24 +337,24 @@ impl ConnectionHandler {
let mut ping_interval = interval(ping_period);
ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

let mut flush_interval = interval(flush_period);
flush_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

ConnectionHandler {
connection,
connector,
subscriptions: HashMap::new(),
pending_pings: 0,
info_sender,
ping_interval,
flush_interval,
flush_period,
}
}

pub(crate) async fn process(
&mut self,
mut receiver: mpsc::Receiver<Command>,
) -> Result<(), io::Error> {
let flush_timeout = sleep(self.flush_period).fuse();
futures::pin_mut!(flush_timeout);

loop {
select! {
_ = self.ping_interval.tick().fuse() => {
Expand All @@ -374,21 +375,23 @@ impl ConnectionHandler {
self.handle_flush().await?;

},
_ = self.flush_interval.tick().fuse() => {
_ = flush_timeout => {
if let Err(_err) = self.handle_flush().await {
self.handle_disconnect().await?;
}
},
maybe_command = receiver.recv().fuse() => {
match maybe_command {
Some(command) => if let Err(err) = self.handle_command(command).await {
maybe_command = receiver.recv().fuse() => match maybe_command {
Some(command) => {
flush_timeout.set(sleep(self.flush_period).fuse());

if let Err(err) = self.handle_command(command).await {
error!("error handling command {}", err);
}
None => {
break;
}
}
}
None => {
break;
}
},

maybe_op_result = self.connection.read_op().fuse() => {
match maybe_op_result {
Expand Down Expand Up @@ -506,7 +509,6 @@ impl ConnectionHandler {

async fn handle_flush(&mut self) -> Result<(), io::Error> {
self.connection.flush().await?;
self.flush_interval.reset();

Ok(())
}
Expand Down

0 comments on commit 66806eb

Please sign in to comment.