Skip to content

Commit

Permalink
Remove max_in_flight
Browse files Browse the repository at this point in the history
Should be added using middleware as per #1.
  • Loading branch information
jonhoo committed Dec 14, 2018
1 parent 95f3567 commit 12cfaf7
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 154 deletions.
56 changes: 4 additions & 52 deletions src/multiplex/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::MakeTransport;
use futures::future;
use futures::sync::oneshot;
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
use std::collections::VecDeque;
Expand Down Expand Up @@ -36,7 +35,6 @@ pub trait Transport<Request>: Sink + Stream + TagStore<Request, <Self as Stream>
pub struct Maker<NT, Request> {
t_maker: NT,
_req: PhantomData<Request>,
in_flight: Option<usize>,
}

impl<NT, Request> Maker<NT, Request> {
Expand All @@ -45,15 +43,8 @@ impl<NT, Request> Maker<NT, Request> {
Maker {
t_maker: t,
_req: PhantomData,
in_flight: None,
}
}

/// Limit each new `Client` instance to `in_flight` pending requests.
pub fn with_limit(mut self, in_flight: usize) -> Self {
self.in_flight = Some(in_flight);
self
}
}

/// A `Future` that will resolve into a `Buffer<Client<T::Transport>>`.
Expand All @@ -62,7 +53,6 @@ where
NT: MakeTransport<Target, Request>,
{
maker: Option<NT::Future>,
in_flight: Option<usize>,
}

/// A failure to spawn a new `Client`.
Expand Down Expand Up @@ -96,11 +86,7 @@ where
None => unreachable!("poll called after future resolved"),
Some(mut fut) => match fut.poll().map_err(SpawnError::Inner)? {
Async::Ready(t) => {
let c = if let Some(f) = self.in_flight {
Client::with_limit(t, f)
} else {
Client::new(t)
};
let c = Client::new(t);

Ok(Async::Ready(
Buffer::new_direct(c, 0, &DefaultExecutor::current())
Expand Down Expand Up @@ -133,7 +119,6 @@ where
fn call(&mut self, target: Target) -> Self::Future {
NewSpawnedClientFuture {
maker: Some(self.t_maker.make_transport(target)),
in_flight: self.in_flight.clone(),
}
}

Expand All @@ -155,7 +140,6 @@ where
responses: VecDeque<(T::Tag, oneshot::Sender<T::Item>)>,
transport: T,

max_in_flight: Option<usize>,
in_flight: usize,
finish: bool,

Expand Down Expand Up @@ -269,24 +253,6 @@ where
requests: VecDeque::default(),
responses: VecDeque::default(),
transport,
max_in_flight: None,
in_flight: 0,
error: PhantomData::<E>,
finish: false,
}
}

/// Construct a new [`Client`] over the given `transport` with a maximum limit on the number
/// of in-flight requests.
///
/// Note that setting the limit to 1 implies that for each `Request`, the `Response` must be
/// received before another request is sent on the same transport.
pub fn with_limit(transport: T, max_in_flight: usize) -> Self {
Client {
requests: VecDeque::with_capacity(max_in_flight),
responses: VecDeque::with_capacity(max_in_flight),
transport,
max_in_flight: Some(max_in_flight),
in_flight: 0,
error: PhantomData::<E>,
finish: false,
Expand All @@ -307,17 +273,7 @@ where
type Future = Box<Future<Item = Self::Response, Error = Self::Error> + Send>;

fn poll_ready(&mut self) -> Result<Async<()>, Self::Error> {
if let Some(mif) = self.max_in_flight {
if self.in_flight + self.requests.len() >= mif {
// not enough request slots -- need to handle some outstanding
self.poll_service()?;

if self.in_flight + self.requests.len() >= mif {
// that didn't help -- wait to be awoken again
return Ok(Async::NotReady);
}
}
}
// NOTE: it'd be great if we could poll_ready the Sink, but alas..
return Ok(Async::Ready(()));
}

Expand Down Expand Up @@ -409,18 +365,14 @@ where
}

fn call(&mut self, mut req: T::SinkItem) -> Self::Future {
if let Some(mif) = self.max_in_flight {
if self.in_flight + self.requests.len() >= mif {
return Box::new(future::err(E::from(Error::TransportFull)));
}
}

assert!(!self.finish, "invoked call() after poll_close()");

let (tx, rx) = oneshot::channel();
let id = self.transport.assign_tag(&mut req);
self.requests.push_back(req);
self.responses.push_back((id, tx));

// TODO: one day, we'll use existentials here
Box::new(rx.map_err(|_| E::from(Error::ClientDropped)))
}
}
Expand Down
19 changes: 2 additions & 17 deletions src/multiplex/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ where
transport: T,
service: S,

max_in_flight: Option<usize>,
in_flight: usize,
finish: bool,
}
Expand Down Expand Up @@ -133,17 +132,12 @@ where
/// Requests are passed to `Service::call` as they arrive, and responses are written back to
/// the underlying `transport` in the order that they complete. If a later request completes
/// before an earlier request, its response is still sent immediately.
///
/// If `limit` is `Some(n)`, at most `n` requests are allowed to be pending at any given point
/// in time.
pub fn multiplexed(transport: T, service: S, limit: Option<usize>) -> Self {
let cap = limit.unwrap_or(16);
pub fn new(transport: T, service: S) -> Self {
Server {
responses: VecDeque::with_capacity(cap),
responses: VecDeque::new(),
pending: FuturesUnordered::new(),
transport,
service,
max_in_flight: limit,
in_flight: 0,
finish: false,
}
Expand Down Expand Up @@ -248,15 +242,6 @@ where
return Ok(Async::NotReady);
}

// we can't send any more, so see if there are more requests for us
if let Some(max) = self.max_in_flight {
if self.in_flight >= max {
// we can't accept any more requests until we finish some responses
return Ok(Async::NotReady);
}
}

// we are allowed to receive another request
// is the service ready?
try_ready!(self.service.poll_ready().map_err(Error::from_service_error));

Expand Down
46 changes: 2 additions & 44 deletions src/pipeline/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::MakeTransport;
use futures::future;
use futures::sync::oneshot;
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
use std::collections::VecDeque;
Expand All @@ -23,7 +22,6 @@ where
responses: VecDeque<oneshot::Sender<T::Item>>,
transport: T,

max_in_flight: Option<usize>,
in_flight: usize,
finish: bool,

Expand Down Expand Up @@ -62,7 +60,6 @@ where
NT: MakeTransport<Target, Request>,
{
maker: Option<NT::Future>,
in_flight: Option<usize>,
}

/// A failure to spawn a new `Client`.
Expand Down Expand Up @@ -92,11 +89,7 @@ where
None => unreachable!("poll called after future resolved"),
Some(mut fut) => match fut.poll().map_err(SpawnError::Inner)? {
Async::Ready(t) => {
let c = if let Some(f) = self.in_flight {
Client::with_limit(t, f)
} else {
Client::new(t)
};
let c = Client::new(t);

Ok(Async::Ready(
Buffer::new_direct(c, 0, &DefaultExecutor::current())
Expand Down Expand Up @@ -128,7 +121,6 @@ where
fn call(&mut self, target: Target) -> Self::Future {
NewSpawnedClientFuture {
maker: Some(self.t_maker.make_transport(target)),
in_flight: self.in_flight.clone(),
}
}

Expand Down Expand Up @@ -243,24 +235,6 @@ where
requests: VecDeque::default(),
responses: VecDeque::default(),
transport,
max_in_flight: None,
in_flight: 0,
error: PhantomData::<E>,
finish: false,
}
}

/// Construct a new [`Client`] over the given `transport` with a maximum limit on the number
/// of in-flight requests.
///
/// Note that setting the limit to 1 implies that for each `Request`, the `Response` must be
/// received before another request is sent on the same transport.
pub fn with_limit(transport: T, max_in_flight: usize) -> Self {
Client {
requests: VecDeque::with_capacity(max_in_flight),
responses: VecDeque::with_capacity(max_in_flight),
transport,
max_in_flight: Some(max_in_flight),
in_flight: 0,
error: PhantomData::<E>,
finish: false,
Expand All @@ -283,17 +257,7 @@ where
type Future = Box<Future<Item = Self::Response, Error = Self::Error> + Send>;

fn poll_ready(&mut self) -> Result<Async<()>, Self::Error> {
if let Some(mif) = self.max_in_flight {
if self.in_flight + self.requests.len() >= mif {
// not enough request slots -- need to handle some outstanding
self.poll_service()?;

if self.in_flight + self.requests.len() >= mif {
// that didn't help -- wait to be awoken again
return Ok(Async::NotReady);
}
}
}
// NOTE: it'd be great if we could poll_ready the Sink, but alas..
return Ok(Async::Ready(()));
}

Expand Down Expand Up @@ -373,12 +337,6 @@ where
}

fn call(&mut self, req: T::SinkItem) -> Self::Future {
if let Some(mif) = self.max_in_flight {
if self.in_flight + self.requests.len() >= mif {
return Box::new(future::err(E::from(Error::TransportFull)));
}
}

assert!(!self.finish, "invoked call() after poll_close()");

let (tx, rx) = oneshot::channel();
Expand Down
36 changes: 2 additions & 34 deletions src/pipeline/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ where
transport: T,
service: S,

max_in_flight: Option<usize>,
in_flight: usize,
finish: bool,
}
Expand Down Expand Up @@ -126,40 +125,18 @@ where
T: Sink + Stream,
S: DirectService<T::Item>,
{
/// Construct a new [`Server`] over the given `transport` that services requests using the
/// given `service`.
///
/// With this constructor, all requests are handled one at a time, and the next request is not
/// sent to the `Service` until the previous request has been fully completed. To allow
/// pipelined requests, use [`Server::pipelined`].
pub fn new(transport: T, service: S) -> Self {
Server {
responses: VecDeque::default(),
transport,
service,
max_in_flight: Some(1),
in_flight: 0,
finish: false,
}
}

/// Construct a new [`Server`] over the given `transport` that services requests using the
/// given `service`.
///
/// Requests are passed to `Service::call` as they arrive, and responses are written back to
/// the underlying `transport` in the order that the requests arrive. If a later request
/// completes before an earlier request, its result will be buffered until all preceding
/// requests have been sent.
///
/// If `limit` is `Some(n)`, at most `n` requests are allowed to be pending at any given point
/// in time.
pub fn pipelined(transport: T, service: S, limit: Option<usize>) -> Self {
let cap = limit.unwrap_or(16);
pub fn new(transport: T, service: S) -> Self {
Server {
responses: VecDeque::with_capacity(cap),
responses: VecDeque::new(),
transport,
service,
max_in_flight: limit,
in_flight: 0,
finish: false,
}
Expand Down Expand Up @@ -270,15 +247,6 @@ where
return Ok(Async::NotReady);
}

// we can't send any more, so see if there are more requests for us
if let Some(max) = self.max_in_flight {
if self.in_flight >= max {
// we can't accept any more requests until we finish some responses
return Ok(Async::NotReady);
}
}

// we are allowed to receive another request
// is the service ready?
try_ready!(self.service.poll_ready().map_err(Error::from_service_error));

Expand Down
4 changes: 2 additions & 2 deletions tests/multiplex/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use async_bincode::*;
use crate::{EchoService, PanicError, Request, Response};
use async_bincode::*;
use slab::Slab;
use tokio;
use tokio::prelude::*;
Expand Down Expand Up @@ -42,7 +42,7 @@ fn integration() {
.map(AsyncBincodeStream::from)
.map(AsyncBincodeStream::for_async)
.map_err(PanicError::from)
.map(|stream| Server::multiplexed(stream, EchoService, None));
.map(|stream| Server::new(stream, EchoService));

let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.spawn(
Expand Down
7 changes: 2 additions & 5 deletions tests/pipeline/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use async_bincode::*;
use crate::{PanicError, Request, Response};
use async_bincode::*;
use tokio;
use tokio::prelude::*;
use tokio_tower::pipeline::Client;
Expand All @@ -15,10 +15,7 @@ fn it_works() {
.map(AsyncBincodeStream::from)
.map(AsyncBincodeStream::for_async)
.map_err(PanicError::from)
.map(|s| {
// need to limit to one-in-flight for poll_ready to be sufficient to drive Service
Client::with_limit(s, 1)
});
.map(Client::new);

let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.spawn(
Expand Down

0 comments on commit 12cfaf7

Please sign in to comment.