Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful shutdown2 #746

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ dist/
npm-debug.log*
Cargo.lock
.DS_Store
.idea
.idea
.vscode/launch.json
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async-sse = "4.0.1"
async-std = { version = "1.6.5", features = ["unstable"] }
async-trait = "0.1.41"
femme = { version = "2.1.1", optional = true }
futures = "0.3.7"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably don't want to depend on futures directly since it adds about a minute of compile time in certain setups. Instead it's preferable to use smaller crates and/or async-std directly.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Futures is required for "select": https://rust-lang.github.io/async-book/06_multiple_futures/03_select.html

Honestly, I don't see a way to do this without "select," otherwise you end up waiting for a final incoming http request before ending the loop accepting incoming requests. (See my comment about when I reviewed stop-token.)

Maybe it's worth trying to implement something like select manually, but, IMO, that might be a fools' errand.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what we would prefer is futures_lite::future::race.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great advice!

Let me see what I can do. It'll probably be a few days before I can update this PR.

futures-util = "0.3.6"
http-client = { version = "6.1.0", default-features = false }
http-types = "2.5.0"
Expand Down
60 changes: 60 additions & 0 deletions src/cancelation_token.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

#[derive(Debug)]
pub struct CancelationToken {
shared_state: Arc<Mutex<CancelationTokenState>>
}

#[derive(Debug)]
struct CancelationTokenState {
canceled: bool,
waker: Option<Waker>
}

/// Future that allows gracefully shutting down the server
impl CancelationToken {
pub fn new() -> CancelationToken {
CancelationToken {
shared_state: Arc::new(Mutex::new(CancelationTokenState {
canceled: false,
waker: None
}))
}
}

/// Call to shut down the server
pub fn complete(&self) {
let mut shared_state = self.shared_state.lock().unwrap();

shared_state.canceled = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
}
}

impl Future for CancelationToken {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut shared_state = self.shared_state.lock().unwrap();

if shared_state.canceled {
Poll::Ready(())
} else {
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}

impl Clone for CancelationToken {
fn clone(&self) -> Self {
CancelationToken {
shared_state: self.shared_state.clone()
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stop-token does much the same, and we're looking to integrate it into async-std in the near future. If possible it'd be great to use that crate directly instead of defining our own here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I like about stop-token is that it supports streams.

However, the authors describe it as "experimental." Upon closer inspection:

I should point out that the library that I made with cancelation token has full documentation and tests: https://crates.io/crates/sync-tokens

2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
#![doc(html_favicon_url = "https://yoshuawuyts.com/assets/http-rs/favicon.ico")]
#![doc(html_logo_url = "https://yoshuawuyts.com/assets/http-rs/logo-rounded.png")]

mod cancelation_token;
#[cfg(feature = "cookies")]
mod cookies;
mod endpoint;
Expand Down Expand Up @@ -88,6 +89,7 @@ pub mod utils;
#[cfg(feature = "sessions")]
pub mod sessions;

pub use cancelation_token::CancelationToken;
pub use endpoint::Endpoint;
pub use middleware::{Middleware, Next};
pub use redirect::Redirect;
Expand Down
20 changes: 16 additions & 4 deletions src/listener/concurrent_listener.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::listener::{Listener, ToListener};
use crate::Server;
use crate::{CancelationToken, Server};

use std::fmt::{self, Debug, Display, Formatter};

use async_std::io;
use async_std::{io, task};
use futures_util::stream::{futures_unordered::FuturesUnordered, StreamExt};

/// ConcurrentListener allows tide to listen on any number of transports
Expand Down Expand Up @@ -79,17 +79,29 @@ impl<State: Clone + Send + Sync + 'static> ConcurrentListener<State> {

#[async_trait::async_trait]
impl<State: Clone + Send + Sync + 'static> Listener<State> for ConcurrentListener<State> {
async fn listen(&mut self, app: Server<State>) -> io::Result<()> {
async fn listen(&mut self, app: Server<State>, cancelation_token: CancelationToken) -> io::Result<()> {
let mut futures_unordered = FuturesUnordered::new();

let mut cancelation_tokens = Vec::new();

for listener in self.0.iter_mut() {
let app = app.clone();
futures_unordered.push(listener.listen(app));
let sub_cancelation_token = CancelationToken::new();
futures_unordered.push(listener.listen(app, sub_cancelation_token.clone()));
cancelation_tokens.push(sub_cancelation_token);
}

task::spawn(async move {
cancelation_token.await;
for sub_cancelation_token in cancelation_tokens.iter_mut() {
sub_cancelation_token.complete();
}
});

while let Some(result) = futures_unordered.next().await {
result?;
}

Ok(())
}
}
Expand Down
20 changes: 16 additions & 4 deletions src/listener/failover_listener.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::listener::{Listener, ToListener};
use crate::Server;
use crate::{CancelationToken, Server};

use std::fmt::{self, Debug, Display, Formatter};

use async_std::io;
use async_std::{io, task};

/// FailoverListener allows tide to attempt to listen in a sequential
/// order to any number of ports/addresses. The first successful
Expand Down Expand Up @@ -81,10 +81,14 @@ impl<State: Clone + Send + Sync + 'static> FailoverListener<State> {

#[async_trait::async_trait]
impl<State: Clone + Send + Sync + 'static> Listener<State> for FailoverListener<State> {
async fn listen(&mut self, app: Server<State>) -> io::Result<()> {
async fn listen(&mut self, app: Server<State>, cancelation_token: CancelationToken) -> io::Result<()> {

let mut cancelation_tokens = Vec::new();

for listener in self.0.iter_mut() {
let app = app.clone();
match listener.listen(app).await {
let sub_cancelation_token = CancelationToken::new();
match listener.listen(app, sub_cancelation_token.clone()).await {
Ok(_) => return Ok(()),
Err(e) => {
crate::log::info!("unable to listen", {
Expand All @@ -93,8 +97,16 @@ impl<State: Clone + Send + Sync + 'static> Listener<State> for FailoverListener<
});
}
}
cancelation_tokens.push(sub_cancelation_token);
}

task::spawn(async move {
cancelation_token.await;
for sub_cancelation_token in cancelation_tokens.iter_mut() {
sub_cancelation_token.complete();
}
});

Err(io::Error::new(
io::ErrorKind::AddrNotAvailable,
"unable to bind to any supplied listener spec",
Expand Down
4 changes: 2 additions & 2 deletions src/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod to_listener_impls;
#[cfg(all(unix, feature = "h1-server"))]
mod unix_listener;

use crate::Server;
use crate::{CancelationToken, Server};
use async_std::io;

pub use concurrent_listener::ConcurrentListener;
Expand All @@ -37,7 +37,7 @@ pub trait Listener<State: 'static>:
/// This is the primary entrypoint for the Listener trait. listen
/// is called exactly once, and is expected to spawn tasks for
/// each incoming connection.
async fn listen(&mut self, app: Server<State>) -> io::Result<()>;
async fn listen(&mut self, app: Server<State>, cancelation_token: CancelationToken) -> io::Result<()>;
}

/// crate-internal shared logic used by tcp and unix listeners to
Expand Down
8 changes: 4 additions & 4 deletions src/listener/parsed_listener.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(unix)]
use super::UnixListener;
use super::{Listener, TcpListener};
use crate::Server;
use crate::{CancelationToken, Server};

use async_std::io;
use std::fmt::{self, Display, Formatter};
Expand Down Expand Up @@ -32,11 +32,11 @@ impl Display for ParsedListener {

#[async_trait::async_trait]
impl<State: Clone + Send + Sync + 'static> Listener<State> for ParsedListener {
async fn listen(&mut self, app: Server<State>) -> io::Result<()> {
async fn listen(&mut self, app: Server<State>, cancelation_token: CancelationToken) -> io::Result<()> {
match self {
#[cfg(unix)]
Self::Unix(u) => u.listen(app).await,
Self::Tcp(t) => t.listen(app).await,
Self::Unix(u) => u.listen(app, cancelation_token).await,
Self::Tcp(t) => t.listen(app, cancelation_token).await,
}
}
}
38 changes: 24 additions & 14 deletions src/listener/tcp_listener.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use super::is_transient_error;

use crate::listener::Listener;
use crate::{log, Server};
use crate::{CancelationToken, log, Server};

use std::fmt::{self, Display, Formatter};

use async_std::net::{self, SocketAddr, TcpStream};
use async_std::prelude::*;
use async_std::{io, task};
use futures::future::{self, Either};

/// This represents a tide [Listener](crate::listener::Listener) that
/// wraps an [async_std::net::TcpListener]. It is implemented as an
Expand Down Expand Up @@ -70,28 +71,37 @@ fn handle_tcp<State: Clone + Send + Sync + 'static>(app: Server<State>, stream:

#[async_trait::async_trait]
impl<State: Clone + Send + Sync + 'static> Listener<State> for TcpListener {
async fn listen(&mut self, app: Server<State>) -> io::Result<()> {
async fn listen(&mut self, app: Server<State>, cancelation_token: CancelationToken) -> io::Result<()> {
self.connect().await?;
let listener = self.listener()?;
crate::log::info!("Server listening on {}", self);

let mut incoming = listener.incoming();

while let Some(stream) = incoming.next().await {
match stream {
Err(ref e) if is_transient_error(e) => continue,
Err(error) => {
let delay = std::time::Duration::from_millis(500);
crate::log::error!("Error: {}. Pausing for {:?}.", error, delay);
task::sleep(delay).await;
continue;
}

Ok(stream) => {
handle_tcp(app.clone(), stream);
'serve_loop:
while let Either::Left(result) = future::select(incoming.next(), cancelation_token.clone()).await {
match result.0 {
Some(stream) => {
match stream {
Err(ref e) if is_transient_error(e) => continue,
Err(error) => {
let delay = std::time::Duration::from_millis(500);
crate::log::error!("Error: {}. Pausing for {:?}.", error, delay);
task::sleep(delay).await;
continue;
}

Ok(stream) => {
handle_tcp(app.clone(), stream);
}
};
},
None => {
break 'serve_loop;
}
};
}

Ok(())
}
}
Expand Down
37 changes: 23 additions & 14 deletions src/listener/unix_listener.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use super::is_transient_error;

use crate::listener::Listener;
use crate::{log, Server};
use crate::{CancelationToken, log, Server};

use std::fmt::{self, Display, Formatter};

use async_std::os::unix::net::{self, SocketAddr, UnixStream};
use async_std::prelude::*;
use async_std::{io, path::PathBuf, task};
use futures::future::{self, Either};

/// This represents a tide [Listener](crate::listener::Listener) that
/// wraps an [async_std::os::unix::net::UnixListener]. It is implemented as an
Expand Down Expand Up @@ -83,24 +84,32 @@ fn handle_unix<State: Clone + Send + Sync + 'static>(app: Server<State>, stream:

#[async_trait::async_trait]
impl<State: Clone + Send + Sync + 'static> Listener<State> for UnixListener {
async fn listen(&mut self, app: Server<State>) -> io::Result<()> {
async fn listen(&mut self, app: Server<State>, cancelation_token: CancelationToken) -> io::Result<()> {
self.connect().await?;
crate::log::info!("Server listening on {}", self);
let listener = self.listener()?;
let mut incoming = listener.incoming();

while let Some(stream) = incoming.next().await {
match stream {
Err(ref e) if is_transient_error(e) => continue,
Err(error) => {
let delay = std::time::Duration::from_millis(500);
crate::log::error!("Error: {}. Pausing for {:?}.", error, delay);
task::sleep(delay).await;
continue;
}

Ok(stream) => {
handle_unix(app.clone(), stream);
'serve_loop:
while let Either::Left(result) = future::select(incoming.next(), cancelation_token.clone()).await {
match result.0 {
Some(stream) => {
match stream {
Err(ref e) if is_transient_error(e) => continue,
Err(error) => {
let delay = std::time::Duration::from_millis(500);
crate::log::error!("Error: {}. Pausing for {:?}.", error, delay);
task::sleep(delay).await;
continue;
}

Ok(stream) => {
handle_unix(app.clone(), stream);
}
};
},
None => {
break 'serve_loop;
}
};
}
Expand Down
9 changes: 7 additions & 2 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::listener::{Listener, ToListener};
use crate::log;
use crate::middleware::{Middleware, Next};
use crate::router::{Router, Selection};
use crate::{Endpoint, Request, Route};
use crate::{CancelationToken, Endpoint, Request, Route};

/// An HTTP server.
///
Expand Down Expand Up @@ -187,7 +187,12 @@ impl<State: Clone + Send + Sync + 'static> Server<State> {

/// Asynchronously serve the app with the supplied listener. For more details, see [Listener] and [ToListener]
pub async fn listen<TL: ToListener<State>>(self, listener: TL) -> io::Result<()> {
listener.to_listener()?.listen(self).await
self.listen_with_cancelation_token(listener, CancelationToken::new()).await
}

/// Asynchronously serve the app with the supplied listener and canelation token. For more details, see [Listener] and [ToListener]
pub async fn listen_with_cancelation_token<TL: ToListener<State>>(self, listener: TL, cancelation_token: CancelationToken) -> io::Result<()> {
listener.to_listener()?.listen(self, cancelation_token).await
}

/// Respond to a `Request` with a `Response`.
Expand Down
Loading