refactor(lib): convert to futures 0.2.0-beta (#1470)

This commit is contained in:
Sam Rijs
2018-03-30 07:32:44 +11:00
committed by Sean McArthur
parent 5db85316a1
commit a12f7beed9
34 changed files with 1366 additions and 1347 deletions

View File

@@ -11,9 +11,10 @@ use std::fmt;
use std::marker::PhantomData;
use bytes::Bytes;
use futures::{Async, Future, Poll};
use futures::{Async, Future, FutureExt, Poll};
use futures::future::{self, Either};
use tokio_io::{AsyncRead, AsyncWrite};
use futures::task;
use futures::io::{AsyncRead, AsyncWrite};
use proto;
use proto::body::Entity;
@@ -123,8 +124,8 @@ impl<B> SendRequest<B>
/// Polls to determine whether this sender can be used yet for a request.
///
/// If the associated connection is closed, this returns an Error.
pub fn poll_ready(&mut self) -> Poll<(), ::Error> {
self.dispatch.poll_ready()
pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
self.dispatch.poll_ready(cx)
}
pub(super) fn is_closed(&self) -> bool {
@@ -162,7 +163,7 @@ where
/// # use http::header::HOST;
/// # use hyper::client::conn::SendRequest;
/// # use hyper::Body;
/// use futures::Future;
/// use futures::FutureExt;
/// use hyper::Request;
///
/// # fn doc(mut tx: SendRequest<Body>) {
@@ -186,19 +187,19 @@ where
pub fn send_request(&mut self, req: Request<B>) -> ResponseFuture {
let inner = match self.dispatch.send(req) {
Ok(rx) => {
Either::A(rx.then(move |res| {
rx.then(move |res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
}
}))
}).left()
},
Err(_req) => {
debug!("connection was not ready");
let err = ::Error::new_canceled(Some("connection was not ready"));
Either::B(future::err(err))
future::err(err).right()
}
};
@@ -214,7 +215,7 @@ where
{
let inner = match self.dispatch.try_send(req) {
Ok(rx) => {
Either::A(rx.then(move |res| {
Either::Left(rx.then(move |res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
@@ -226,7 +227,7 @@ where
Err(req) => {
debug!("connection was not ready");
let err = ::Error::new_canceled(Some("connection was not ready"));
Either::B(future::err((err, Some(req))))
Either::Right(future::err((err, Some(req))))
}
};
Box::new(inner)
@@ -277,8 +278,8 @@ where
/// upgrade. Once the upgrade is completed, the connection would be "done",
/// but it is not desired to actally shutdown the IO object. Instead you
/// would take it back using `into_parts`.
pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> {
self.inner.poll_without_shutdown()
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
self.inner.poll_without_shutdown(cx)
}
}
@@ -290,8 +291,8 @@ where
type Item = ();
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
self.inner.poll(cx)
}
}
@@ -363,8 +364,8 @@ where
type Item = (SendRequest<B>, Connection<T, B>);
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
self.inner.poll(cx)
.map(|async| {
async.map(|(tx, dispatch)| {
(tx, Connection { inner: dispatch })
@@ -394,8 +395,8 @@ where
>);
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
self.inner.poll(cx)
}
}
@@ -417,7 +418,7 @@ where
>);
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(&mut self, _cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
let io = self.io.take().expect("polled more than once");
let (tx, rx) = dispatch::channel();
let mut conn = proto::Conn::new(io);
@@ -441,8 +442,8 @@ impl Future for ResponseFuture {
type Error = ::Error;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
self.inner.poll(cx)
}
}

View File

@@ -8,24 +8,21 @@
use std::error::Error as StdError;
use std::fmt;
use std::io;
use std::mem;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use futures::{Future, Poll, Async};
use futures::future::{Executor, ExecuteError};
use futures::sync::oneshot;
use futures_cpupool::{Builder as CpuPoolBuilder};
use futures::{Future, Never, Poll, Async};
use futures::executor::{Executor, SpawnError, ThreadPoolBuilder};
use futures::task;
use futures::io::{AsyncRead, AsyncWrite};
use http::Uri;
use http::uri::Scheme;
use net2::TcpBuilder;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::Handle;
use tokio::net::{TcpStream, ConnectFuture};
use executor::CloneBoxedExecutor;
use super::dns;
use self::http_connector::HttpConnectorBlockingTask;
/// Connect to a destination, returning an IO transport.
///
@@ -174,7 +171,7 @@ impl HttpConnector {
/// Takes number of DNS worker threads.
#[inline]
pub fn new(threads: usize, handle: &Handle) -> HttpConnector {
let pool = CpuPoolBuilder::new()
let pool = ThreadPoolBuilder::new()
.name_prefix("hyper-dns")
.pool_size(threads)
.create();
@@ -186,10 +183,10 @@ impl HttpConnector {
/// Takes an executor to run blocking tasks on.
#[inline]
pub fn new_with_executor<E: 'static>(executor: E, handle: &Handle) -> HttpConnector
where E: Executor<HttpConnectorBlockingTask> + Send + Sync
where E: Executor + Clone + Send + Sync
{
HttpConnector {
executor: HttpConnectExecutor(Arc::new(executor)),
executor: HttpConnectExecutor(Box::new(executor)),
enforce_http: true,
handle: handle.clone(),
keep_alive_timeout: None,
@@ -298,7 +295,7 @@ pub struct HttpConnecting {
enum State {
Lazy(HttpConnectExecutor, String, u16),
Resolving(oneshot::SpawnHandle<dns::IpAddrs, io::Error>),
Resolving(dns::Resolving),
Connecting(ConnectingTcp),
Error(Option<io::Error>),
}
@@ -307,11 +304,11 @@ impl Future for HttpConnecting {
type Item = (TcpStream, Connected);
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
loop {
let state;
match self.state {
State::Lazy(ref executor, ref mut host, port) => {
State::Lazy(ref mut executor, ref mut host, port) => {
// If the host is already an IP addr (v4 or v6),
// skip resolving the dns and start connecting right away.
if let Some(addrs) = dns::IpAddrs::try_parse(host, port) {
@@ -320,24 +317,19 @@ impl Future for HttpConnecting {
current: None
})
} else {
let host = mem::replace(host, String::new());
let work = dns::Work::new(host, port);
state = State::Resolving(oneshot::spawn(work, executor));
let host = ::std::mem::replace(host, String::new());
state = State::Resolving(dns::Resolving::spawn(host, port, executor));
}
},
State::Resolving(ref mut future) => {
match try!(future.poll()) {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(addrs) => {
state = State::Connecting(ConnectingTcp {
addrs: addrs,
current: None,
})
}
};
let addrs = try_ready!(future.poll(cx));
state = State::Connecting(ConnectingTcp {
addrs: addrs,
current: None,
});
},
State::Connecting(ref mut c) => {
let sock = try_ready!(c.poll(&self.handle));
let sock = try_ready!(c.poll(cx, &self.handle));
if let Some(dur) = self.keep_alive_timeout {
sock.set_keepalive(Some(dur))?;
@@ -365,11 +357,11 @@ struct ConnectingTcp {
impl ConnectingTcp {
// not a Future, since passing a &Handle to poll
fn poll(&mut self, handle: &Handle) -> Poll<TcpStream, io::Error> {
fn poll(&mut self, cx: &mut task::Context, handle: &Handle) -> Poll<TcpStream, io::Error> {
let mut err = None;
loop {
if let Some(ref mut current) = self.current {
match current.poll() {
match current.poll(cx) {
Ok(ok) => return Ok(ok),
Err(e) => {
trace!("connect error {:?}", e);
@@ -392,37 +384,19 @@ impl ConnectingTcp {
}
}
// Make this Future unnameable outside of this crate.
mod http_connector {
use super::*;
// Blocking task to be executed on a thread pool.
pub struct HttpConnectorBlockingTask {
pub(super) work: oneshot::Execute<dns::Work>
}
impl fmt::Debug for HttpConnectorBlockingTask {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("HttpConnectorBlockingTask")
}
}
impl Future for HttpConnectorBlockingTask {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
self.work.poll()
}
}
}
#[derive(Clone)]
struct HttpConnectExecutor(Arc<Executor<HttpConnectorBlockingTask> + Send + Sync>);
struct HttpConnectExecutor(Box<CloneBoxedExecutor>);
impl Executor<oneshot::Execute<dns::Work>> for HttpConnectExecutor {
fn execute(&self, future: oneshot::Execute<dns::Work>) -> Result<(), ExecuteError<oneshot::Execute<dns::Work>>> {
self.0.execute(HttpConnectorBlockingTask { work: future })
.map_err(|err| ExecuteError::new(err.kind(), err.into_future().work))
impl Executor for HttpConnectExecutor {
fn spawn(
&mut self,
f: Box<Future<Item = (), Error = Never> + 'static + Send>
) -> Result<(), SpawnError> {
self.0.spawn(f)
}
fn status(&self) -> Result<(), SpawnError> {
self.0.status()
}
}
@@ -430,7 +404,7 @@ impl Executor<oneshot::Execute<dns::Work>> for HttpConnectExecutor {
mod tests {
#![allow(deprecated)]
use std::io;
use futures::Future;
use futures::executor::block_on;
use tokio::runtime::Runtime;
use super::{Connect, Destination, HttpConnector};
@@ -443,7 +417,7 @@ mod tests {
};
let connector = HttpConnector::new(1, runtime.handle());
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
assert_eq!(block_on(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
#[test]
@@ -455,7 +429,7 @@ mod tests {
};
let connector = HttpConnector::new(1, runtime.handle());
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
assert_eq!(block_on(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
@@ -468,6 +442,6 @@ mod tests {
};
let connector = HttpConnector::new(1, runtime.handle());
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
assert_eq!(block_on(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
}

View File

@@ -1,9 +1,8 @@
use futures::{Async, Poll, Stream};
use futures::sync::{mpsc, oneshot};
use futures::{Async, Never, Poll, Stream};
use futures::channel::{mpsc, oneshot};
use futures::task;
use want;
use common::Never;
//pub type Callback<T, U> = oneshot::Sender<Result<U, (::Error, Option<T>)>>;
pub type RetryPromise<T, U> = oneshot::Receiver<Result<U, (::Error, Option<T>)>>;
pub type Promise<T> = oneshot::Receiver<Result<T, ::Error>>;
@@ -33,15 +32,15 @@ pub struct Sender<T, U> {
}
impl<T, U> Sender<T, U> {
pub fn poll_ready(&mut self) -> Poll<(), ::Error> {
match self.inner.poll_ready() {
pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
match self.inner.poll_ready(cx) {
Ok(Async::Ready(())) => {
// there's room in the queue, but does the Connection
// want a message yet?
self.giver.poll_want()
self.giver.poll_want(cx)
.map_err(|_| ::Error::Closed)
},
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Pending) => Ok(Async::Pending),
Err(_) => Err(::Error::Closed),
}
}
@@ -75,16 +74,15 @@ impl<T, U> Stream for Receiver<T, U> {
type Item = (T, Callback<T, U>);
type Error = Never;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.inner.poll() {
Ok(Async::Ready(item)) => Ok(Async::Ready(item.map(|mut env| {
fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
match self.inner.poll_next(cx)? {
Async::Ready(item) => Ok(Async::Ready(item.map(|mut env| {
env.0.take().expect("envelope not dropped")
}))),
Ok(Async::NotReady) => {
Async::Pending => {
self.taker.want();
Ok(Async::NotReady)
Ok(Async::Pending)
}
Err(()) => unreachable!("mpsc never errors"),
}
}
}
@@ -107,11 +105,11 @@ impl<T, U> Drop for Receiver<T, U> {
// This poll() is safe to call in `Drop`, because we've
// called, `close`, which promises that no new messages
// will arrive, and thus, once we reach the end, we won't
// see a `NotReady` (and try to park), but a Ready(None).
// see a `Pending` (and try to park), but a Ready(None).
//
// All other variants:
// - Ready(None): the end. we want to stop looping
// - NotReady: unreachable
// - Pending: unreachable
// - Err: unreachable
while let Ok(Async::Ready(Some((val, cb)))) = self.inner.poll() {
let _ = cb.send(Err((::Error::new_canceled(None::<::Error>), Some(val))));
@@ -137,10 +135,10 @@ pub enum Callback<T, U> {
}
impl<T, U> Callback<T, U> {
pub fn poll_cancel(&mut self) -> Poll<(), ()> {
pub fn poll_cancel(&mut self, cx: &mut task::Context) -> Poll<(), Never> {
match *self {
Callback::Retry(ref mut tx) => tx.poll_cancel(),
Callback::NoRetry(ref mut tx) => tx.poll_cancel(),
Callback::Retry(ref mut tx) => tx.poll_cancel(cx),
Callback::NoRetry(ref mut tx) => tx.poll_cancel(cx),
}
}
@@ -162,7 +160,8 @@ mod tests {
#[cfg(feature = "nightly")]
extern crate test;
use futures::{future, Future};
use futures::{future, FutureExt};
use futures::executor::block_on;
#[cfg(feature = "nightly")]
use futures::{Stream};
@@ -171,7 +170,7 @@ mod tests {
fn drop_receiver_sends_cancel_errors() {
let _ = pretty_env_logger::try_init();
future::lazy(|| {
block_on(future::lazy(|_| {
#[derive(Debug)]
struct Custom(i32);
let (mut tx, rx) = super::channel::<Custom, ()>();
@@ -188,7 +187,7 @@ mod tests {
Ok::<(), ()>(())
})
}).wait().unwrap();
})).unwrap();
}
#[cfg(feature = "nightly")]
@@ -197,18 +196,18 @@ mod tests {
let (mut tx, mut rx) = super::channel::<i32, ()>();
b.iter(move || {
::futures::future::lazy(|| {
block_on(future::lazy(|cx| {
let _ = tx.send(1).unwrap();
loop {
let async = rx.poll().unwrap();
if async.is_not_ready() {
let async = rx.poll_next(cx).unwrap();
if async.is_pending() {
break;
}
}
Ok::<(), ()>(())
}).wait().unwrap();
})).unwrap();
})
}
@@ -218,11 +217,11 @@ mod tests {
let (_tx, mut rx) = super::channel::<i32, ()>();
b.iter(move || {
::futures::future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
block_on(future::lazy(|cx| {
assert!(rx.poll_next(cx).unwrap().is_pending());
Ok::<(), ()>(())
}).wait().unwrap();
})).unwrap();
})
}

View File

@@ -6,27 +6,44 @@ use std::net::{
};
use std::vec;
use ::futures::{Async, Future, Poll};
use futures::{Async, Future, Poll};
use futures::task;
use futures::future::lazy;
use futures::executor::Executor;
use futures::channel::oneshot;
pub struct Work {
host: String,
port: u16
pub struct Resolving {
receiver: oneshot::Receiver<Result<IpAddrs, io::Error>>
}
impl Work {
pub fn new(host: String, port: u16) -> Work {
Work { host: host, port: port }
impl Resolving {
pub fn spawn(host: String, port: u16, executor: &mut Executor) -> Resolving {
let (sender, receiver) = oneshot::channel();
// The `Resolving` future will return an error when the sender is dropped,
// so we can just ignore the spawn error here
executor.spawn(Box::new(lazy(move |_| {
debug!("resolving host={:?}, port={:?}", host, port);
let result = (host.as_ref(), port).to_socket_addrs()
.map(|i| IpAddrs { iter: i });
sender.send(result).ok();
Ok(())
}))).ok();
Resolving { receiver }
}
}
impl Future for Work {
impl Future for Resolving {
type Item = IpAddrs;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
debug!("resolving host={:?}, port={:?}", self.host, self.port);
(&*self.host, self.port).to_socket_addrs()
.map(|i| Async::Ready(IpAddrs { iter: i }))
fn poll(&mut self, cx: &mut task::Context) -> Poll<IpAddrs, io::Error> {
match self.receiver.poll(cx) {
Ok(Async::Pending) => Ok(Async::Pending),
Ok(Async::Ready(Ok(ips))) => Ok(Async::Ready(ips)),
Ok(Async::Ready(Err(err))) => Err(err),
Err(_) =>
Err(io::Error::new(io::ErrorKind::Other, "dns task was cancelled"))
}
}
}

View File

@@ -6,22 +6,21 @@ use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use futures::{Async, Future, Poll};
use futures::future::{self, Executor};
use futures::{Async, Future, FutureExt, Never, Poll};
use futures::future;
use futures::task;
use http::{Method, Request, Response, Uri, Version};
use http::header::{Entry, HeaderValue, HOST};
use http::uri::Scheme;
use tokio::reactor::Handle;
use tokio_executor::spawn;
pub use tokio_service::Service;
pub use service::Service;
use proto::body::{Body, Entity};
use proto;
use self::pool::Pool;
pub use self::connect::{Connect, HttpConnector};
use self::background::{bg, Background};
use self::connect::Destination;
pub mod conn;
@@ -35,7 +34,6 @@ mod tests;
/// A Client to make outgoing HTTP requests.
pub struct Client<C, B = proto::Body> {
connector: Arc<C>,
executor: Exec,
h1_writev: bool,
pool: Pool<PoolClient<B>>,
retry_canceled_requests: bool,
@@ -82,10 +80,9 @@ impl Client<HttpConnector, proto::Body> {
impl<C, B> Client<C, B> {
#[inline]
fn configured(config: Config<C, B>, exec: Exec) -> Client<C, B> {
fn configured(config: Config<C, B>) -> Client<C, B> {
Client {
connector: Arc::new(config.connector),
executor: exec,
h1_writev: config.h1_writev,
pool: Pool::new(config.keep_alive, config.keep_alive_timeout),
retry_canceled_requests: config.retry_canceled_requests,
@@ -125,14 +122,6 @@ where C: Connect<Error=io::Error> + Sync + 'static,
/// Send a constructed Request using this Client.
pub fn request(&self, mut req: Request<B>) -> FutureResponse {
// TODO(0.12): do this at construction time.
//
// It cannot be done in the constructor because the Client::configured
// does not have `B: 'static` bounds, which are required to spawn
// the interval. In 0.12, add a static bounds to the constructor,
// and move this.
self.schedule_pool_timer();
match req.version() {
Version::HTTP_10 |
Version::HTTP_11 => (),
@@ -175,7 +164,6 @@ where C: Connect<Error=io::Error> + Sync + 'static,
}
}
let client = self.clone();
let uri = req.uri().clone();
let fut = RetryableSendRequest {
@@ -192,7 +180,6 @@ where C: Connect<Error=io::Error> + Sync + 'static,
let url = req.uri().clone();
let checkout = self.pool.checkout(domain);
let connect = {
let executor = self.executor.clone();
let pool = self.pool.clone();
let pool_key = Arc::new(domain.to_string());
let h1_writev = self.h1_writev;
@@ -200,36 +187,39 @@ where C: Connect<Error=io::Error> + Sync + 'static,
let dst = Destination {
uri: url,
};
future::lazy(move || {
future::lazy(move |_| {
connector.connect(dst)
.from_err()
.err_into()
.and_then(move |(io, connected)| {
conn::Builder::new()
.h1_writev(h1_writev)
.handshake_no_upgrades(io)
.and_then(move |(tx, conn)| {
executor.execute(conn.map_err(|e| debug!("client connection error: {}", e)))?;
Ok(pool.pooled(pool_key, PoolClient {
is_proxied: connected.is_proxied,
tx: tx,
}))
future::lazy(move |cx| {
execute(conn.recover(|e| {
debug!("client connection error: {}", e);
}), cx)?;
Ok(pool.pooled(pool_key, PoolClient {
is_proxied: connected.is_proxied,
tx: tx,
}))
})
})
})
})
};
let race = checkout.select(connect)
.map(|(pooled, _work)| pooled)
.map_err(|(e, _checkout)| {
// the Pool Checkout cannot error, so the only error
// is from the Connector
// XXX: should wait on the Checkout? Problem is
// that if the connector is failing, it may be that we
// never had a pooled stream at all
ClientError::Normal(e)
});
let race = checkout.select(connect).map(|either| {
either.either(|(pooled, _)| pooled, |(pooled, _)| pooled)
}).map_err(|either| {
// the Pool Checkout cannot error, so the only error
// is from the Connector
// XXX: should wait on the Checkout? Problem is
// that if the connector is failing, it may be that we
// never had a pooled stream at all
ClientError::Normal(either.either(|(e, _)| e, |(e, _)| e))
});
let executor = self.executor.clone();
let resp = race.and_then(move |mut pooled| {
let conn_reused = pooled.is_reused();
set_relative_uri(req.uri_mut(), pooled.is_proxied);
@@ -245,24 +235,26 @@ where C: Connect<Error=io::Error> + Sync + 'static,
ClientError::Normal(err)
}
})
.map(move |res| {
// when pooled is dropped, it will try to insert back into the
// pool. To delay that, spawn a future that completes once the
// sender is ready again.
//
// This *should* only be once the related `Connection` has polled
// for a new request to start.
//
// It won't be ready if there is a body to stream.
if let Ok(Async::NotReady) = pooled.tx.poll_ready() {
// If the executor doesn't have room, oh well. Things will likely
// be blowing up soon, but this specific task isn't required.
let _ = executor.execute(future::poll_fn(move || {
pooled.tx.poll_ready().map_err(|_| ())
}));
}
.and_then(move |res| {
future::lazy(move |cx| {
// when pooled is dropped, it will try to insert back into the
// pool. To delay that, spawn a future that completes once the
// sender is ready again.
//
// This *should* only be once the related `Connection` has polled
// for a new request to start.
//
// It won't be ready if there is a body to stream.
if let Ok(Async::Pending) = pooled.tx.poll_ready(cx) {
// If the executor doesn't have room, oh well. Things will likely
// be blowing up soon, but this specific task isn't required.
execute(future::poll_fn(move |cx| {
pooled.tx.poll_ready(cx).or(Ok(Async::Ready(())))
}), cx).ok();
}
res
Ok(res)
})
});
@@ -271,10 +263,6 @@ where C: Connect<Error=io::Error> + Sync + 'static,
Box::new(resp)
}
fn schedule_pool_timer(&self) {
self.pool.spawn_expired_interval(&self.executor);
}
}
impl<C, B> Service for Client<C, B>
@@ -297,7 +285,6 @@ impl<C, B> Clone for Client<C, B> {
fn clone(&self) -> Client<C, B> {
Client {
connector: self.connector.clone(),
executor: self.executor.clone(),
h1_writev: self.h1_writev,
pool: self.pool.clone(),
retry_canceled_requests: self.retry_canceled_requests,
@@ -327,8 +314,8 @@ impl Future for FutureResponse {
type Item = Response<Body>;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll()
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
self.0.poll(cx)
}
}
@@ -349,11 +336,11 @@ where
type Item = Response<Body>;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
loop {
match self.future.poll() {
match self.future.poll(cx) {
Ok(Async::Ready(resp)) => return Ok(Async::Ready(resp)),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Pending) => return Ok(Async::Pending),
Err(ClientError::Normal(err)) => return Err(err),
Err(ClientError::Canceled {
connection_reused,
@@ -561,18 +548,7 @@ where C: Connect<Error=io::Error>,
/// Construct the Client with this configuration.
#[inline]
pub fn build(self) -> Client<C, B> {
Client::configured(self, Exec::Default)
}
/// Construct a Client with this configuration and an executor.
///
/// The executor will be used to spawn "background" connection tasks
/// to drive requests and responses.
pub fn executor<E>(self, executor: E) -> Client<C, B>
where
E: Executor<Background> + Send + Sync + 'static,
{
Client::configured(self, Exec::new(executor))
Client::configured(self)
}
}
@@ -590,21 +566,6 @@ where
}
self.connector(connector).build()
}
/// Construct a Client with this configuration and an executor.
///
/// The executor will be used to spawn "background" connection tasks
/// to drive requests and responses.
pub fn build_with_executor<E>(self, handle: &Handle, executor: E) -> Client<HttpConnector, B>
where
E: Executor<Background> + Send + Sync + 'static,
{
let mut connector = HttpConnector::new(4, handle);
if self.keep_alive {
connector.set_keepalive(self.keep_alive_timeout);
}
self.connector(connector).executor(executor)
}
}
impl<C, B> fmt::Debug for Config<C, B> {
@@ -629,68 +590,15 @@ impl<C: Clone, B> Clone for Config<C, B> {
}
// ===== impl Exec =====
#[derive(Clone)]
enum Exec {
Default,
Executor(Arc<Executor<Background> + Send + Sync>),
}
impl Exec {
pub(crate) fn new<E: Executor<Background> + Send + Sync + 'static>(executor: E) -> Exec {
Exec::Executor(Arc::new(executor))
}
fn execute<F>(&self, fut: F) -> io::Result<()>
where
F: Future<Item=(), Error=()> + Send + 'static,
{
match *self {
Exec::Default => spawn(fut),
Exec::Executor(ref e) => {
e.execute(bg(Box::new(fut)))
.map_err(|err| {
debug!("executor error: {:?}", err.kind());
io::Error::new(
io::ErrorKind::Other,
"executor error",
)
})?
},
}
Ok(())
fn execute<F>(fut: F, cx: &mut task::Context) -> Result<(), ::Error>
where F: Future<Item=(), Error=Never> + Send + 'static,
{
if let Some(executor) = cx.executor() {
executor.spawn(Box::new(fut)).map_err(|err| {
debug!("executor error: {:?}", err);
::Error::Executor
})
} else {
Err(::Error::Executor)
}
}
// ===== impl Background =====
// The types inside this module are not exported out of the crate,
// so they are in essence un-nameable.
mod background {
use futures::{Future, Poll};
// This is basically `impl Future`, since the type is un-nameable,
// and only implementeds `Future`.
#[allow(missing_debug_implementations)]
pub struct Background {
inner: Box<Future<Item=(), Error=()> + Send>,
}
pub fn bg(fut: Box<Future<Item=(), Error=()> + Send>) -> Background {
Background {
inner: fut,
}
}
impl Future for Background {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
}

View File

@@ -4,14 +4,13 @@ use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};
use futures::{Future, Async, Poll, Stream};
use futures::sync::oneshot;
use futures::{Future, Async, Never, Poll, Stream};
use futures::channel::oneshot;
use futures::task;
use futures_timer::Interval;
use super::Exec;
pub struct Pool<T> {
inner: Arc<Mutex<PoolInner<T>>>,
pub(super) struct Pool<T> {
inner: Arc<Mutex<PoolInner<T>>>
}
// Before using a pooled connection, make sure the sender is not dead.
@@ -48,7 +47,7 @@ struct PoolInner<T> {
}
impl<T> Pool<T> {
pub fn new(enabled: bool, timeout: Option<Duration>) -> Pool<T> {
pub(super) fn new(enabled: bool, timeout: Option<Duration>) -> Pool<T> {
Pool {
inner: Arc::new(Mutex::new(PoolInner {
enabled: enabled,
@@ -56,7 +55,7 @@ impl<T> Pool<T> {
parked: HashMap::new(),
timeout: timeout,
expired_timer_spawned: false,
})),
}))
}
}
}
@@ -67,6 +66,7 @@ impl<T: Closed> Pool<T> {
key: Arc::new(key.to_owned()),
pool: self.clone(),
parked: None,
spawned_expired_interval: false
}
}
@@ -221,38 +221,38 @@ impl<T: Closed> PoolInner<T> {
impl<T: Closed + Send + 'static> Pool<T> {
pub(super) fn spawn_expired_interval(&self, exec: &Exec) {
fn spawn_expired_interval(&mut self, cx: &mut task::Context) -> Result<(), ::Error> {
let dur = {
let mut inner = self.inner.lock().unwrap();
if !inner.enabled {
return;
return Ok(());
}
if inner.expired_timer_spawned {
return;
return Ok(());
}
inner.expired_timer_spawned = true;
if let Some(dur) = inner.timeout {
dur
} else {
return
return Ok(());
}
};
let interval = Interval::new(dur);
exec.execute(IdleInterval {
super::execute(IdleInterval {
interval: interval,
pool: Arc::downgrade(&self.inner),
}).unwrap();
}, cx)
}
}
impl<T> Clone for Pool<T> {
fn clone(&self) -> Pool<T> {
Pool {
inner: self.inner.clone(),
inner: self.inner.clone()
}
}
}
@@ -322,22 +322,23 @@ pub struct Checkout<T> {
key: Arc<String>,
pool: Pool<T>,
parked: Option<oneshot::Receiver<T>>,
spawned_expired_interval: bool
}
struct NotParked;
impl<T: Closed> Checkout<T> {
fn poll_parked(&mut self) -> Poll<Pooled<T>, NotParked> {
fn poll_parked(&mut self, cx: &mut task::Context) -> Poll<Pooled<T>, NotParked> {
let mut drop_parked = false;
if let Some(ref mut rx) = self.parked {
match rx.poll() {
match rx.poll(cx) {
Ok(Async::Ready(value)) => {
if !value.is_closed() {
return Ok(Async::Ready(self.pool.reuse(&self.key, value)));
}
drop_parked = true;
},
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Pending) => return Ok(Async::Pending),
Err(_canceled) => drop_parked = true,
}
}
@@ -347,22 +348,27 @@ impl<T: Closed> Checkout<T> {
Err(NotParked)
}
fn park(&mut self) {
fn park(&mut self, cx: &mut task::Context) {
if self.parked.is_none() {
let (tx, mut rx) = oneshot::channel();
let _ = rx.poll(); // park this task
let _ = rx.poll(cx); // park this task
self.pool.park(self.key.clone(), tx);
self.parked = Some(rx);
}
}
}
impl<T: Closed> Future for Checkout<T> {
impl<T: Closed + Send + 'static> Future for Checkout<T> {
type Item = Pooled<T>;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.poll_parked() {
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
if !self.spawned_expired_interval {
self.pool.spawn_expired_interval(cx)?;
self.spawned_expired_interval = true;
}
match self.poll_parked(cx) {
Ok(async) => return Ok(async),
Err(_not_parked) => (),
}
@@ -372,8 +378,8 @@ impl<T: Closed> Future for Checkout<T> {
if let Some(pooled) = entry {
Ok(Async::Ready(pooled))
} else {
self.park();
Ok(Async::NotReady)
self.park(cx);
Ok(Async::Pending)
}
}
}
@@ -409,11 +415,11 @@ struct IdleInterval<T> {
impl<T: Closed + 'static> Future for IdleInterval<T> {
type Item = ();
type Error = ();
type Error = Never;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
loop {
try_ready!(self.interval.poll().map_err(|_| unreachable!("interval cannot error")));
try_ready!(self.interval.poll_next(cx).map_err(|_| unreachable!("interval cannot error")));
if let Some(inner) = self.pool.upgrade() {
if let Ok(mut inner) = inner.lock() {
@@ -430,9 +436,10 @@ impl<T: Closed + 'static> Future for IdleInterval<T> {
mod tests {
use std::sync::Arc;
use std::time::Duration;
use futures::{Async, Future};
use futures::{Async, Future, FutureExt};
use futures::future;
use super::{Closed, Pool, Exec};
use futures::executor::block_on;
use super::{Closed, Pool};
impl Closed for i32 {
fn is_closed(&self) -> bool {
@@ -442,34 +449,37 @@ mod tests {
#[test]
fn test_pool_checkout_smoke() {
let pool = Pool::new(true, Some(Duration::from_secs(5)));
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(key.clone(), 41);
block_on(future::lazy(|cx| {
let pool = Pool::new(true, Some(Duration::from_secs(5)));
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(key.clone(), 41);
drop(pooled);
drop(pooled);
match pool.checkout(&key).poll().unwrap() {
Async::Ready(pooled) => assert_eq!(*pooled, 41),
_ => panic!("not ready"),
}
match pool.checkout(&key).poll(cx).unwrap() {
Async::Ready(pooled) => assert_eq!(*pooled, 41),
_ => panic!("not ready"),
}
Ok::<_, ()>(())
})).unwrap();
}
#[test]
fn test_pool_checkout_returns_none_if_expired() {
future::lazy(|| {
block_on(future::lazy(|cx| {
let pool = Pool::new(true, Some(Duration::from_millis(100)));
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(key.clone(), 41);
drop(pooled);
::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap());
assert!(pool.checkout(&key).poll().unwrap().is_not_ready());
::futures::future::ok::<(), ()>(())
}).wait().unwrap();
assert!(pool.checkout(&key).poll(cx).unwrap().is_pending());
Ok::<_, ()>(())
})).unwrap();
}
#[test]
fn test_pool_checkout_removes_expired() {
future::lazy(|| {
block_on(future::lazy(|cx| {
let pool = Pool::new(true, Some(Duration::from_millis(100)));
let key = Arc::new("foo".to_string());
@@ -481,20 +491,20 @@ mod tests {
::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap());
// checkout.poll() should clean out the expired
pool.checkout(&key).poll().unwrap();
pool.checkout(&key).poll(cx).unwrap();
assert!(pool.inner.lock().unwrap().idle.get(&key).is_none());
Ok::<(), ()>(())
}).wait().unwrap();
Ok::<_, ()>(())
})).unwrap();
}
#[test]
fn test_pool_timer_removes_expired() {
let runtime = ::tokio::runtime::Runtime::new().unwrap();
let pool = Pool::new(true, Some(Duration::from_millis(100)));
let mut pool = Pool::new(true, Some(Duration::from_millis(100)));
let executor = runtime.executor();
pool.spawn_expired_interval(&Exec::new(executor));
block_on(future::lazy(|cx| {
pool.spawn_expired_interval(cx)
})).unwrap();
let key = Arc::new("foo".to_string());
pool.pooled(key.clone(), 41);
@@ -503,9 +513,9 @@ mod tests {
assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));
::futures_timer::Delay::new(
block_on(::futures_timer::Delay::new(
Duration::from_millis(400) // allow for too-good resolution
).wait().unwrap();
)).unwrap();
assert!(pool.inner.lock().unwrap().idle.get(&key).is_none());
}
@@ -516,7 +526,7 @@ mod tests {
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(key.clone(), 41);
let checkout = pool.checkout(&key).join(future::lazy(move || {
let checkout = pool.checkout(&key).join(future::lazy(move |_| {
// the checkout future will park first,
// and then this lazy future will be polled, which will insert
// the pooled back into the pool
@@ -525,12 +535,12 @@ mod tests {
drop(pooled);
Ok(())
})).map(|(entry, _)| entry);
assert_eq!(*checkout.wait().unwrap(), 41);
assert_eq!(*block_on(checkout).unwrap(), 41);
}
#[test]
fn test_pool_checkout_drop_cleans_up_parked() {
future::lazy(|| {
block_on(future::lazy(|cx| {
let pool = Pool::<i32>::new(true, Some(Duration::from_secs(10)));
let key = Arc::new("localhost:12345".to_string());
@@ -538,9 +548,9 @@ mod tests {
let mut checkout2 = pool.checkout(&key);
// first poll needed to get into Pool's parked
checkout1.poll().unwrap();
checkout1.poll(cx).unwrap();
assert_eq!(pool.inner.lock().unwrap().parked.get(&key).unwrap().len(), 1);
checkout2.poll().unwrap();
checkout2.poll(cx).unwrap();
assert_eq!(pool.inner.lock().unwrap().parked.get(&key).unwrap().len(), 2);
// on drop, clean up Pool
@@ -550,7 +560,7 @@ mod tests {
drop(checkout2);
assert!(pool.inner.lock().unwrap().parked.get(&key).is_none());
::futures::future::ok::<(), ()>(())
}).wait().unwrap();
Ok::<_, ()>(())
})).unwrap();
}
}

View File

@@ -5,6 +5,7 @@ use std::time::Duration;
use futures::Async;
use futures::future::poll_fn;
use futures::executor::block_on;
use tokio::executor::thread_pool::{Builder as ThreadPoolBuilder};
use mock::MockConnector;
@@ -22,7 +23,7 @@ fn retryable_request() {
let client = Client::configure()
.connector(connector)
.executor(executor.sender().clone());
.build();
{
@@ -30,13 +31,13 @@ fn retryable_request() {
.uri("http://mock.local/a")
.body(Default::default())
.unwrap();
let res1 = client.request(req);
let srv1 = poll_fn(|| {
try_ready!(sock1.read(&mut [0u8; 512]));
let res1 = client.request(req).with_executor(executor.sender().clone());
let srv1 = poll_fn(|cx| {
try_ready!(sock1.read(cx, &mut [0u8; 512]));
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(()))
});
res1.join(srv1).wait().expect("res1");
block_on(res1.join(srv1)).expect("res1");
}
drop(sock1);
@@ -44,17 +45,17 @@ fn retryable_request() {
.uri("http://mock.local/b")
.body(Default::default())
.unwrap();
let res2 = client.request(req)
let res2 = client.request(req).with_executor(executor.sender().clone())
.map(|res| {
assert_eq!(res.status().as_u16(), 222);
});
let srv2 = poll_fn(|| {
try_ready!(sock2.read(&mut [0u8; 512]));
let srv2 = poll_fn(|cx| {
try_ready!(sock2.read(cx, &mut [0u8; 512]));
try_ready!(sock2.write(b"HTTP/1.1 222 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(()))
});
res2.join(srv2).wait().expect("res2");
block_on(res2.join(srv2)).expect("res2");
}
#[test]
@@ -68,7 +69,7 @@ fn conn_reset_after_write() {
let client = Client::configure()
.connector(connector)
.executor(executor.sender().clone());
.build();
{
let req = Request::builder()
@@ -77,13 +78,13 @@ fn conn_reset_after_write() {
.header("content-length", "0")
.body(Default::default())
.unwrap();
let res1 = client.request(req);
let srv1 = poll_fn(|| {
try_ready!(sock1.read(&mut [0u8; 512]));
let res1 = client.request(req).with_executor(executor.sender().clone());
let srv1 = poll_fn(|cx| {
try_ready!(sock1.read(cx, &mut [0u8; 512]));
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(()))
});
res1.join(srv1).wait().expect("res1");
block_on(res1.join(srv1)).expect("res1");
}
// sleep to allow some time for the connection to return to the pool
@@ -93,20 +94,20 @@ fn conn_reset_after_write() {
.uri("http://mock.local/a")
.body(Default::default())
.unwrap();
let res2 = client.request(req);
let res2 = client.request(req).with_executor(executor.sender().clone());
let mut sock1 = Some(sock1);
let srv2 = poll_fn(|| {
let srv2 = poll_fn(|cx| {
// We purposefully keep the socket open until the client
// has written the second request, and THEN disconnect.
//
// Not because we expect servers to be jerks, but to trigger
// state where we write on an assumedly good connetion, and
// only reset the close AFTER we wrote bytes.
try_ready!(sock1.as_mut().unwrap().read(&mut [0u8; 512]));
try_ready!(sock1.as_mut().unwrap().read(cx, &mut [0u8; 512]));
sock1.take();
Ok(Async::Ready(()))
});
let err = res2.join(srv2).wait().expect_err("res2");
let err = block_on(res2.join(srv2)).expect_err("res2");
match err {
::Error::Incomplete => (),
other => panic!("expected Incomplete, found {:?}", other)