Revert "refactor(lib): convert to futures 0.2.0-beta (#1470)"

This reverts commit a12f7beed9.

Much sadness 😢.
Sean McArthur
2018-04-07 10:19:50 -07:00
parent 72e02d6ac8
commit 625e4daaa1
34 changed files with 1368 additions and 1386 deletions

View File

@@ -11,10 +11,9 @@ use std::fmt;
use std::marker::PhantomData;
use bytes::Bytes;
use futures::{Async, Future, FutureExt, Poll};
use futures::{Async, Future, Poll};
use futures::future::{self, Either};
use futures::task;
use futures::io::{AsyncRead, AsyncWrite};
use tokio_io::{AsyncRead, AsyncWrite};
use proto;
use proto::body::Entity;
@@ -124,8 +123,8 @@ impl<B> SendRequest<B>
/// Polls to determine whether this sender can be used yet for a request.
///
/// If the associated connection is closed, this returns an Error.
pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
self.dispatch.poll_ready(cx)
pub fn poll_ready(&mut self) -> Poll<(), ::Error> {
self.dispatch.poll_ready()
}
pub(super) fn is_ready(&self) -> bool {
@@ -167,7 +166,7 @@ where
/// # use http::header::HOST;
/// # use hyper::client::conn::SendRequest;
/// # use hyper::Body;
/// use futures::FutureExt;
/// use futures::Future;
/// use hyper::Request;
///
/// # fn doc(mut tx: SendRequest<Body>) {
@@ -191,19 +190,19 @@ where
pub fn send_request(&mut self, req: Request<B>) -> ResponseFuture {
let inner = match self.dispatch.send(req) {
Ok(rx) => {
rx.then(move |res| {
Either::A(rx.then(move |res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
// this is a definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
}
}).left()
}))
},
Err(_req) => {
debug!("connection was not ready");
let err = ::Error::new_canceled(Some("connection was not ready"));
future::err(err).right()
Either::B(future::err(err))
}
};
@@ -219,7 +218,7 @@ where
{
let inner = match self.dispatch.try_send(req) {
Ok(rx) => {
Either::Left(rx.then(move |res| {
Either::A(rx.then(move |res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
@@ -231,7 +230,7 @@ where
Err(req) => {
debug!("connection was not ready");
let err = ::Error::new_canceled(Some("connection was not ready"));
Either::Right(future::err((err, Some(req))))
Either::B(future::err((err, Some(req))))
}
};
Box::new(inner)
@@ -282,8 +281,8 @@ where
/// upgrade. Once the upgrade is completed, the connection would be "done",
/// but it is not desired to actually shut down the IO object. Instead you
/// would take it back using `into_parts`.
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
self.inner.poll_without_shutdown(cx)
pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> {
self.inner.poll_without_shutdown()
}
}
@@ -295,8 +294,8 @@ where
type Item = ();
type Error = ::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
self.inner.poll(cx)
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
@@ -368,8 +367,8 @@ where
type Item = (SendRequest<B>, Connection<T, B>);
type Error = ::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
self.inner.poll(cx)
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
.map(|async| {
async.map(|(tx, dispatch)| {
(tx, Connection { inner: dispatch })
@@ -399,8 +398,8 @@ where
>);
type Error = ::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
self.inner.poll(cx)
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
@@ -422,7 +421,7 @@ where
>);
type Error = ::Error;
fn poll(&mut self, _cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let io = self.io.take().expect("polled more than once");
let (tx, rx) = dispatch::channel();
let mut conn = proto::Conn::new(io);
@@ -446,8 +445,8 @@ impl Future for ResponseFuture {
type Error = ::Error;
#[inline]
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
self.inner.poll(cx)
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
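
The hunks above revert two futures 0.2-beta idioms back to their futures 0.1 forms: `poll` loses its `&mut task::Context` argument, and the `.left()`/`.right()` combinators become the `Either::A`/`Either::B` constructors. A minimal futures 0.1 sketch of both changes (the `Ready` type and `pick` function are illustrative, not hyper's own):

    use futures::{future, Async, Future, Poll};
    use futures::future::Either;

    struct Ready(Option<u32>);

    impl Future for Ready {
        type Item = u32;
        type Error = ();

        // futures 0.1: poll() takes no task::Context; the current task is implicit.
        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
            Ok(Async::Ready(self.0.take().expect("polled after completion")))
        }
    }

    // futures 0.1 spells a two-branch future as Either::A / Either::B,
    // where 0.2-beta used the .left() / .right() combinators instead.
    fn pick(use_first: bool) -> Either<Ready, future::FutureResult<u32, ()>> {
        if use_first {
            Either::A(Ready(Some(1)))
        } else {
            Either::B(future::ok(2))
        }
    }

    fn main() {
        assert_eq!(pick(true).wait(), Ok(1));
        assert_eq!(pick(false).wait(), Ok(2));
    }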

View File

@@ -8,21 +8,24 @@
use std::error::Error as StdError;
use std::fmt;
use std::io;
use std::mem;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use futures::{Future, Never, Poll, Async};
use futures::executor::{Executor, SpawnError, ThreadPoolBuilder};
use futures::task;
use futures::io::{AsyncRead, AsyncWrite};
use futures::{Future, Poll, Async};
use futures::future::{Executor, ExecuteError};
use futures::sync::oneshot;
use futures_cpupool::{Builder as CpuPoolBuilder};
use http::Uri;
use http::uri::Scheme;
use net2::TcpBuilder;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::Handle;
use tokio::net::{TcpStream, ConnectFuture};
use executor::CloneBoxedExecutor;
use super::dns;
use self::http_connector::HttpConnectorBlockingTask;
/// Connect to a destination, returning an IO transport.
///
@@ -171,7 +174,7 @@ impl HttpConnector {
/// Takes number of DNS worker threads.
#[inline]
pub fn new(threads: usize, handle: &Handle) -> HttpConnector {
let pool = ThreadPoolBuilder::new()
let pool = CpuPoolBuilder::new()
.name_prefix("hyper-dns")
.pool_size(threads)
.create();
@@ -183,10 +186,10 @@ impl HttpConnector {
/// Takes an executor to run blocking tasks on.
#[inline]
pub fn new_with_executor<E: 'static>(executor: E, handle: &Handle) -> HttpConnector
where E: Executor + Clone + Send + Sync
where E: Executor<HttpConnectorBlockingTask> + Send + Sync
{
HttpConnector {
executor: HttpConnectExecutor(Box::new(executor)),
executor: HttpConnectExecutor(Arc::new(executor)),
enforce_http: true,
handle: handle.clone(),
keep_alive_timeout: None,
@@ -295,7 +298,7 @@ pub struct HttpConnecting {
enum State {
Lazy(HttpConnectExecutor, String, u16),
Resolving(dns::Resolving),
Resolving(oneshot::SpawnHandle<dns::IpAddrs, io::Error>),
Connecting(ConnectingTcp),
Error(Option<io::Error>),
}
@@ -304,11 +307,11 @@ impl Future for HttpConnecting {
type Item = (TcpStream, Connected);
type Error = io::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let state;
match self.state {
State::Lazy(ref mut executor, ref mut host, port) => {
State::Lazy(ref executor, ref mut host, port) => {
// If the host is already an IP addr (v4 or v6),
// skip resolving the dns and start connecting right away.
if let Some(addrs) = dns::IpAddrs::try_parse(host, port) {
@@ -317,19 +320,24 @@ impl Future for HttpConnecting {
current: None
})
} else {
let host = ::std::mem::replace(host, String::new());
state = State::Resolving(dns::Resolving::spawn(host, port, executor));
let host = mem::replace(host, String::new());
let work = dns::Work::new(host, port);
state = State::Resolving(oneshot::spawn(work, executor));
}
},
State::Resolving(ref mut future) => {
let addrs = try_ready!(future.poll(cx));
state = State::Connecting(ConnectingTcp {
addrs: addrs,
current: None,
});
match try!(future.poll()) {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(addrs) => {
state = State::Connecting(ConnectingTcp {
addrs: addrs,
current: None,
})
}
};
},
State::Connecting(ref mut c) => {
let sock = try_ready!(c.poll(cx, &self.handle));
let sock = try_ready!(c.poll(&self.handle));
if let Some(dur) = self.keep_alive_timeout {
sock.set_keepalive(Some(dur))?;
@@ -357,11 +365,11 @@ struct ConnectingTcp {
impl ConnectingTcp {
// not a Future, since passing a &Handle to poll
fn poll(&mut self, cx: &mut task::Context, handle: &Handle) -> Poll<TcpStream, io::Error> {
fn poll(&mut self, handle: &Handle) -> Poll<TcpStream, io::Error> {
let mut err = None;
loop {
if let Some(ref mut current) = self.current {
match current.poll(cx) {
match current.poll() {
Ok(ok) => return Ok(ok),
Err(e) => {
trace!("connect error {:?}", e);
@@ -384,19 +392,37 @@ impl ConnectingTcp {
}
}
#[derive(Clone)]
struct HttpConnectExecutor(Box<CloneBoxedExecutor>);
impl Executor for HttpConnectExecutor {
fn spawn(
&mut self,
f: Box<Future<Item = (), Error = Never> + 'static + Send>
) -> Result<(), SpawnError> {
self.0.spawn(f)
// Make this Future unnameable outside of this crate.
mod http_connector {
use super::*;
// Blocking task to be executed on a thread pool.
pub struct HttpConnectorBlockingTask {
pub(super) work: oneshot::Execute<dns::Work>
}
fn status(&self) -> Result<(), SpawnError> {
self.0.status()
impl fmt::Debug for HttpConnectorBlockingTask {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("HttpConnectorBlockingTask")
}
}
impl Future for HttpConnectorBlockingTask {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
self.work.poll()
}
}
}
#[derive(Clone)]
struct HttpConnectExecutor(Arc<Executor<HttpConnectorBlockingTask> + Send + Sync>);
impl Executor<oneshot::Execute<dns::Work>> for HttpConnectExecutor {
fn execute(&self, future: oneshot::Execute<dns::Work>) -> Result<(), ExecuteError<oneshot::Execute<dns::Work>>> {
self.0.execute(HttpConnectorBlockingTask { work: future })
.map_err(|err| ExecuteError::new(err.kind(), err.into_future().work))
}
}
@@ -404,7 +430,7 @@ impl Executor for HttpConnectExecutor {
mod tests {
#![allow(deprecated)]
use std::io;
use futures::executor::block_on;
use futures::Future;
use tokio::runtime::Runtime;
use super::{Connect, Destination, HttpConnector};
@@ -417,7 +443,7 @@ mod tests {
};
let connector = HttpConnector::new(1, runtime.handle());
assert_eq!(block_on(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
#[test]
@@ -429,7 +455,7 @@ mod tests {
};
let connector = HttpConnector::new(1, runtime.handle());
assert_eq!(block_on(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
@@ -442,6 +468,6 @@ mod tests {
};
let connector = HttpConnector::new(1, runtime.handle());
assert_eq!(block_on(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
}
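
Here the revert swaps the futures 0.2 `ThreadPoolBuilder`/`Executor::spawn` machinery for `futures_cpupool` plus `futures::sync::oneshot::spawn`, which runs the blocking DNS lookup on the pool and hands back a handle future. A rough futures 0.1 sketch of that spawn-on-a-pool pattern, with an inline `future::lazy` standing in for hyper's `dns::Work`:

    use std::net::ToSocketAddrs;

    use futures::{future, Future};
    use futures::sync::oneshot;
    use futures_cpupool::CpuPool;

    fn main() {
        let pool = CpuPool::new(1);
        let host = "localhost".to_string();
        let port = 80;

        // The blocking call lives inside the spawned future, so it runs on a
        // pool thread rather than on the event loop.
        let work = future::lazy(move || {
            (host.as_str(), port)
                .to_socket_addrs()
                .map(|addrs| addrs.collect::<Vec<_>>())
        });

        // oneshot::spawn executes `work` on the pool and returns a handle
        // future that resolves with its result.
        let handle = oneshot::spawn(work, &pool);
        let addrs = handle.wait().expect("dns lookup failed");
        println!("resolved {} address(es)", addrs.len());
    }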

View File

@@ -1,8 +1,9 @@
use futures::{Async, Never, Poll, Stream};
use futures::channel::{mpsc, oneshot};
use futures::task;
use futures::{Async, Poll, Stream};
use futures::sync::{mpsc, oneshot};
use want;
use common::Never;
//pub type Callback<T, U> = oneshot::Sender<Result<U, (::Error, Option<T>)>>;
pub type RetryPromise<T, U> = oneshot::Receiver<Result<U, (::Error, Option<T>)>>;
pub type Promise<T> = oneshot::Receiver<Result<T, ::Error>>;
@@ -32,15 +33,15 @@ pub struct Sender<T, U> {
}
impl<T, U> Sender<T, U> {
pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
match self.inner.poll_ready(cx) {
pub fn poll_ready(&mut self) -> Poll<(), ::Error> {
match self.inner.poll_ready() {
Ok(Async::Ready(())) => {
// there's room in the queue, but does the Connection
// want a message yet?
self.giver.poll_want(cx)
self.giver.poll_want()
.map_err(|_| ::Error::Closed)
},
Ok(Async::Pending) => Ok(Async::Pending),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => Err(::Error::Closed),
}
}
@@ -78,15 +79,16 @@ impl<T, U> Stream for Receiver<T, U> {
type Item = (T, Callback<T, U>);
type Error = Never;
fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
match self.inner.poll_next(cx)? {
Async::Ready(item) => Ok(Async::Ready(item.map(|mut env| {
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.inner.poll() {
Ok(Async::Ready(item)) => Ok(Async::Ready(item.map(|mut env| {
env.0.take().expect("envelope not dropped")
}))),
Async::Pending => {
Ok(Async::NotReady) => {
self.taker.want();
Ok(Async::Pending)
Ok(Async::NotReady)
}
Err(()) => unreachable!("mpsc never errors"),
}
}
}
@@ -109,11 +111,11 @@ impl<T, U> Drop for Receiver<T, U> {
// This poll() is safe to call in `Drop`, because we've
// called `close`, which promises that no new messages
// will arrive, and thus, once we reach the end, we won't
// see a `Pending` (and try to park), but a Ready(None).
// see a `NotReady` (and try to park), but a Ready(None).
//
// All other variants:
// - Ready(None): the end. we want to stop looping
// - Pending: unreachable
// - NotReady: unreachable
// - Err: unreachable
while let Ok(Async::Ready(Some((val, cb)))) = self.inner.poll() {
let _ = cb.send(Err((::Error::new_canceled(None::<::Error>), Some(val))));
@@ -139,10 +141,10 @@ pub enum Callback<T, U> {
}
impl<T, U> Callback<T, U> {
pub fn poll_cancel(&mut self, cx: &mut task::Context) -> Poll<(), Never> {
pub fn poll_cancel(&mut self) -> Poll<(), ()> {
match *self {
Callback::Retry(ref mut tx) => tx.poll_cancel(cx),
Callback::NoRetry(ref mut tx) => tx.poll_cancel(cx),
Callback::Retry(ref mut tx) => tx.poll_cancel(),
Callback::NoRetry(ref mut tx) => tx.poll_cancel(),
}
}
@@ -164,8 +166,7 @@ mod tests {
#[cfg(feature = "nightly")]
extern crate test;
use futures::{future, FutureExt};
use futures::executor::block_on;
use futures::{future, Future};
#[cfg(feature = "nightly")]
use futures::{Stream};
@@ -174,7 +175,7 @@ mod tests {
fn drop_receiver_sends_cancel_errors() {
let _ = pretty_env_logger::try_init();
block_on(future::lazy(|_| {
future::lazy(|| {
#[derive(Debug)]
struct Custom(i32);
let (mut tx, rx) = super::channel::<Custom, ()>();
@@ -191,7 +192,7 @@ mod tests {
Ok::<(), ()>(())
})
})).unwrap();
}).wait().unwrap();
}
#[cfg(feature = "nightly")]
@@ -200,18 +201,18 @@ mod tests {
let (mut tx, mut rx) = super::channel::<i32, ()>();
b.iter(move || {
block_on(future::lazy(|cx| {
::futures::future::lazy(|| {
let _ = tx.send(1).unwrap();
loop {
let async = rx.poll_next(cx).unwrap();
if async.is_pending() {
let async = rx.poll().unwrap();
if async.is_not_ready() {
break;
}
}
Ok::<(), ()>(())
})).unwrap();
}).wait().unwrap();
})
}
@@ -221,11 +222,11 @@ mod tests {
let (_tx, mut rx) = super::channel::<i32, ()>();
b.iter(move || {
block_on(future::lazy(|cx| {
assert!(rx.poll_next(cx).unwrap().is_pending());
::futures::future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
Ok::<(), ()>(())
})).unwrap();
}).wait().unwrap();
})
}
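
The dispatch tests above move from `block_on(future::lazy(|cx| ..))` to the futures 0.1 idiom `future::lazy(|| ..).wait()`: anything that calls `poll()` directly (and may park the current task) has to run inside a task, and `lazy(..).wait()` is the cheapest way to get one. A small self-contained sketch of that pattern, using a plain mpsc channel rather than hyper's dispatch channel:

    use futures::{future, Async, Future, Stream};
    use futures::sync::mpsc;

    fn main() {
        future::lazy(|| {
            let (tx, mut rx) = mpsc::channel::<u32>(1);

            // Nothing has been sent yet, so the receiver is NotReady
            // (futures 0.2-beta called this state Async::Pending).
            match rx.poll().unwrap() {
                Async::NotReady => (),
                other => panic!("expected NotReady, got {:?}", other),
            }

            // Dropping the sender ends the stream with Ready(None).
            drop(tx);
            match rx.poll().unwrap() {
                Async::Ready(None) => (),
                other => panic!("expected Ready(None), got {:?}", other),
            }

            Ok::<(), ()>(())
        }).wait().unwrap();
    }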

View File

@@ -6,44 +6,27 @@ use std::net::{
};
use std::vec;
use futures::{Async, Future, Poll};
use futures::task;
use futures::future::lazy;
use futures::executor::Executor;
use futures::channel::oneshot;
use ::futures::{Async, Future, Poll};
pub struct Resolving {
receiver: oneshot::Receiver<Result<IpAddrs, io::Error>>
pub struct Work {
host: String,
port: u16
}
impl Resolving {
pub fn spawn(host: String, port: u16, executor: &mut Executor) -> Resolving {
let (sender, receiver) = oneshot::channel();
// The `Resolving` future will return an error when the sender is dropped,
// so we can just ignore the spawn error here
executor.spawn(Box::new(lazy(move |_| {
debug!("resolving host={:?}, port={:?}", host, port);
let result = (host.as_ref(), port).to_socket_addrs()
.map(|i| IpAddrs { iter: i });
sender.send(result).ok();
Ok(())
}))).ok();
Resolving { receiver }
impl Work {
pub fn new(host: String, port: u16) -> Work {
Work { host: host, port: port }
}
}
impl Future for Resolving {
impl Future for Work {
type Item = IpAddrs;
type Error = io::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<IpAddrs, io::Error> {
match self.receiver.poll(cx) {
Ok(Async::Pending) => Ok(Async::Pending),
Ok(Async::Ready(Ok(ips))) => Ok(Async::Ready(ips)),
Ok(Async::Ready(Err(err))) => Err(err),
Err(_) =>
Err(io::Error::new(io::ErrorKind::Other, "dns task was cancelled"))
}
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
debug!("resolving host={:?}, port={:?}", self.host, self.port);
(&*self.host, self.port).to_socket_addrs()
.map(|i| Async::Ready(IpAddrs { iter: i }))
}
}

View File

@@ -6,22 +6,23 @@ use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use futures::{Async, Future, FutureExt, Never, Poll};
use futures::channel::oneshot;
use futures::future;
use futures::task;
use futures::{Async, Future, Poll};
use futures::future::{self, Executor};
use futures::sync::oneshot;
use http::{Method, Request, Response, Uri, Version};
use http::header::{Entry, HeaderValue, HOST};
use http::uri::Scheme;
use tokio::reactor::Handle;
use tokio_executor::spawn;
pub use tokio_service::Service;
pub use service::Service;
use proto::body::{Body, Entity};
use proto;
use self::pool::Pool;
pub use self::connect::{Connect, HttpConnector};
use self::background::{bg, Background};
use self::connect::Destination;
pub mod conn;
@@ -35,6 +36,7 @@ mod tests;
/// A Client to make outgoing HTTP requests.
pub struct Client<C, B = proto::Body> {
connector: Arc<C>,
executor: Exec,
h1_writev: bool,
pool: Pool<PoolClient<B>>,
retry_canceled_requests: bool,
@@ -81,9 +83,10 @@ impl Client<HttpConnector, proto::Body> {
impl<C, B> Client<C, B> {
#[inline]
fn configured(config: Config<C, B>) -> Client<C, B> {
fn configured(config: Config<C, B>, exec: Exec) -> Client<C, B> {
Client {
connector: Arc::new(config.connector),
executor: exec,
h1_writev: config.h1_writev,
pool: Pool::new(config.keep_alive, config.keep_alive_timeout),
retry_canceled_requests: config.retry_canceled_requests,
@@ -123,6 +126,14 @@ where C: Connect<Error=io::Error> + Sync + 'static,
/// Send a constructed Request using this Client.
pub fn request(&self, mut req: Request<B>) -> FutureResponse {
// TODO(0.12): do this at construction time.
//
// It cannot be done in the constructor because the Client::configured
// does not have `B: 'static` bounds, which are required to spawn
// the interval. In 0.12, add a static bounds to the constructor,
// and move this.
self.schedule_pool_timer();
match req.version() {
Version::HTTP_10 |
Version::HTTP_11 => (),
@@ -165,6 +176,7 @@ where C: Connect<Error=io::Error> + Sync + 'static,
}
}
let client = self.clone();
let uri = req.uri().clone();
let fut = RetryableSendRequest {
@@ -181,6 +193,7 @@ where C: Connect<Error=io::Error> + Sync + 'static,
let url = req.uri().clone();
let checkout = self.pool.checkout(domain);
let connect = {
let executor = self.executor.clone();
let pool = self.pool.clone();
let pool_key = Arc::new(domain.to_string());
let h1_writev = self.h1_writev;
@@ -188,39 +201,36 @@ where C: Connect<Error=io::Error> + Sync + 'static,
let dst = Destination {
uri: url,
};
future::lazy(move |_| {
future::lazy(move || {
connector.connect(dst)
.err_into()
.from_err()
.and_then(move |(io, connected)| {
conn::Builder::new()
.h1_writev(h1_writev)
.handshake_no_upgrades(io)
.and_then(move |(tx, conn)| {
future::lazy(move |cx| {
execute(conn.recover(|e| {
debug!("client connection error: {}", e);
}), cx)?;
Ok(pool.pooled(pool_key, PoolClient {
is_proxied: connected.is_proxied,
tx: tx,
}))
})
executor.execute(conn.map_err(|e| debug!("client connection error: {}", e)))?;
Ok(pool.pooled(pool_key, PoolClient {
is_proxied: connected.is_proxied,
tx: tx,
}))
})
})
})
};
let race = checkout.select(connect).map(|either| {
either.either(|(pooled, _)| pooled, |(pooled, _)| pooled)
}).map_err(|either| {
// the Pool Checkout cannot error, so the only error
// is from the Connector
// XXX: should wait on the Checkout? Problem is
// that if the connector is failing, it may be that we
// never had a pooled stream at all
ClientError::Normal(either.either(|(e, _)| e, |(e, _)| e))
});
let race = checkout.select(connect)
.map(|(pooled, _work)| pooled)
.map_err(|(e, _checkout)| {
// the Pool Checkout cannot error, so the only error
// is from the Connector
// XXX: should wait on the Checkout? Problem is
// that if the connector is failing, it may be that we
// never had a pooled stream at all
ClientError::Normal(e)
});
let executor = self.executor.clone();
let resp = race.and_then(move |mut pooled| {
let conn_reused = pooled.is_reused();
set_relative_uri(req.uri_mut(), pooled.is_proxied);
@@ -237,35 +247,34 @@ where C: Connect<Error=io::Error> + Sync + 'static,
}
})
.and_then(move |mut res| {
future::lazy(move |cx| {
// when pooled is dropped, it will try to insert back into the
// pool. To delay that, spawn a future that completes once the
// sender is ready again.
//
// This *should* only be once the related `Connection` has polled
// for a new request to start.
//
// It won't be ready if there is a body to stream.
if pooled.tx.is_ready() {
drop(pooled);
} else if !res.body().is_empty() {
let (delayed_tx, delayed_rx) = oneshot::channel();
res.body_mut().delayed_eof(delayed_rx);
// If the executor doesn't have room, oh well. Things will likely
// be blowing up soon, but this specific task isn't required.
let fut = future::poll_fn(move |cx| {
pooled.tx.poll_ready(cx)
// when pooled is dropped, it will try to insert back into the
// pool. To delay that, spawn a future that completes once the
// sender is ready again.
//
// This *should* only be once the related `Connection` has polled
// for a new request to start.
//
// It won't be ready if there is a body to stream.
if pooled.tx.is_ready() {
drop(pooled);
} else if !res.body().is_empty() {
let (delayed_tx, delayed_rx) = oneshot::channel();
res.body_mut().delayed_eof(delayed_rx);
// If the executor doesn't have room, oh well. Things will likely
// be blowing up soon, but this specific task isn't required.
let _ = executor.execute(
future::poll_fn(move || {
pooled.tx.poll_ready()
})
.then(move |_| {
// At this point, `pooled` is dropped, and had a chance
// to insert into the pool (if conn was idle)
drop(delayed_tx);
Ok(())
});
execute(fut, cx).ok();
}
Ok(res)
})
.then(move |_| {
// At this point, `pooled` is dropped, and had a chance
// to insert into the pool (if conn was idle)
drop(delayed_tx);
Ok(())
})
);
}
Ok(res)
});
@@ -274,6 +283,10 @@ where C: Connect<Error=io::Error> + Sync + 'static,
Box::new(resp)
}
fn schedule_pool_timer(&self) {
self.pool.spawn_expired_interval(&self.executor);
}
}
impl<C, B> Service for Client<C, B>
@@ -296,6 +309,7 @@ impl<C, B> Clone for Client<C, B> {
fn clone(&self) -> Client<C, B> {
Client {
connector: self.connector.clone(),
executor: self.executor.clone(),
h1_writev: self.h1_writev,
pool: self.pool.clone(),
retry_canceled_requests: self.retry_canceled_requests,
@@ -325,8 +339,8 @@ impl Future for FutureResponse {
type Item = Response<Body>;
type Error = ::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
self.0.poll(cx)
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll()
}
}
@@ -347,11 +361,11 @@ where
type Item = Response<Body>;
type Error = ::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match self.future.poll(cx) {
match self.future.poll() {
Ok(Async::Ready(resp)) => return Ok(Async::Ready(resp)),
Ok(Async::Pending) => return Ok(Async::Pending),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(ClientError::Normal(err)) => return Err(err),
Err(ClientError::Canceled {
connection_reused,
@@ -559,7 +573,18 @@ where C: Connect<Error=io::Error>,
/// Construct the Client with this configuration.
#[inline]
pub fn build(self) -> Client<C, B> {
Client::configured(self)
Client::configured(self, Exec::Default)
}
/// Construct a Client with this configuration and an executor.
///
/// The executor will be used to spawn "background" connection tasks
/// to drive requests and responses.
pub fn executor<E>(self, executor: E) -> Client<C, B>
where
E: Executor<Background> + Send + Sync + 'static,
{
Client::configured(self, Exec::new(executor))
}
}
@@ -577,6 +602,21 @@ where
}
self.connector(connector).build()
}
/// Construct a Client with this configuration and an executor.
///
/// The executor will be used to spawn "background" connection tasks
/// to drive requests and responses.
pub fn build_with_executor<E>(self, handle: &Handle, executor: E) -> Client<HttpConnector, B>
where
E: Executor<Background> + Send + Sync + 'static,
{
let mut connector = HttpConnector::new(4, handle);
if self.keep_alive {
connector.set_keepalive(self.keep_alive_timeout);
}
self.connector(connector).executor(executor)
}
}
impl<C, B> fmt::Debug for Config<C, B> {
@@ -601,15 +641,68 @@ impl<C: Clone, B> Clone for Config<C, B> {
}
fn execute<F>(fut: F, cx: &mut task::Context) -> Result<(), ::Error>
where F: Future<Item=(), Error=Never> + Send + 'static,
{
if let Some(executor) = cx.executor() {
executor.spawn(Box::new(fut)).map_err(|err| {
debug!("executor error: {:?}", err);
::Error::Executor
})
} else {
Err(::Error::Executor)
// ===== impl Exec =====
#[derive(Clone)]
enum Exec {
Default,
Executor(Arc<Executor<Background> + Send + Sync>),
}
impl Exec {
pub(crate) fn new<E: Executor<Background> + Send + Sync + 'static>(executor: E) -> Exec {
Exec::Executor(Arc::new(executor))
}
fn execute<F>(&self, fut: F) -> io::Result<()>
where
F: Future<Item=(), Error=()> + Send + 'static,
{
match *self {
Exec::Default => spawn(fut),
Exec::Executor(ref e) => {
e.execute(bg(Box::new(fut)))
.map_err(|err| {
debug!("executor error: {:?}", err.kind());
io::Error::new(
io::ErrorKind::Other,
"executor error",
)
})?
},
}
Ok(())
}
}
// ===== impl Background =====
// The types inside this module are not exported out of the crate,
// so they are in essence un-nameable.
mod background {
use futures::{Future, Poll};
// This is basically `impl Future`, since the type is un-nameable,
// and only implements `Future`.
#[allow(missing_debug_implementations)]
pub struct Background {
inner: Box<Future<Item=(), Error=()> + Send>,
}
pub fn bg(fut: Box<Future<Item=(), Error=()> + Send>) -> Background {
Background {
inner: fut,
}
}
impl Future for Background {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
}
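
The `Exec`/`Background` pair introduced above is the futures 0.1 way to accept a caller-supplied executor without naming the spawned future's concrete type: the future is boxed into an opaque wrapper and handed to a `futures::future::Executor` trait object. A stripped-down sketch of the same wiring, using `futures_cpupool::CpuPool` as the executor (the `Exec`/`Background` names mirror the diff, the rest is illustrative):

    use std::sync::Arc;

    use futures::{future, Future, Poll};
    use futures::future::Executor;
    use futures_cpupool::CpuPool;

    // Boxed, type-erased background future: always Item = (), Error = ().
    pub struct Background {
        inner: Box<Future<Item = (), Error = ()> + Send>,
    }

    impl Future for Background {
        type Item = ();
        type Error = ();
        fn poll(&mut self) -> Poll<(), ()> {
            self.inner.poll()
        }
    }

    #[derive(Clone)]
    struct Exec(Arc<Executor<Background> + Send + Sync>);

    impl Exec {
        fn execute<F>(&self, fut: F)
        where
            F: Future<Item = (), Error = ()> + Send + 'static,
        {
            // Ignore executor errors in this sketch; hyper maps them to io::Error.
            let _ = self.0.execute(Background { inner: Box::new(fut) });
        }
    }

    fn main() {
        let exec = Exec(Arc::new(CpuPool::new(1)));
        let (tx, rx) = futures::sync::oneshot::channel::<()>();
        exec.execute(future::lazy(move || {
            // Runs on a pool thread in the background.
            let _ = tx.send(());
            Ok::<(), ()>(())
        }));
        rx.wait().expect("background task finished");
    }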

View File

@@ -4,13 +4,15 @@ use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};
use futures::{Future, Async, Never, Poll, Stream};
use futures::channel::oneshot;
use futures::task;
use futures::{Future, Async, Poll, Stream};
use futures::sync::oneshot;
use futures_timer::Interval;
pub(super) struct Pool<T> {
inner: Arc<Mutex<PoolInner<T>>>
use common::Never;
use super::Exec;
pub struct Pool<T> {
inner: Arc<Mutex<PoolInner<T>>>,
}
// Before using a pooled connection, make sure the sender is not dead.
@@ -44,7 +46,7 @@ struct PoolInner<T> {
}
impl<T> Pool<T> {
pub(super) fn new(enabled: bool, timeout: Option<Duration>) -> Pool<T> {
pub fn new(enabled: bool, timeout: Option<Duration>) -> Pool<T> {
Pool {
inner: Arc::new(Mutex::new(PoolInner {
enabled: enabled,
@@ -52,7 +54,7 @@ impl<T> Pool<T> {
idle_interval_ref: None,
parked: HashMap::new(),
timeout: timeout,
}))
})),
}
}
}
@@ -63,7 +65,6 @@ impl<T: Closed> Pool<T> {
key: Arc::new(key.to_owned()),
pool: self.clone(),
parked: None,
spawned_expired_interval: false
}
}
@@ -218,16 +219,16 @@ impl<T: Closed> PoolInner<T> {
impl<T: Closed + Send + 'static> Pool<T> {
fn spawn_expired_interval(&mut self, cx: &mut task::Context) -> Result<(), ::Error> {
pub(super) fn spawn_expired_interval(&self, exec: &Exec) {
let (dur, rx) = {
let mut inner = self.inner.lock().unwrap();
if !inner.enabled {
return Ok(());
return;
}
if inner.idle_interval_ref.is_some() {
return Ok(());
return;
}
if let Some(dur) = inner.timeout {
@@ -235,23 +236,23 @@ impl<T: Closed + Send + 'static> Pool<T> {
inner.idle_interval_ref = Some(tx);
(dur, rx)
} else {
return Ok(());
return
}
};
let interval = Interval::new(dur);
super::execute(IdleInterval {
exec.execute(IdleInterval {
interval: interval,
pool: Arc::downgrade(&self.inner),
pool_drop_notifier: rx,
}, cx)
}).unwrap();
}
}
impl<T> Clone for Pool<T> {
fn clone(&self) -> Pool<T> {
Pool {
inner: self.inner.clone()
inner: self.inner.clone(),
}
}
}
@@ -321,23 +322,22 @@ pub struct Checkout<T> {
key: Arc<String>,
pool: Pool<T>,
parked: Option<oneshot::Receiver<T>>,
spawned_expired_interval: bool
}
struct NotParked;
impl<T: Closed> Checkout<T> {
fn poll_parked(&mut self, cx: &mut task::Context) -> Poll<Pooled<T>, NotParked> {
fn poll_parked(&mut self) -> Poll<Pooled<T>, NotParked> {
let mut drop_parked = false;
if let Some(ref mut rx) = self.parked {
match rx.poll(cx) {
match rx.poll() {
Ok(Async::Ready(value)) => {
if !value.is_closed() {
return Ok(Async::Ready(self.pool.reuse(&self.key, value)));
}
drop_parked = true;
},
Ok(Async::Pending) => return Ok(Async::Pending),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_canceled) => drop_parked = true,
}
}
@@ -347,27 +347,22 @@ impl<T: Closed> Checkout<T> {
Err(NotParked)
}
fn park(&mut self, cx: &mut task::Context) {
fn park(&mut self) {
if self.parked.is_none() {
let (tx, mut rx) = oneshot::channel();
let _ = rx.poll(cx); // park this task
let _ = rx.poll(); // park this task
self.pool.park(self.key.clone(), tx);
self.parked = Some(rx);
}
}
}
impl<T: Closed + Send + 'static> Future for Checkout<T> {
impl<T: Closed> Future for Checkout<T> {
type Item = Pooled<T>;
type Error = ::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
if !self.spawned_expired_interval {
self.pool.spawn_expired_interval(cx)?;
self.spawned_expired_interval = true;
}
match self.poll_parked(cx) {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.poll_parked() {
Ok(async) => return Ok(async),
Err(_not_parked) => (),
}
@@ -377,8 +372,8 @@ impl<T: Closed + Send + 'static> Future for Checkout<T> {
if let Some(pooled) = entry {
Ok(Async::Ready(pooled))
} else {
self.park(cx);
Ok(Async::Pending)
self.park();
Ok(Async::NotReady)
}
}
}
@@ -418,20 +413,20 @@ struct IdleInterval<T> {
impl<T: Closed + 'static> Future for IdleInterval<T> {
type Item = ();
type Error = Never;
type Error = ();
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match self.pool_drop_notifier.poll(cx) {
match self.pool_drop_notifier.poll() {
Ok(Async::Ready(n)) => match n {},
Ok(Async::Pending) => (),
Ok(Async::NotReady) => (),
Err(_canceled) => {
trace!("pool closed, canceling idle interval");
return Ok(Async::Ready(()));
}
}
try_ready!(self.interval.poll_next(cx).map_err(|_| unreachable!("interval cannot error")));
try_ready!(self.interval.poll().map_err(|_| unreachable!("interval cannot error")));
if let Some(inner) = self.pool.upgrade() {
if let Ok(mut inner) = inner.lock() {
@@ -448,10 +443,9 @@ impl<T: Closed + 'static> Future for IdleInterval<T> {
mod tests {
use std::sync::Arc;
use std::time::Duration;
use futures::{Async, Future, FutureExt};
use futures::{Async, Future};
use futures::future;
use futures::executor::block_on;
use super::{Closed, Pool};
use super::{Closed, Pool, Exec};
impl Closed for i32 {
fn is_closed(&self) -> bool {
@@ -461,37 +455,34 @@ mod tests {
#[test]
fn test_pool_checkout_smoke() {
block_on(future::lazy(|cx| {
let pool = Pool::new(true, Some(Duration::from_secs(5)));
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(key.clone(), 41);
let pool = Pool::new(true, Some(Duration::from_secs(5)));
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(key.clone(), 41);
drop(pooled);
drop(pooled);
match pool.checkout(&key).poll(cx).unwrap() {
Async::Ready(pooled) => assert_eq!(*pooled, 41),
_ => panic!("not ready"),
}
Ok::<_, ()>(())
})).unwrap();
match pool.checkout(&key).poll().unwrap() {
Async::Ready(pooled) => assert_eq!(*pooled, 41),
_ => panic!("not ready"),
}
}
#[test]
fn test_pool_checkout_returns_none_if_expired() {
block_on(future::lazy(|cx| {
future::lazy(|| {
let pool = Pool::new(true, Some(Duration::from_millis(100)));
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(key.clone(), 41);
drop(pooled);
::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap());
assert!(pool.checkout(&key).poll(cx).unwrap().is_pending());
Ok::<_, ()>(())
})).unwrap();
assert!(pool.checkout(&key).poll().unwrap().is_not_ready());
::futures::future::ok::<(), ()>(())
}).wait().unwrap();
}
#[test]
fn test_pool_checkout_removes_expired() {
block_on(future::lazy(|cx| {
future::lazy(|| {
let pool = Pool::new(true, Some(Duration::from_millis(100)));
let key = Arc::new("foo".to_string());
@@ -503,20 +494,20 @@ mod tests {
::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap());
// checkout.poll() should clean out the expired
pool.checkout(&key).poll(cx).unwrap();
pool.checkout(&key).poll().unwrap();
assert!(pool.inner.lock().unwrap().idle.get(&key).is_none());
Ok::<_, ()>(())
})).unwrap();
Ok::<(), ()>(())
}).wait().unwrap();
}
#[test]
fn test_pool_timer_removes_expired() {
let mut pool = Pool::new(true, Some(Duration::from_millis(100)));
let runtime = ::tokio::runtime::Runtime::new().unwrap();
let pool = Pool::new(true, Some(Duration::from_millis(100)));
block_on(future::lazy(|cx| {
pool.spawn_expired_interval(cx)
})).unwrap();
let executor = runtime.executor();
pool.spawn_expired_interval(&Exec::new(executor));
let key = Arc::new("foo".to_string());
pool.pooled(key.clone(), 41);
@@ -525,9 +516,9 @@ mod tests {
assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));
block_on(::futures_timer::Delay::new(
::futures_timer::Delay::new(
Duration::from_millis(400) // allow for too-good resolution
)).unwrap();
).wait().unwrap();
assert!(pool.inner.lock().unwrap().idle.get(&key).is_none());
}
@@ -538,7 +529,7 @@ mod tests {
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(key.clone(), 41);
let checkout = pool.checkout(&key).join(future::lazy(move |_| {
let checkout = pool.checkout(&key).join(future::lazy(move || {
// the checkout future will park first,
// and then this lazy future will be polled, which will insert
// the pooled back into the pool
@@ -547,12 +538,12 @@ mod tests {
drop(pooled);
Ok(())
})).map(|(entry, _)| entry);
assert_eq!(*block_on(checkout).unwrap(), 41);
assert_eq!(*checkout.wait().unwrap(), 41);
}
#[test]
fn test_pool_checkout_drop_cleans_up_parked() {
block_on(future::lazy(|cx| {
future::lazy(|| {
let pool = Pool::<i32>::new(true, Some(Duration::from_secs(10)));
let key = Arc::new("localhost:12345".to_string());
@@ -560,9 +551,9 @@ mod tests {
let mut checkout2 = pool.checkout(&key);
// first poll needed to get into Pool's parked
checkout1.poll(cx).unwrap();
checkout1.poll().unwrap();
assert_eq!(pool.inner.lock().unwrap().parked.get(&key).unwrap().len(), 1);
checkout2.poll(cx).unwrap();
checkout2.poll().unwrap();
assert_eq!(pool.inner.lock().unwrap().parked.get(&key).unwrap().len(), 2);
// on drop, clean up Pool
@@ -572,7 +563,7 @@ mod tests {
drop(checkout2);
assert!(pool.inner.lock().unwrap().parked.get(&key).is_none());
Ok::<_, ()>(())
})).unwrap();
::futures::future::ok::<(), ()>(())
}).wait().unwrap();
}
}
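
`IdleInterval` above stops itself through the `oneshot::Receiver<Never>` held as `pool_drop_notifier`: the sender is stored in the pool, no value is ever sent (`Never` has no values), and dropping the pool cancels the receiver and wakes the background task. A small futures 0.1 sketch of that drop-notifier idea (the `Watchdog` type is illustrative):

    use futures::{Async, Future, Poll};
    use futures::sync::oneshot;

    // Uninhabited type, like hyper's common::Never: no value can ever be sent.
    #[derive(Debug)]
    enum Never {}

    struct Watchdog {
        shutdown: oneshot::Receiver<Never>,
    }

    impl Future for Watchdog {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<(), ()> {
            match self.shutdown.poll() {
                // A Never value cannot exist, so this arm can never run.
                Ok(Async::Ready(never)) => match never {},
                Ok(Async::NotReady) => Ok(Async::NotReady),
                // The sender (the "pool") was dropped: finish the background task.
                Err(_canceled) => Ok(Async::Ready(())),
            }
        }
    }

    fn main() {
        let (tx, rx) = oneshot::channel::<Never>();
        let watchdog = Watchdog { shutdown: rx };
        drop(tx); // dropping the owner cancels the receiver...
        assert_eq!(watchdog.wait(), Ok(())); // ...and the watchdog completes.
    }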

View File

@@ -5,7 +5,6 @@ use std::time::Duration;
use futures::Async;
use futures::future::poll_fn;
use futures::executor::block_on;
use tokio::executor::thread_pool::{Builder as ThreadPoolBuilder};
use mock::MockConnector;
@@ -23,7 +22,7 @@ fn retryable_request() {
let client = Client::configure()
.connector(connector)
.build();
.executor(executor.sender().clone());
{
@@ -31,13 +30,13 @@ fn retryable_request() {
.uri("http://mock.local/a")
.body(Default::default())
.unwrap();
let res1 = client.request(req).with_executor(executor.sender().clone());
let srv1 = poll_fn(|cx| {
try_ready!(sock1.read(cx, &mut [0u8; 512]));
let res1 = client.request(req);
let srv1 = poll_fn(|| {
try_ready!(sock1.read(&mut [0u8; 512]));
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(()))
});
block_on(res1.join(srv1)).expect("res1");
res1.join(srv1).wait().expect("res1");
}
drop(sock1);
@@ -45,17 +44,17 @@ fn retryable_request() {
.uri("http://mock.local/b")
.body(Default::default())
.unwrap();
let res2 = client.request(req).with_executor(executor.sender().clone())
let res2 = client.request(req)
.map(|res| {
assert_eq!(res.status().as_u16(), 222);
});
let srv2 = poll_fn(|cx| {
try_ready!(sock2.read(cx, &mut [0u8; 512]));
let srv2 = poll_fn(|| {
try_ready!(sock2.read(&mut [0u8; 512]));
try_ready!(sock2.write(b"HTTP/1.1 222 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(()))
});
block_on(res2.join(srv2)).expect("res2");
res2.join(srv2).wait().expect("res2");
}
#[test]
@@ -69,7 +68,7 @@ fn conn_reset_after_write() {
let client = Client::configure()
.connector(connector)
.build();
.executor(executor.sender().clone());
{
let req = Request::builder()
@@ -78,13 +77,13 @@ fn conn_reset_after_write() {
.header("content-length", "0")
.body(Default::default())
.unwrap();
let res1 = client.request(req).with_executor(executor.sender().clone());
let srv1 = poll_fn(|cx| {
try_ready!(sock1.read(cx, &mut [0u8; 512]));
let res1 = client.request(req);
let srv1 = poll_fn(|| {
try_ready!(sock1.read(&mut [0u8; 512]));
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(()))
});
block_on(res1.join(srv1)).expect("res1");
res1.join(srv1).wait().expect("res1");
}
// sleep to allow some time for the connection to return to the pool
@@ -94,20 +93,20 @@ fn conn_reset_after_write() {
.uri("http://mock.local/a")
.body(Default::default())
.unwrap();
let res2 = client.request(req).with_executor(executor.sender().clone());
let res2 = client.request(req);
let mut sock1 = Some(sock1);
let srv2 = poll_fn(|cx| {
let srv2 = poll_fn(|| {
// We purposefully keep the socket open until the client
// has written the second request, and THEN disconnect.
//
// Not because we expect servers to be jerks, but to trigger
// state where we write on an assumedly good connection, and
// only reset the close AFTER we wrote bytes.
try_ready!(sock1.as_mut().unwrap().read(cx, &mut [0u8; 512]));
try_ready!(sock1.as_mut().unwrap().read(&mut [0u8; 512]));
sock1.take();
Ok(Async::Ready(()))
});
let err = block_on(res2.join(srv2)).expect_err("res2");
let err = res2.join(srv2).wait().expect_err("res2");
match err {
::Error::Incomplete => (),
other => panic!("expected Incomplete, found {:?}", other)

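The client tests above are rewritten to the futures 0.1 driving style: the request future is `join`ed with a `poll_fn` that plays the server side of the mock socket, and the pair is run to completion with `wait()`. The skeleton of that pattern, with trivial stand-ins for the request and the mock server:

    use futures::{future, Async, Future};

    fn main() {
        // Stand-in for `client.request(req)`.
        let request = future::ok::<_, ()>("request sent");

        // Stand-in for the poll_fn that reads from / writes to the mock socket.
        let server = future::poll_fn(|| Ok::<_, ()>(Async::Ready("response sent")));

        // Drive both halves together on the current thread.
        let (req, resp) = request.join(server).wait().unwrap();
        assert_eq!(req, "request sent");
        assert_eq!(resp, "response sent");
    }
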
src/common/mod.rs (new file, 2 lines)
View File

@@ -0,0 +1,2 @@
#[derive(Debug)]
pub enum Never {}

View File

@@ -21,8 +21,7 @@ use self::Error::{
Io,
TooLarge,
Incomplete,
Utf8,
Executor
Utf8
};
/// Result type often returned from methods that can have hyper `Error`s.
@@ -57,8 +56,6 @@ pub enum Error {
Io(IoError),
/// Parsing a field as string failed
Utf8(Utf8Error),
/// Executing a future failed
Executor,
#[doc(hidden)]
__Nonexhaustive(Void)
@@ -130,7 +127,6 @@ impl StdError for Error {
Cancel(ref e) => e.description(),
Io(ref e) => e.description(),
Utf8(ref e) => e.description(),
Executor => "executor is missing or failed to spawn",
Error::__Nonexhaustive(..) => unreachable!(),
}
}

View File

@@ -1,17 +0,0 @@
use futures::executor::Executor;
pub(crate) trait CloneBoxedExecutor: Executor + Send + Sync {
fn clone_boxed(&self) -> Box<CloneBoxedExecutor + Send + Sync>;
}
impl<E: Executor + Clone + Send + Sync + 'static> CloneBoxedExecutor for E {
fn clone_boxed(&self) -> Box<CloneBoxedExecutor + Send + Sync> {
Box::new(self.clone())
}
}
impl Clone for Box<CloneBoxedExecutor> {
fn clone(&self) -> Self {
self.clone_boxed()
}
}
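
The deleted `executor.rs` used the standard "clone-boxed" trick to make a boxed trait object cloneable even though `Clone` itself is not object safe: a helper trait exposes `clone_boxed`, a blanket impl covers every `Clone` implementor, and `Clone for Box<...>` forwards to it. A generic sketch of the pattern, with an illustrative `Job` trait in place of `Executor`:

    trait Job {
        fn run(&self);
    }

    trait CloneBoxedJob: Job {
        fn clone_boxed(&self) -> Box<CloneBoxedJob>;
    }

    // Every concrete Clone + Job type gets the boxed clone for free.
    impl<T: Job + Clone + 'static> CloneBoxedJob for T {
        fn clone_boxed(&self) -> Box<CloneBoxedJob> {
            Box::new(self.clone())
        }
    }

    impl Clone for Box<CloneBoxedJob> {
        fn clone(&self) -> Self {
            self.clone_boxed()
        }
    }

    #[derive(Clone)]
    struct Hello;

    impl Job for Hello {
        fn run(&self) {
            println!("hello");
        }
    }

    fn main() {
        let job: Box<CloneBoxedJob> = Box::new(Hello);
        let copy = job.clone();
        copy.run();
    }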

View File

@@ -128,7 +128,7 @@ fn eq_ascii(left: &str, right: &str) -> bool {
// As of Rust 1.23, str gained this method inherently, and so the
// compiler says this trait is unused.
//
// Once our minimum Rust compiler version is >=1.23, this can be removed.
// TODO: Once our minimum Rust compiler version is >=1.23, this can be removed.
#[allow(unused, deprecated)]
use std::ascii::AsciiExt;

View File

@@ -18,6 +18,7 @@
extern crate bytes;
#[macro_use] extern crate futures;
extern crate futures_cpupool;
extern crate futures_timer;
extern crate http;
extern crate httparse;
@@ -27,6 +28,8 @@ extern crate net2;
extern crate time;
extern crate tokio;
extern crate tokio_executor;
#[macro_use] extern crate tokio_io;
extern crate tokio_service;
extern crate want;
#[cfg(all(test, feature = "nightly"))]
@@ -46,8 +49,7 @@ pub use error::{Result, Error};
pub use proto::{body, Body, Chunk};
pub use server::Server;
mod executor;
mod service;
mod common;
#[cfg(test)]
mod mock;
pub mod client;

View File

@@ -3,10 +3,10 @@ use std::cmp;
use std::io::{self, Read, Write};
use std::sync::{Arc, Mutex};
use bytes::Buf;
use futures::{Async, Poll};
use futures::task;
use futures::io::{AsyncRead, AsyncWrite};
use iovec::IoVec;
use futures::task::{self, Task};
use tokio_io::{AsyncRead, AsyncWrite};
use ::client::connect::{Connect, Connected, Destination};
@@ -77,7 +77,7 @@ pub struct AsyncIo<T> {
max_read_vecs: usize,
num_writes: usize,
park_tasks: bool,
task: Option<task::Waker>,
task: Option<Task>,
}
impl<T> AsyncIo<T> {
@@ -99,7 +99,7 @@ impl<T> AsyncIo<T> {
self.bytes_until_block = bytes;
if let Some(task) = self.task.take() {
task.wake();
task.notify();
}
}
@@ -130,12 +130,12 @@ impl<T> AsyncIo<T> {
self.num_writes
}
fn would_block<X, E>(&mut self, cx: &mut task::Context) -> Poll<X, E> {
fn would_block(&mut self) -> io::Error {
self.blocked = true;
if self.park_tasks {
self.task = Some(cx.waker().clone());
self.task = Some(task::current());
}
Ok(Async::Pending)
io::ErrorKind::WouldBlock.into()
}
}
@@ -159,101 +159,118 @@ impl AsyncIo<MockCursor> {
}
}
impl<T: Read + Write> AsyncIo<T> {
fn write_no_vecs<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if !buf.has_remaining() {
return Ok(Async::Ready(0));
}
let n = try_nb!(self.write(buf.bytes()));
buf.advance(n);
Ok(Async::Ready(n))
}
}
impl<S: AsRef<[u8]>, T: AsRef<[u8]>> PartialEq<S> for AsyncIo<T> {
fn eq(&self, other: &S) -> bool {
self.inner.as_ref() == other.as_ref()
}
}
impl<T: Read> AsyncRead for AsyncIo<T> {
fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8]) -> Poll<usize, io::Error> {
impl<T: Read> Read for AsyncIo<T> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.blocked = false;
if let Some(err) = self.error.take() {
Err(err)
} else if self.bytes_until_block == 0 {
self.would_block(cx)
Err(self.would_block())
} else {
let n = cmp::min(self.bytes_until_block, buf.len());
let n = try!(self.inner.read(&mut buf[..n]));
self.bytes_until_block -= n;
Ok(Async::Ready(n))
Ok(n)
}
}
}
impl<T: Read + Write> AsyncIo<T> {
fn write_no_vecs(&mut self, cx: &mut task::Context, buf: &[u8]) -> Poll<usize, io::Error> {
if buf.len() == 0 {
return Ok(Async::Ready(0));
}
self.poll_write(cx, buf)
}
}
impl<T: Read + Write> AsyncWrite for AsyncIo<T> {
fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8]) -> Poll<usize, io::Error> {
impl<T: Write> Write for AsyncIo<T> {
fn write(&mut self, data: &[u8]) -> io::Result<usize> {
self.num_writes += 1;
if let Some(err) = self.error.take() {
trace!("AsyncIo::write error");
Err(err)
} else if self.bytes_until_block == 0 {
trace!("AsyncIo::write would block");
self.would_block(cx)
Err(self.would_block())
} else {
trace!("AsyncIo::write; {} bytes", buf.len());
trace!("AsyncIo::write; {} bytes", data.len());
self.flushed = false;
let n = cmp::min(self.bytes_until_block, buf.len());
let n = try!(self.inner.write(&buf[..n]));
let n = cmp::min(self.bytes_until_block, data.len());
let n = try!(self.inner.write(&data[..n]));
self.bytes_until_block -= n;
Ok(Async::Ready(n))
Ok(n)
}
}
fn poll_flush(&mut self, _cx: &mut task::Context) -> Poll<(), io::Error> {
fn flush(&mut self) -> io::Result<()> {
self.flushed = true;
try!(self.inner.flush());
Ok(Async::Ready(()))
self.inner.flush()
}
}
fn poll_close(&mut self, _cx: &mut task::Context) -> Poll<(), io::Error> {
impl<T: Read + Write> AsyncRead for AsyncIo<T> {
}
impl<T: Read + Write> AsyncWrite for AsyncIo<T> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec]) -> Poll<usize, io::Error> {
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if self.max_read_vecs == 0 {
if let Some(ref first_iovec) = vec.get(0) {
return self.write_no_vecs(cx, &*first_iovec)
} else {
return Ok(Async::Ready(0));
}
return self.write_no_vecs(buf);
}
let mut n = 0;
let mut ret = Ok(Async::Ready(0));
// each call to poll_write() will increase our count, but we assume
// that if iovecs are used, it's really only 1 write call.
let num_writes = self.num_writes;
for buf in vec {
match self.poll_write(cx, &buf) {
Ok(Async::Ready(num)) => {
n += num;
ret = Ok(Async::Ready(n));
},
Ok(Async::Pending) => {
if let Ok(Async::Ready(0)) = ret {
ret = Ok(Async::Pending);
let r = {
static DUMMY: &[u8] = &[0];
let mut bufs = [From::from(DUMMY); READ_VECS_CNT];
let i = Buf::bytes_vec(&buf, &mut bufs[..self.max_read_vecs]);
let mut n = 0;
let mut ret = Ok(0);
// each call to write() will increase our count, but we assume
// that if iovecs are used, it's really only 1 write call.
let num_writes = self.num_writes;
for iovec in &bufs[..i] {
match self.write(iovec) {
Ok(num) => {
n += num;
ret = Ok(n);
},
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
if let Ok(0) = ret {
ret = Err(e);
}
} else {
ret = Err(e);
}
break;
}
break;
},
Err(err) => {
ret = Err(err);
break;
}
}
self.num_writes = num_writes + 1;
ret
};
match r {
Ok(n) => {
Buf::advance(buf, n);
Ok(Async::Ready(n))
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
Ok(Async::NotReady)
}
Err(e) => Err(e),
}
self.num_writes = num_writes + 1;
ret
}
}
@@ -270,34 +287,48 @@ pub struct Duplex {
}
struct DuplexInner {
handle_read_task: Option<task::Waker>,
handle_read_task: Option<Task>,
read: AsyncIo<MockCursor>,
write: AsyncIo<MockCursor>,
}
impl AsyncRead for Duplex {
fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8]) -> Poll<usize, io::Error> {
self.inner.lock().unwrap().read.poll_read(cx, buf)
impl Read for Duplex {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.lock().unwrap().read.read(buf)
}
}
impl AsyncWrite for Duplex {
fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8]) -> Poll<usize, io::Error> {
impl Write for Duplex {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let mut inner = self.inner.lock().unwrap();
if let Some(task) = inner.handle_read_task.take() {
trace!("waking DuplexHandle read");
task.wake();
task.notify();
}
inner.write.poll_write(cx, buf)
inner.write.write(buf)
}
fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
self.inner.lock().unwrap().write.poll_flush(cx)
fn flush(&mut self) -> io::Result<()> {
self.inner.lock().unwrap().write.flush()
}
}
fn poll_close(&mut self, _cx: &mut task::Context) -> Poll<(), io::Error> {
impl AsyncRead for Duplex {
}
impl AsyncWrite for Duplex {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
let mut inner = self.inner.lock().unwrap();
if let Some(task) = inner.handle_read_task.take() {
task.notify();
}
inner.write.write_buf(buf)
}
}
pub struct DuplexHandle {
@@ -305,13 +336,13 @@ pub struct DuplexHandle {
}
impl DuplexHandle {
pub fn read(&self, cx: &mut task::Context, buf: &mut [u8]) -> Poll<usize, io::Error> {
pub fn read(&self, buf: &mut [u8]) -> Poll<usize, io::Error> {
let mut inner = self.inner.lock().unwrap();
assert!(buf.len() >= inner.write.inner.len());
if inner.write.inner.is_empty() {
trace!("DuplexHandle read parking");
inner.handle_read_task = Some(cx.waker().clone());
return Ok(Async::Pending);
inner.handle_read_task = Some(task::current());
return Ok(Async::NotReady);
}
inner.write.inner.vec.truncate(0);
Ok(Async::Ready(inner.write.inner.len()))

View File

@@ -3,11 +3,11 @@ use std::borrow::Cow;
use std::fmt;
use bytes::Bytes;
use futures::{Async, Future, Never, Poll, Stream, StreamExt};
use futures::task;
use futures::channel::{mpsc, oneshot};
use futures::{Async, Future, Poll, Stream};
use futures::sync::{mpsc, oneshot};
use http::HeaderMap;
use common::Never;
use super::Chunk;
type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;
@@ -25,14 +25,14 @@ pub trait Entity {
///
/// Similar to `Stream::poll_next`, this yields `Some(Data)` until
/// the body ends, when it yields `None`.
fn poll_data(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Data>, Self::Error>;
fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error>;
/// Poll for an optional **single** `HeaderMap` of trailers.
///
/// This should **only** be called after `poll_data` has ended.
///
/// Note: Trailers aren't currently used for HTTP/1, only for HTTP/2.
fn poll_trailers(&mut self, _cx: &mut task::Context) -> Poll<Option<HeaderMap>, Self::Error> {
fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, Self::Error> {
Ok(Async::Ready(None))
}
@@ -68,12 +68,12 @@ impl<E: Entity> Entity for Box<E> {
type Data = E::Data;
type Error = E::Error;
fn poll_data(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Data>, Self::Error> {
(**self).poll_data(cx)
fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
(**self).poll_data()
}
fn poll_trailers(&mut self, cx: &mut task::Context) -> Poll<Option<HeaderMap>, Self::Error> {
(**self).poll_trailers(cx)
fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, Self::Error> {
(**self).poll_trailers()
}
fn is_end_stream(&self) -> bool {
@@ -97,10 +97,10 @@ impl<E: Entity> Stream for EntityStream<E> {
type Item = E::Data;
type Error = E::Error;
fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
if self.is_data_eof {
return self.entity.poll_trailers(cx)
return self.entity.poll_trailers()
.map(|async| {
async.map(|_opt| {
// drop the trailers and return that Stream is done
@@ -109,7 +109,7 @@ impl<E: Entity> Stream for EntityStream<E> {
});
}
let opt = try_ready!(self.entity.poll_data(cx));
let opt = try_ready!(self.entity.poll_data());
if let Some(data) = opt {
return Ok(Async::Ready(Some(data)));
} else {
@@ -232,14 +232,14 @@ impl Body {
/// ```
/// # extern crate futures;
/// # extern crate hyper;
/// # use futures::{FutureExt, StreamExt};
/// # use futures::{Future, Stream};
/// # use hyper::{Body, Request};
/// # fn request_concat(some_req: Request<Body>) {
/// let req: Request<Body> = some_req;
/// let body = req.into_body();
///
/// let stream = body.into_stream();
/// stream.concat()
/// stream.concat2()
/// .map(|buf| {
/// println!("body length: {}", buf.len());
/// });
@@ -281,20 +281,20 @@ impl Body {
self.delayed_eof = Some(DelayEof::NotEof(fut));
}
fn poll_eof(&mut self, cx: &mut task::Context) -> Poll<Option<Chunk>, ::Error> {
fn poll_eof(&mut self) -> Poll<Option<Chunk>, ::Error> {
match self.delayed_eof.take() {
Some(DelayEof::NotEof(mut delay)) => {
match self.poll_inner(cx) {
match self.poll_inner() {
ok @ Ok(Async::Ready(Some(..))) |
ok @ Ok(Async::Pending) => {
ok @ Ok(Async::NotReady) => {
self.delayed_eof = Some(DelayEof::NotEof(delay));
ok
},
Ok(Async::Ready(None)) => match delay.poll(cx) {
Ok(Async::Ready(None)) => match delay.poll() {
Ok(Async::Ready(never)) => match never {},
Ok(Async::Pending) => {
Ok(Async::NotReady) => {
self.delayed_eof = Some(DelayEof::Eof(delay));
Ok(Async::Pending)
Ok(Async::NotReady)
},
Err(_done) => {
Ok(Async::Ready(None))
@@ -304,30 +304,30 @@ impl Body {
}
},
Some(DelayEof::Eof(mut delay)) => {
match delay.poll(cx) {
match delay.poll() {
Ok(Async::Ready(never)) => match never {},
Ok(Async::Pending) => {
Ok(Async::NotReady) => {
self.delayed_eof = Some(DelayEof::Eof(delay));
Ok(Async::Pending)
Ok(Async::NotReady)
},
Err(_done) => {
Ok(Async::Ready(None))
},
}
},
None => self.poll_inner(cx),
None => self.poll_inner(),
}
}
fn poll_inner(&mut self, cx: &mut task::Context) -> Poll<Option<Chunk>, ::Error> {
fn poll_inner(&mut self) -> Poll<Option<Chunk>, ::Error> {
match self.kind {
Kind::Chan { ref mut rx, .. } => match rx.poll_next(cx).expect("mpsc cannot error") {
Kind::Chan { ref mut rx, .. } => match rx.poll().expect("mpsc cannot error") {
Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))),
Async::Ready(Some(Err(err))) => Err(err),
Async::Ready(None) => Ok(Async::Ready(None)),
Async::Pending => Ok(Async::Pending),
Async::NotReady => Ok(Async::NotReady),
},
Kind::Wrapped(ref mut s) => s.poll_next(cx),
Kind::Wrapped(ref mut s) => s.poll(),
Kind::Once(ref mut val) => Ok(Async::Ready(val.take())),
Kind::Empty => Ok(Async::Ready(None)),
}
@@ -345,8 +345,8 @@ impl Entity for Body {
type Data = Chunk;
type Error = ::Error;
fn poll_data(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Data>, Self::Error> {
self.poll_eof(cx)
fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
self.poll_eof()
}
fn is_end_stream(&self) -> bool {
@@ -367,7 +367,6 @@ impl Entity for Body {
Kind::Empty => Some(0)
}
}
}
impl fmt::Debug for Body {
@@ -379,13 +378,13 @@ impl fmt::Debug for Body {
impl Sender {
/// Check to see if this `Sender` can send more data.
pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), ()> {
match self.close_rx.poll(cx) {
pub fn poll_ready(&mut self) -> Poll<(), ()> {
match self.close_rx.poll() {
Ok(Async::Ready(())) | Err(_) => return Err(()),
Ok(Async::Pending) => (),
Ok(Async::NotReady) => (),
}
self.tx.poll_ready(cx).map_err(|_| ())
self.tx.poll_ready().map_err(|_| ())
}
/// Sends data on this channel.
@@ -482,11 +481,13 @@ fn _assert_send_sync() {
#[test]
fn test_body_stream_concat() {
use futures::{StreamExt};
use futures::{Stream, Future};
let body = Body::from("hello world");
let total = ::futures::executor::block_on(body.into_stream().concat())
let total = body.into_stream()
.concat2()
.wait()
.unwrap();
assert_eq!(total.as_ref(), b"hello world");
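
The doc example and test above go back to futures 0.1 stream combinators: `concat2()` to gather every chunk of the body into a single buffer and `wait()` to drive the result on the current thread (0.2-beta spelled these `concat()` and `block_on`). A self-contained sketch with an mpsc channel of byte chunks standing in for `hyper::Body`:

    use futures::{Future, Sink, Stream};
    use futures::sync::mpsc;

    fn main() {
        let (tx, rx) = mpsc::channel::<Vec<u8>>(2);

        // Feed two chunks, then drop the sender so the stream ends.
        let tx = tx.send(b"hello ".to_vec()).wait().unwrap();
        let tx = tx.send(b"world".to_vec()).wait().unwrap();
        drop(tx);

        // concat2() folds every chunk into one buffer; wait() blocks the
        // current thread until the stream is finished.
        let all = rx.concat2().wait().unwrap();
        assert_eq!(all, b"hello world".to_vec());
    }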

View File

@@ -3,10 +3,10 @@ use std::io::{self};
use std::marker::PhantomData;
use bytes::Bytes;
use futures::{Async, Poll};
use futures::task;
use futures::io::{AsyncRead, AsyncWrite};
use futures::{Async, AsyncSink, Poll, StartSend};
use futures::task::Task;
use http::{Method, Version};
use tokio_io::{AsyncRead, AsyncWrite};
use proto::{BodyLength, Chunk, Decode, Http1Transaction, MessageHead};
use super::io::{Cursor, Buffered};
@@ -113,14 +113,14 @@ where I: AsyncRead + AsyncWrite,
T::should_error_on_parse_eof() && !self.state.is_idle()
}
pub fn read_head(&mut self, cx: &mut task::Context) -> Poll<Option<(MessageHead<T::Incoming>, bool)>, ::Error> {
pub fn read_head(&mut self) -> Poll<Option<(MessageHead<T::Incoming>, bool)>, ::Error> {
debug_assert!(self.can_read_head());
trace!("Conn::read_head");
loop {
let (version, head) = match self.io.parse::<T>(cx) {
let (version, head) = match self.io.parse::<T>() {
Ok(Async::Ready(head)) => (head.version, head),
Ok(Async::Pending) => return Ok(Async::Pending),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => {
// If we are currently waiting on a message, then an empty
// message should be reported as an error. If not, it is just
@@ -132,7 +132,7 @@ where I: AsyncRead + AsyncWrite,
return if was_mid_parse || must_error {
debug!("parse error ({}) with {} bytes", e, self.io.read_buf().len());
self.on_parse_error(e)
.map(|()| Async::Pending)
.map(|()| Async::NotReady)
} else {
debug!("read eof");
Ok(Async::Ready(None))
@@ -169,7 +169,7 @@ where I: AsyncRead + AsyncWrite,
debug!("decoder error = {:?}", e);
self.state.close_read();
return self.on_parse_error(e)
.map(|()| Async::Pending);
.map(|()| Async::NotReady);
}
};
@@ -193,20 +193,20 @@ where I: AsyncRead + AsyncWrite,
self.state.reading = reading;
}
if !body {
self.try_keep_alive(cx);
self.try_keep_alive();
}
return Ok(Async::Ready(Some((head, body))));
}
}
pub fn read_body(&mut self, cx: &mut task::Context) -> Poll<Option<Chunk>, io::Error> {
pub fn read_body(&mut self) -> Poll<Option<Chunk>, io::Error> {
debug_assert!(self.can_read_body());
trace!("Conn::read_body");
let (reading, ret) = match self.state.reading {
Reading::Body(ref mut decoder) => {
match decoder.decode(&mut self.io, cx) {
match decoder.decode(&mut self.io) {
Ok(Async::Ready(slice)) => {
let (reading, chunk) = if !slice.is_empty() {
return Ok(Async::Ready(Some(Chunk::from(slice))));
@@ -222,7 +222,7 @@ where I: AsyncRead + AsyncWrite,
};
(reading, Ok(Async::Ready(chunk)))
},
Ok(Async::Pending) => return Ok(Async::Pending),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => {
trace!("decode stream error: {}", e);
(Reading::Closed, Err(e))
@@ -233,19 +233,19 @@ where I: AsyncRead + AsyncWrite,
};
self.state.reading = reading;
self.try_keep_alive(cx);
self.try_keep_alive();
ret
}
pub fn read_keep_alive(&mut self, cx: &mut task::Context) -> Result<(), ::Error> {
pub fn read_keep_alive(&mut self) -> Result<(), ::Error> {
debug_assert!(!self.can_read_head() && !self.can_read_body());
trace!("read_keep_alive; is_mid_message={}", self.is_mid_message());
if self.is_mid_message() {
self.maybe_park_read(cx);
self.maybe_park_read();
} else {
self.require_empty_read(cx)?;
self.require_empty_read()?;
}
Ok(())
}
@@ -257,19 +257,18 @@ where I: AsyncRead + AsyncWrite,
}
}
fn maybe_park_read(&mut self, cx: &mut task::Context) {
fn maybe_park_read(&mut self) {
if !self.io.is_read_blocked() {
// the Io object is ready to read, which means it will never alert
// us that it is ready until we drain it. However, we're currently
// finished reading, so we need to park the task to be able to
// wake back up later when more reading should happen.
let current_waker = cx.waker();
let park = self.state.read_task.as_ref()
.map(|t| !t.will_wake(current_waker))
.map(|t| !t.will_notify_current())
.unwrap_or(true);
if park {
trace!("parking current task");
self.state.read_task = Some(current_waker.clone());
self.state.read_task = Some(::futures::task::current());
}
}
}
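Aside (not part of the commit): `maybe_park_read` goes back to the futures 0.1 task model, where a future that cannot make progress stores `task::current()` and some other code calls `notify()` on it later, instead of cloning a `Waker` out of a `task::Context`. A small stand-alone sketch of that park/notify handshake, with illustrative names:

extern crate futures; // futures 0.1

use std::sync::{Arc, Mutex};
use std::thread;

use futures::{Async, Future, Poll};
use futures::task::{self, Task};

struct Flag {
    ready: bool,
    waiting: Option<Task>,
}

struct WaitForFlag(Arc<Mutex<Flag>>);

impl Future for WaitForFlag {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        let mut flag = self.0.lock().unwrap();
        if flag.ready {
            Ok(Async::Ready(()))
        } else {
            // Park: remember the current task so `notify()` can wake it later.
            flag.waiting = Some(task::current());
            Ok(Async::NotReady)
        }
    }
}

fn set_ready(flag: &Arc<Mutex<Flag>>) {
    let mut flag = flag.lock().unwrap();
    flag.ready = true;
    if let Some(task) = flag.waiting.take() {
        task.notify();
    }
}

fn main() {
    let flag = Arc::new(Mutex::new(Flag { ready: false, waiting: None }));
    let flag2 = flag.clone();
    thread::spawn(move || set_ready(&flag2));
    // `wait()` provides the task context that `task::current()` requires.
    WaitForFlag(flag).wait().unwrap();
}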
@@ -278,14 +277,14 @@ where I: AsyncRead + AsyncWrite,
//
// This should only be called for Clients wanting to enter the idle
// state.
fn require_empty_read(&mut self, cx: &mut task::Context) -> io::Result<()> {
fn require_empty_read(&mut self) -> io::Result<()> {
assert!(!self.can_read_head() && !self.can_read_body());
if !self.io.read_buf().is_empty() {
debug!("received an unexpected {} bytes", self.io.read_buf().len());
Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected bytes after message ended"))
} else {
match self.try_io_read(cx)? {
match self.try_io_read()? {
Async::Ready(0) => {
// case handled in try_io_read
Ok(())
@@ -299,15 +298,15 @@ where I: AsyncRead + AsyncWrite,
};
Err(io::Error::new(io::ErrorKind::InvalidData, desc))
},
Async::Pending => {
Async::NotReady => {
Ok(())
},
}
}
}
fn try_io_read(&mut self, cx: &mut task::Context) -> Poll<usize, io::Error> {
match self.io.read_from_io(cx) {
fn try_io_read(&mut self) -> Poll<usize, io::Error> {
match self.io.read_from_io() {
Ok(Async::Ready(0)) => {
trace!("try_io_read; found EOF on connection: {:?}", self.state);
let must_error = self.should_error_on_eof();
@@ -329,8 +328,8 @@ where I: AsyncRead + AsyncWrite,
Ok(Async::Ready(n)) => {
Ok(Async::Ready(n))
},
Ok(Async::Pending) => {
Ok(Async::Pending)
Ok(Async::NotReady) => {
Ok(Async::NotReady)
},
Err(e) => {
self.state.close();
@@ -340,8 +339,8 @@ where I: AsyncRead + AsyncWrite,
}
fn maybe_notify(&mut self, cx: &mut task::Context) {
// it's possible that we returned Pending from poll() without having
fn maybe_notify(&mut self) {
// it's possible that we returned NotReady from poll() without having
// exhausted the underlying Io. We would have done this when we
// determined we couldn't keep reading until we knew how writing
// would finish.
@@ -367,9 +366,9 @@ where I: AsyncRead + AsyncWrite,
if !self.io.is_read_blocked() {
if wants_read && self.io.read_buf().is_empty() {
match self.io.read_from_io(cx) {
match self.io.read_from_io() {
Ok(Async::Ready(_)) => (),
Ok(Async::Pending) => {
Ok(Async::NotReady) => {
trace!("maybe_notify; read_from_io blocked");
return
},
@@ -381,16 +380,16 @@ where I: AsyncRead + AsyncWrite,
}
if let Some(ref task) = self.state.read_task {
trace!("maybe_notify; notifying task");
task.wake();
task.notify();
} else {
trace!("maybe_notify; no task to notify");
}
}
}
fn try_keep_alive(&mut self, cx: &mut task::Context) {
fn try_keep_alive(&mut self) {
self.state.try_keep_alive();
self.maybe_notify(cx);
self.maybe_notify();
}
pub fn can_write_head(&self) -> bool {
@@ -476,15 +475,17 @@ where I: AsyncRead + AsyncWrite,
}
}
pub fn write_body(&mut self, _cx: &mut task::Context, chunk: Option<B>) -> Poll<(), io::Error> {
pub fn write_body(&mut self, chunk: Option<B>) -> StartSend<Option<B>, io::Error> {
debug_assert!(self.can_write_body());
if !self.can_buffer_body() {
// if chunk is Some(&[]), aka empty, whatever, just skip it
if chunk.as_ref().map(|c| c.as_ref().is_empty()).unwrap_or(true) {
return Ok(Async::Ready(()));
} else {
return Err(io::Error::new(io::ErrorKind::Other, "tried to write chunk when body can't buffer"));
if let Async::NotReady = self.flush()? {
// if chunk is Some(&[]), aka empty, whatever, just skip it
if chunk.as_ref().map(|c| c.as_ref().is_empty()).unwrap_or(false) {
return Ok(AsyncSink::Ready);
} else {
return Ok(AsyncSink::NotReady(chunk));
}
}
}
@@ -492,7 +493,7 @@ where I: AsyncRead + AsyncWrite,
Writing::Body(ref mut encoder) => {
if let Some(chunk) = chunk {
if chunk.as_ref().is_empty() {
return Ok(Async::Ready(()));
return Ok(AsyncSink::Ready);
}
let encoded = encoder.encode(Cursor::new(chunk));
@@ -505,7 +506,7 @@ where I: AsyncRead + AsyncWrite,
Writing::KeepAlive
}
} else {
return Ok(Async::Ready(()));
return Ok(AsyncSink::Ready);
}
} else {
// end of stream, that means we should try to eof
@@ -528,7 +529,7 @@ where I: AsyncRead + AsyncWrite,
};
self.state.writing = state;
Ok(Async::Ready(()))
Ok(AsyncSink::Ready)
}
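Aside (not part of the commit): with the revert, `write_body` is again a sink-style `start_send`: it either accepts the chunk (`AsyncSink::Ready`) or hands it back (`AsyncSink::NotReady(chunk)`) so the caller can flush and retry, rather than returning `Poll` and erroring when the buffer is full. A toy illustration of that contract, using an illustrative buffer type:

extern crate futures; // futures 0.1

use futures::{Async, AsyncSink, Poll, StartSend};

struct BoundedBuffer {
    items: Vec<u8>,
    cap: usize,
}

impl BoundedBuffer {
    fn start_send(&mut self, item: u8) -> StartSend<u8, ()> {
        if self.items.len() == self.cap {
            // Full: give the item back instead of dropping or blocking.
            return Ok(AsyncSink::NotReady(item));
        }
        self.items.push(item);
        Ok(AsyncSink::Ready)
    }

    fn poll_complete(&mut self) -> Poll<(), ()> {
        // Pretend the buffer drains immediately.
        self.items.clear();
        Ok(Async::Ready(()))
    }
}

fn main() {
    let mut buf = BoundedBuffer { items: Vec::new(), cap: 1 };
    assert!(buf.start_send(b'a').unwrap().is_ready());
    match buf.start_send(b'b').unwrap() {
        AsyncSink::NotReady(item) => assert_eq!(item, b'b'), // rejected, retry later
        AsyncSink::Ready => panic!("buffer should have been full"),
    }
    let _ = buf.poll_complete();
    assert!(buf.start_send(b'b').unwrap().is_ready());
}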
// When we get a parse error, depending on what side we are, we might be able
@@ -552,16 +553,16 @@ where I: AsyncRead + AsyncWrite,
Err(err)
}
pub fn flush(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
try_ready!(self.io.flush(cx));
self.try_keep_alive(cx);
pub fn flush(&mut self) -> Poll<(), io::Error> {
try_ready!(self.io.flush());
self.try_keep_alive();
trace!("flushed {:?}", self.state);
Ok(Async::Ready(()))
}
pub fn shutdown(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
match self.io.io_mut().poll_close(cx) {
Ok(Async::Pending) => Ok(Async::Pending),
pub fn shutdown(&mut self) -> Poll<(), io::Error> {
match self.io.io_mut().shutdown() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(())) => {
trace!("shut down IO");
Ok(Async::Ready(()))
@@ -611,7 +612,7 @@ struct State {
error: Option<::Error>,
keep_alive: KA,
method: Option<Method>,
read_task: Option<task::Waker>,
read_task: Option<Task>,
reading: Reading,
writing: Writing,
version: Version,
@@ -963,7 +964,7 @@ mod tests {
}
match conn.poll() {
Ok(Async::Pending) => (),
Ok(Async::NotReady) => (),
other => panic!("unexpected frame: {:?}", other)
}
Ok(())

View File

@@ -4,7 +4,6 @@ use std::usize;
use std::io;
use futures::{Async, Poll};
use futures::task;
use bytes::Bytes;
use super::io::MemRead;
@@ -85,7 +84,7 @@ impl Decoder {
}
}
pub fn decode<R: MemRead>(&mut self, body: &mut R, cx: &mut task::Context) -> Poll<Bytes, io::Error> {
pub fn decode<R: MemRead>(&mut self, body: &mut R) -> Poll<Bytes, io::Error> {
trace!("decode; state={:?}", self.kind);
match self.kind {
Length(ref mut remaining) => {
@@ -93,7 +92,7 @@ impl Decoder {
Ok(Async::Ready(Bytes::new()))
} else {
let to_read = *remaining as usize;
let buf = try_ready!(body.read_mem(cx, to_read));
let buf = try_ready!(body.read_mem(to_read));
let num = buf.as_ref().len() as u64;
if num > *remaining {
*remaining = 0;
@@ -109,7 +108,7 @@ impl Decoder {
loop {
let mut buf = None;
// advances the chunked state
*state = try_ready!(state.step(body, cx, size, &mut buf));
*state = try_ready!(state.step(body, size, &mut buf));
if *state == ChunkedState::End {
trace!("end of chunked");
return Ok(Async::Ready(Bytes::new()));
@@ -126,7 +125,7 @@ impl Decoder {
// 8192 chosen because it's about 2 packets, there probably
// won't be that much available, so don't have MemReaders
// allocate buffers too big
let slice = try_ready!(body.read_mem(cx, 8192));
let slice = try_ready!(body.read_mem(8192));
*is_eof = slice.is_empty();
Ok(Async::Ready(slice))
}
@@ -153,8 +152,8 @@ impl fmt::Display for Decoder {
}
macro_rules! byte (
($rdr:ident, $cx:ident) => ({
let buf = try_ready!($rdr.read_mem($cx, 1));
($rdr:ident) => ({
let buf = try_ready!($rdr.read_mem(1));
if !buf.is_empty() {
buf[0]
} else {
@@ -167,28 +166,27 @@ macro_rules! byte (
impl ChunkedState {
fn step<R: MemRead>(&self,
body: &mut R,
cx: &mut task::Context,
size: &mut u64,
buf: &mut Option<Bytes>)
-> Poll<ChunkedState, io::Error> {
use self::ChunkedState::*;
match *self {
Size => ChunkedState::read_size(body, cx, size),
SizeLws => ChunkedState::read_size_lws(body, cx),
Extension => ChunkedState::read_extension(body, cx),
SizeLf => ChunkedState::read_size_lf(body, cx, *size),
Body => ChunkedState::read_body(body, cx, size, buf),
BodyCr => ChunkedState::read_body_cr(body, cx),
BodyLf => ChunkedState::read_body_lf(body, cx),
EndCr => ChunkedState::read_end_cr(body, cx),
EndLf => ChunkedState::read_end_lf(body, cx),
Size => ChunkedState::read_size(body, size),
SizeLws => ChunkedState::read_size_lws(body),
Extension => ChunkedState::read_extension(body),
SizeLf => ChunkedState::read_size_lf(body, *size),
Body => ChunkedState::read_body(body, size, buf),
BodyCr => ChunkedState::read_body_cr(body),
BodyLf => ChunkedState::read_body_lf(body),
EndCr => ChunkedState::read_end_cr(body),
EndLf => ChunkedState::read_end_lf(body),
End => Ok(Async::Ready(ChunkedState::End)),
}
}
fn read_size<R: MemRead>(rdr: &mut R, cx: &mut task::Context, size: &mut u64) -> Poll<ChunkedState, io::Error> {
fn read_size<R: MemRead>(rdr: &mut R, size: &mut u64) -> Poll<ChunkedState, io::Error> {
trace!("Read chunk hex size");
let radix = 16;
match byte!(rdr, cx) {
match byte!(rdr) {
b @ b'0'...b'9' => {
*size *= radix;
*size += (b - b'0') as u64;
@@ -211,9 +209,9 @@ impl ChunkedState {
}
Ok(Async::Ready(ChunkedState::Size))
}
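Aside (not part of the commit): `read_size` above accumulates the chunk size one hex digit at a time, `size = size * 16 + digit`, which is why the tests further down expect "Ff" to parse as 255. The same accumulation in isolation:

fn parse_hex_size(s: &str) -> u64 {
    s.bytes().fold(0u64, |size, b| {
        let digit = match b {
            b'0'...b'9' => b - b'0',
            b'a'...b'f' => b + 10 - b'a',
            b'A'...b'F' => b + 10 - b'A',
            _ => panic!("not a hex digit"),
        };
        size * 16 + digit as u64
    })
}

fn main() {
    assert_eq!(parse_hex_size("1"), 1);
    assert_eq!(parse_hex_size("Ff"), 255);
    assert_eq!(parse_hex_size("10"), 16);
}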
fn read_size_lws<R: MemRead>(rdr: &mut R, cx: &mut task::Context) -> Poll<ChunkedState, io::Error> {
fn read_size_lws<R: MemRead>(rdr: &mut R) -> Poll<ChunkedState, io::Error> {
trace!("read_size_lws");
match byte!(rdr, cx) {
match byte!(rdr) {
// LWS can follow the chunk size, but no more digits can come
b'\t' | b' ' => Ok(Async::Ready(ChunkedState::SizeLws)),
b';' => Ok(Async::Ready(ChunkedState::Extension)),
@@ -224,16 +222,16 @@ impl ChunkedState {
}
}
}
fn read_extension<R: MemRead>(rdr: &mut R, cx: &mut task::Context) -> Poll<ChunkedState, io::Error> {
fn read_extension<R: MemRead>(rdr: &mut R) -> Poll<ChunkedState, io::Error> {
trace!("read_extension");
match byte!(rdr, cx) {
match byte!(rdr) {
b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)),
_ => Ok(Async::Ready(ChunkedState::Extension)), // no supported extensions
}
}
fn read_size_lf<R: MemRead>(rdr: &mut R, cx: &mut task::Context, size: u64) -> Poll<ChunkedState, io::Error> {
fn read_size_lf<R: MemRead>(rdr: &mut R, size: u64) -> Poll<ChunkedState, io::Error> {
trace!("Chunk size is {:?}", size);
match byte!(rdr, cx) {
match byte!(rdr) {
b'\n' => {
if size == 0 {
Ok(Async::Ready(ChunkedState::EndCr))
@@ -246,7 +244,7 @@ impl ChunkedState {
}
}
fn read_body<R: MemRead>(rdr: &mut R, cx: &mut task::Context,
fn read_body<R: MemRead>(rdr: &mut R,
rem: &mut u64,
buf: &mut Option<Bytes>)
-> Poll<ChunkedState, io::Error> {
@@ -259,7 +257,7 @@ impl ChunkedState {
};
let to_read = rem_cap;
let slice = try_ready!(rdr.read_mem(cx, to_read));
let slice = try_ready!(rdr.read_mem(to_read));
let count = slice.len();
if count == 0 {
@@ -275,27 +273,27 @@ impl ChunkedState {
Ok(Async::Ready(ChunkedState::BodyCr))
}
}
fn read_body_cr<R: MemRead>(rdr: &mut R, cx: &mut task::Context) -> Poll<ChunkedState, io::Error> {
match byte!(rdr, cx) {
fn read_body_cr<R: MemRead>(rdr: &mut R) -> Poll<ChunkedState, io::Error> {
match byte!(rdr) {
b'\r' => Ok(Async::Ready(ChunkedState::BodyLf)),
_ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk body CR")),
}
}
fn read_body_lf<R: MemRead>(rdr: &mut R, cx: &mut task::Context) -> Poll<ChunkedState, io::Error> {
match byte!(rdr, cx) {
fn read_body_lf<R: MemRead>(rdr: &mut R) -> Poll<ChunkedState, io::Error> {
match byte!(rdr) {
b'\n' => Ok(Async::Ready(ChunkedState::Size)),
_ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk body LF")),
}
}
fn read_end_cr<R: MemRead>(rdr: &mut R, cx: &mut task::Context) -> Poll<ChunkedState, io::Error> {
match byte!(rdr, cx) {
fn read_end_cr<R: MemRead>(rdr: &mut R) -> Poll<ChunkedState, io::Error> {
match byte!(rdr) {
b'\r' => Ok(Async::Ready(ChunkedState::EndLf)),
_ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk end CR")),
}
}
fn read_end_lf<R: MemRead>(rdr: &mut R, cx: &mut task::Context) -> Poll<ChunkedState, io::Error> {
match byte!(rdr, cx) {
fn read_end_lf<R: MemRead>(rdr: &mut R) -> Poll<ChunkedState, io::Error> {
match byte!(rdr) {
b'\n' => Ok(Async::Ready(ChunkedState::End)),
_ => Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk end LF")),
}
@@ -325,14 +323,11 @@ mod tests {
use super::ChunkedState;
use super::super::io::MemRead;
use futures::{Async, Poll};
use futures::task;
use futures::future::lazy;
use futures::executor::block_on;
use bytes::{BytesMut, Bytes};
use mock::AsyncIo;
impl<'a> MemRead for &'a [u8] {
fn read_mem(&mut self, _cx: &mut task::Context, len: usize) -> Poll<Bytes, io::Error> {
fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> {
let n = ::std::cmp::min(len, self.len());
if n > 0 {
let (a, b) = self.split_at(n);
@@ -352,7 +347,7 @@ mod tests {
fn unwrap(self) -> Bytes {
match self {
Async::Ready(bytes) => bytes,
Async::Pending => panic!(),
Async::NotReady => panic!(),
}
}
}
@@ -360,7 +355,7 @@ mod tests {
fn unwrap(self) -> ChunkedState {
match self {
Async::Ready(state) => state,
Async::Pending => panic!(),
Async::NotReady => panic!(),
}
}
}
@@ -369,12 +364,12 @@ mod tests {
fn test_read_chunk_size() {
use std::io::ErrorKind::{UnexpectedEof, InvalidInput};
fn read(cx: &mut task::Context, s: &str) -> u64 {
fn read(s: &str) -> u64 {
let mut state = ChunkedState::Size;
let rdr = &mut s.as_bytes();
let mut size = 0;
loop {
let result = state.step(rdr, cx, &mut size, &mut None);
let result = state.step(rdr, &mut size, &mut None);
let desc = format!("read_size failed for {:?}", s);
state = result.expect(desc.as_str()).unwrap();
if state == ChunkedState::Body || state == ChunkedState::EndCr {
@@ -384,12 +379,12 @@ mod tests {
size
}
fn read_err(cx: &mut task::Context, s: &str, expected_err: io::ErrorKind) {
fn read_err(s: &str, expected_err: io::ErrorKind) {
let mut state = ChunkedState::Size;
let rdr = &mut s.as_bytes();
let mut size = 0;
loop {
let result = state.step(rdr, cx, &mut size, &mut None);
let result = state.step(rdr, &mut size, &mut None);
state = match result {
Ok(s) => s.unwrap(),
Err(e) => {
@@ -404,111 +399,90 @@ mod tests {
}
}
block_on(lazy(|cx| {
assert_eq!(1, read(cx, "1\r\n"));
assert_eq!(1, read(cx, "01\r\n"));
assert_eq!(0, read(cx, "0\r\n"));
assert_eq!(0, read(cx, "00\r\n"));
assert_eq!(10, read(cx, "A\r\n"));
assert_eq!(10, read(cx, "a\r\n"));
assert_eq!(255, read(cx, "Ff\r\n"));
assert_eq!(255, read(cx, "Ff \r\n"));
// Missing LF or CRLF
read_err(cx, "F\rF", InvalidInput);
read_err(cx, "F", UnexpectedEof);
// Invalid hex digit
read_err(cx, "X\r\n", InvalidInput);
read_err(cx, "1X\r\n", InvalidInput);
read_err(cx, "-\r\n", InvalidInput);
read_err(cx, "-1\r\n", InvalidInput);
// Acceptable (if not fully valid) extensions do not influence the size
assert_eq!(1, read(cx, "1;extension\r\n"));
assert_eq!(10, read(cx, "a;ext name=value\r\n"));
assert_eq!(1, read(cx, "1;extension;extension2\r\n"));
assert_eq!(1, read(cx, "1;;; ;\r\n"));
assert_eq!(2, read(cx, "2; extension...\r\n"));
assert_eq!(3, read(cx, "3 ; extension=123\r\n"));
assert_eq!(3, read(cx, "3 ;\r\n"));
assert_eq!(3, read(cx, "3 ; \r\n"));
// Invalid extensions cause an error
read_err(cx, "1 invalid extension\r\n", InvalidInput);
read_err(cx, "1 A\r\n", InvalidInput);
read_err(cx, "1;no CRLF", UnexpectedEof);
Ok::<_, ()>(())
})).unwrap()
assert_eq!(1, read("1\r\n"));
assert_eq!(1, read("01\r\n"));
assert_eq!(0, read("0\r\n"));
assert_eq!(0, read("00\r\n"));
assert_eq!(10, read("A\r\n"));
assert_eq!(10, read("a\r\n"));
assert_eq!(255, read("Ff\r\n"));
assert_eq!(255, read("Ff \r\n"));
// Missing LF or CRLF
read_err("F\rF", InvalidInput);
read_err("F", UnexpectedEof);
// Invalid hex digit
read_err("X\r\n", InvalidInput);
read_err("1X\r\n", InvalidInput);
read_err("-\r\n", InvalidInput);
read_err("-1\r\n", InvalidInput);
// Acceptable (if not fully valid) extensions do not influence the size
assert_eq!(1, read("1;extension\r\n"));
assert_eq!(10, read("a;ext name=value\r\n"));
assert_eq!(1, read("1;extension;extension2\r\n"));
assert_eq!(1, read("1;;; ;\r\n"));
assert_eq!(2, read("2; extension...\r\n"));
assert_eq!(3, read("3 ; extension=123\r\n"));
assert_eq!(3, read("3 ;\r\n"));
assert_eq!(3, read("3 ; \r\n"));
// Invalid extensions cause an error
read_err("1 invalid extension\r\n", InvalidInput);
read_err("1 A\r\n", InvalidInput);
read_err("1;no CRLF", UnexpectedEof);
}
#[test]
fn test_read_sized_early_eof() {
block_on(lazy(|cx| {
let mut bytes = &b"foo bar"[..];
let mut decoder = Decoder::length(10);
assert_eq!(decoder.decode(&mut bytes, cx).unwrap().unwrap().len(), 7);
let e = decoder.decode(&mut bytes, cx).unwrap_err();
assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
Ok::<_, ()>(())
})).unwrap()
let mut bytes = &b"foo bar"[..];
let mut decoder = Decoder::length(10);
assert_eq!(decoder.decode(&mut bytes).unwrap().unwrap().len(), 7);
let e = decoder.decode(&mut bytes).unwrap_err();
assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
}
#[test]
fn test_read_chunked_early_eof() {
block_on(lazy(|cx| {
let mut bytes = &b"\
9\r\n\
foo bar\
"[..];
let mut decoder = Decoder::chunked();
assert_eq!(decoder.decode(&mut bytes, cx).unwrap().unwrap().len(), 7);
let e = decoder.decode(&mut bytes, cx).unwrap_err();
assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
Ok::<_, ()>(())
})).unwrap()
let mut bytes = &b"\
9\r\n\
foo bar\
"[..];
let mut decoder = Decoder::chunked();
assert_eq!(decoder.decode(&mut bytes).unwrap().unwrap().len(), 7);
let e = decoder.decode(&mut bytes).unwrap_err();
assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
}
#[test]
fn test_read_chunked_single_read() {
block_on(lazy(|cx| {
let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n"[..];
let buf = Decoder::chunked().decode(&mut mock_buf, cx).expect("decode").unwrap();
assert_eq!(16, buf.len());
let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
assert_eq!("1234567890abcdef", &result);
Ok::<_, ()>(())
})).unwrap()
let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n"[..];
let buf = Decoder::chunked().decode(&mut mock_buf).expect("decode").unwrap();
assert_eq!(16, buf.len());
let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
assert_eq!("1234567890abcdef", &result);
}
#[test]
fn test_read_chunked_after_eof() {
block_on(lazy(|cx| {
let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n\r\n"[..];
let mut decoder = Decoder::chunked();
let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n\r\n"[..];
let mut decoder = Decoder::chunked();
// normal read
let buf = decoder.decode(&mut mock_buf, cx).expect("decode").unwrap();
assert_eq!(16, buf.len());
let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
assert_eq!("1234567890abcdef", &result);
// normal read
let buf = decoder.decode(&mut mock_buf).expect("decode").unwrap();
assert_eq!(16, buf.len());
let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
assert_eq!("1234567890abcdef", &result);
// eof read
let buf = decoder.decode(&mut mock_buf, cx).expect("decode").unwrap();
assert_eq!(0, buf.len());
// eof read
let buf = decoder.decode(&mut mock_buf).expect("decode").unwrap();
assert_eq!(0, buf.len());
// ensure read after eof also returns eof
let buf = decoder.decode(&mut mock_buf, cx).expect("decode").unwrap();
assert_eq!(0, buf.len());
Ok::<_, ()>(())
})).unwrap()
// ensure read after eof also returns eof
let buf = decoder.decode(&mut mock_buf).expect("decode").unwrap();
assert_eq!(0, buf.len());
}
// perform an async read using a custom buffer size and causing a blocking
// read at the specified byte
fn read_async(mut decoder: Decoder,
cx: &mut task::Context,
content: &[u8],
block_at: usize)
-> String {
@@ -516,14 +490,14 @@ mod tests {
let mut ins = AsyncIo::new(content, block_at);
let mut outs = Vec::new();
loop {
match decoder.decode(&mut ins, cx).expect("unexpected decode error: {}") {
match decoder.decode(&mut ins).expect("unexpected decode error: {}") {
Async::Ready(buf) => {
if buf.is_empty() {
break; // eof
}
outs.write(buf.as_ref()).expect("write buffer");
},
Async::Pending => {
Async::NotReady => {
ins.block_in(content_len); // we only block once
}
};
@@ -534,14 +508,11 @@ mod tests {
// iterate over the different ways that this async read could go.
// tests blocking a read at each byte along the content - The shotgun approach
fn all_async_cases(content: &str, expected: &str, decoder: Decoder) {
block_on(lazy(|cx| {
let content_len = content.len();
for block_at in 0..content_len {
let actual = read_async(decoder.clone(), cx, content.as_bytes(), block_at);
assert_eq!(expected, &actual) //, "Failed async. Blocking at {}", block_at);
}
Ok::<_, ()>(())
})).unwrap()
let content_len = content.len();
for block_at in 0..content_len {
let actual = read_async(decoder.clone(), content.as_bytes(), block_at);
assert_eq!(expected, &actual) //, "Failed async. Blocking at {}", block_at);
}
}
#[test]

View File

@@ -2,15 +2,13 @@ use std::io;
use bytes::Bytes;
use futures::{Async, Future, Poll, Stream};
use futures::task;
use futures::io::{AsyncRead, AsyncWrite};
use http::{Request, Response, StatusCode};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_service::Service;
use proto::body::Entity;
use proto::{Body, BodyLength, Conn, Http1Transaction, MessageHead, RequestHead, RequestLine, ResponseHead};
use ::service::Service;
pub struct Dispatcher<D, Bs, I, B, T> {
conn: Conn<I, B, T>,
dispatch: D,
@@ -23,9 +21,9 @@ pub trait Dispatch {
type PollItem;
type PollBody;
type RecvItem;
fn poll_msg(&mut self, cx: &mut task::Context) -> Poll<Option<(Self::PollItem, Option<Self::PollBody>)>, ::Error>;
fn recv_msg(&mut self, cx: &mut task::Context, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()>;
fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), ()>;
fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Option<Self::PollBody>)>, ::Error>;
fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()>;
fn poll_ready(&mut self) -> Poll<(), ()>;
fn should_poll(&self) -> bool;
}
@@ -71,57 +69,57 @@ where
/// The "Future" poll function. Runs this dispatcher until the
/// connection is shutdown, or an error occurs.
pub fn poll_until_shutdown(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
self.poll_catch(cx, true)
pub fn poll_until_shutdown(&mut self) -> Poll<(), ::Error> {
self.poll_catch(true)
}
/// Run this dispatcher until HTTP says this connection is done,
/// but don't call `AsyncWrite::shutdown` on the underlying IO.
///
/// This is useful for HTTP upgrades.
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
self.poll_catch(cx, false)
pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> {
self.poll_catch(false)
}
fn poll_catch(&mut self, cx: &mut task::Context, should_shutdown: bool) -> Poll<(), ::Error> {
self.poll_inner(cx, should_shutdown).or_else(|e| {
fn poll_catch(&mut self, should_shutdown: bool) -> Poll<(), ::Error> {
self.poll_inner(should_shutdown).or_else(|e| {
// An error means we're shutting down either way.
// We just try to give the error to the user,
// and close the connection with an Ok. If we
// cannot give it to the user, then return the Err.
self.dispatch.recv_msg(cx, Err(e)).map(Async::Ready)
self.dispatch.recv_msg(Err(e)).map(Async::Ready)
})
}
fn poll_inner(&mut self, cx: &mut task::Context, should_shutdown: bool) -> Poll<(), ::Error> {
self.poll_read(cx)?;
self.poll_write(cx)?;
self.poll_flush(cx)?;
fn poll_inner(&mut self, should_shutdown: bool) -> Poll<(), ::Error> {
self.poll_read()?;
self.poll_write()?;
self.poll_flush()?;
if self.is_done() {
if should_shutdown {
try_ready!(self.conn.shutdown(cx));
try_ready!(self.conn.shutdown());
}
self.conn.take_error()?;
Ok(Async::Ready(()))
} else {
Ok(Async::Pending)
Ok(Async::NotReady)
}
}
fn poll_read(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
fn poll_read(&mut self) -> Poll<(), ::Error> {
loop {
if self.is_closing {
return Ok(Async::Ready(()));
} else if self.conn.can_read_head() {
try_ready!(self.poll_read_head(cx));
try_ready!(self.poll_read_head());
} else if let Some(mut body) = self.body_tx.take() {
if self.conn.can_read_body() {
match body.poll_ready(cx) {
match body.poll_ready() {
Ok(Async::Ready(())) => (),
Ok(Async::Pending) => {
Ok(Async::NotReady) => {
self.body_tx = Some(body);
return Ok(Async::Pending);
return Ok(Async::NotReady);
},
Err(_canceled) => {
// user doesn't care about the body
@@ -131,7 +129,7 @@ where
return Ok(Async::Ready(()));
}
}
match self.conn.read_body(cx) {
match self.conn.read_body() {
Ok(Async::Ready(Some(chunk))) => {
match body.send_data(chunk) {
Ok(()) => {
@@ -149,9 +147,9 @@ where
Ok(Async::Ready(None)) => {
// just drop, the body will close automatically
},
Ok(Async::Pending) => {
Ok(Async::NotReady) => {
self.body_tx = Some(body);
return Ok(Async::Pending);
return Ok(Async::NotReady);
}
Err(e) => {
body.send_error(::Error::Io(e));
@@ -161,16 +159,16 @@ where
// just drop, the body will close automatically
}
} else {
return self.conn.read_keep_alive(cx).map(Async::Ready);
return self.conn.read_keep_alive().map(Async::Ready);
}
}
}
fn poll_read_head(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
fn poll_read_head(&mut self) -> Poll<(), ::Error> {
// can dispatch receive, or does it still care about, an incoming message?
match self.dispatch.poll_ready(cx) {
match self.dispatch.poll_ready() {
Ok(Async::Ready(())) => (),
Ok(Async::Pending) => unreachable!("dispatch not ready when conn is"),
Ok(Async::NotReady) => unreachable!("dispatch not ready when conn is"),
Err(()) => {
trace!("dispatch no longer receiving messages");
self.close();
@@ -178,27 +176,27 @@ where
}
}
// dispatch is ready for a message, try to read one
match self.conn.read_head(cx) {
match self.conn.read_head() {
Ok(Async::Ready(Some((head, has_body)))) => {
let body = if has_body {
let (mut tx, rx) = Body::channel();
let _ = tx.poll_ready(cx); // register this task if rx is dropped
let _ = tx.poll_ready(); // register this task if rx is dropped
self.body_tx = Some(tx);
rx
} else {
Body::empty()
};
self.dispatch.recv_msg(cx, Ok((head, body)))?;
self.dispatch.recv_msg(Ok((head, body)))?;
Ok(Async::Ready(()))
},
Ok(Async::Ready(None)) => {
// read eof, conn will start to shutdown automatically
Ok(Async::Ready(()))
}
Ok(Async::Pending) => Ok(Async::Pending),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => {
debug!("read_head error: {}", err);
self.dispatch.recv_msg(cx, Err(err))?;
self.dispatch.recv_msg(Err(err))?;
// if here, the dispatcher gave the user the error
// somewhere else. we still need to shutdown, but
// not as a second error.
@@ -207,12 +205,12 @@ where
}
}
fn poll_write(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
fn poll_write(&mut self) -> Poll<(), ::Error> {
loop {
if self.is_closing {
return Ok(Async::Ready(()));
} else if self.body_rx.is_none() && self.conn.can_write_head() && self.dispatch.should_poll() {
if let Some((head, body)) = try_ready!(self.dispatch.poll_msg(cx)) {
if let Some((head, body)) = try_ready!(self.dispatch.poll_msg()) {
let body_type = body.as_ref().map(|body| {
body.content_length()
.map(BodyLength::Known)
@@ -225,27 +223,27 @@ where
return Ok(Async::Ready(()));
}
} else if !self.conn.can_buffer_body() {
try_ready!(self.poll_flush(cx));
try_ready!(self.poll_flush());
} else if let Some(mut body) = self.body_rx.take() {
let chunk = match body.poll_data(cx)? {
let chunk = match body.poll_data()? {
Async::Ready(Some(chunk)) => {
self.body_rx = Some(body);
chunk
},
Async::Ready(None) => {
if self.conn.can_write_body() {
self.conn.write_body(cx, None)?;
self.conn.write_body(None)?;
}
continue;
},
Async::Pending => {
Async::NotReady => {
self.body_rx = Some(body);
return Ok(Async::Pending);
return Ok(Async::NotReady);
}
};
if self.conn.can_write_body() {
self.conn.write_body(cx, Some(chunk))?;
assert!(self.conn.write_body(Some(chunk))?.is_ready());
// This allows when chunk is `None`, or `Some([])`.
} else if chunk.as_ref().len() == 0 {
// ok
@@ -253,13 +251,13 @@ where
warn!("unexpected chunk when body cannot write");
}
} else {
return Ok(Async::Pending);
return Ok(Async::NotReady);
}
}
}
fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
self.conn.flush(cx).map_err(|err| {
fn poll_flush(&mut self) -> Poll<(), ::Error> {
self.conn.flush().map_err(|err| {
debug!("error writing: {}", err);
err.into()
})
@@ -302,8 +300,8 @@ where
type Error = ::Error;
#[inline]
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
self.poll_until_shutdown(cx)
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.poll_until_shutdown()
}
}
@@ -327,13 +325,13 @@ where
type PollBody = Bs;
type RecvItem = RequestHead;
fn poll_msg(&mut self, cx: &mut task::Context) -> Poll<Option<(Self::PollItem, Option<Self::PollBody>)>, ::Error> {
fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Option<Self::PollBody>)>, ::Error> {
if let Some(mut fut) = self.in_flight.take() {
let resp = match fut.poll(cx)? {
let resp = match fut.poll()? {
Async::Ready(res) => res,
Async::Pending => {
Async::NotReady => {
self.in_flight = Some(fut);
return Ok(Async::Pending);
return Ok(Async::NotReady);
}
};
let (parts, body) = resp.into_parts();
@@ -353,7 +351,7 @@ where
}
}
fn recv_msg(&mut self, _cx: &mut task::Context, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()> {
fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()> {
let (msg, body) = msg?;
let mut req = Request::new(body);
*req.method_mut() = msg.subject.0;
@@ -364,9 +362,9 @@ where
Ok(())
}
fn poll_ready(&mut self, _cx: &mut task::Context) -> Poll<(), ()> {
fn poll_ready(&mut self) -> Poll<(), ()> {
if self.in_flight.is_some() {
Ok(Async::Pending)
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
@@ -397,16 +395,16 @@ where
type PollBody = B;
type RecvItem = ResponseHead;
fn poll_msg(&mut self, cx: &mut task::Context) -> Poll<Option<(Self::PollItem, Option<Self::PollBody>)>, ::Error> {
match self.rx.poll_next(cx) {
fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Option<Self::PollBody>)>, ::Error> {
match self.rx.poll() {
Ok(Async::Ready(Some((req, mut cb)))) => {
// check that future hasn't been canceled already
match cb.poll_cancel(cx).expect("poll_cancel cannot error") {
match cb.poll_cancel().expect("poll_cancel cannot error") {
Async::Ready(()) => {
trace!("request canceled");
Ok(Async::Ready(None))
},
Async::Pending => {
Async::NotReady => {
let (parts, body) = req.into_parts();
let head = RequestHead {
version: parts.version,
@@ -429,12 +427,12 @@ where
// user has dropped sender handle
Ok(Async::Ready(None))
},
Ok(Async::Pending) => return Ok(Async::Pending),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_) => unreachable!("receiver cannot error"),
}
}
fn recv_msg(&mut self, cx: &mut task::Context, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()> {
fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()> {
match msg {
Ok((msg, body)) => {
if let Some(cb) = self.callback.take() {
@@ -452,7 +450,7 @@ where
if let Some(cb) = self.callback.take() {
let _ = cb.send(Err((err, None)));
Ok(())
} else if let Ok(Async::Ready(Some((req, cb)))) = self.rx.poll_next(cx) {
} else if let Ok(Async::Ready(Some((req, cb)))) = self.rx.poll() {
trace!("canceling queued request with connection error: {}", err);
// in this case, the message was never even started, so it's safe to tell
// the user that the request was completely canceled
@@ -465,14 +463,14 @@ where
}
}
fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), ()> {
fn poll_ready(&mut self) -> Poll<(), ()> {
match self.callback {
Some(ref mut cb) => match cb.poll_cancel(cx) {
Some(ref mut cb) => match cb.poll_cancel() {
Ok(Async::Ready(())) => {
trace!("callback receiver has dropped");
Err(())
},
Ok(Async::Pending) => Ok(Async::Ready(())),
Ok(Async::NotReady) => Ok(Async::Ready(())),
Err(_) => unreachable!("oneshot poll_cancel cannot error"),
},
None => Err(()),
@@ -489,32 +487,31 @@ mod tests {
extern crate pretty_env_logger;
use super::*;
use futures::executor::block_on;
use futures::future::lazy;
use mock::AsyncIo;
use proto::ClientTransaction;
#[test]
fn client_read_bytes_before_writing_request() {
let _ = pretty_env_logger::try_init();
block_on(lazy(|cx| {
::futures::lazy(|| {
let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 100);
let (mut tx, rx) = ::client::dispatch::channel();
let conn = Conn::<_, ::Chunk, ClientTransaction>::new(io);
let mut dispatcher = Dispatcher::new(Client::new(rx), conn);
let mut res_rx = tx.try_send(::Request::new(::Body::empty())).unwrap();
let res_rx = tx.try_send(::Request::new(::Body::empty())).unwrap();
let a1 = dispatcher.poll(cx).expect("error should be sent on channel");
let a1 = dispatcher.poll().expect("error should be sent on channel");
assert!(a1.is_ready(), "dispatcher should be closed");
let result = res_rx.poll(cx)
.expect("callback poll");
let err = res_rx.wait()
.expect("callback poll")
.expect_err("callback response");
match result {
Async::Ready(Err((::Error::Cancel(_), Some(_)))) => (),
other => panic!("expected Err(Canceled), got {:?}", other),
match err {
(::Error::Cancel(_), Some(_)) => (),
other => panic!("expected Canceled, got {:?}", other),
}
Ok::<_, ()>(())
})).unwrap();
Ok::<(), ()>(())
}).wait().unwrap();
}
}

View File

@@ -1,12 +1,12 @@
use std::cell::Cell;
use std::collections::VecDeque;
use std::fmt;
use std::io;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{Async, Poll};
use futures::task;
use futures::io::{AsyncRead, AsyncWrite};
use iovec::IoVec;
use tokio_io::{AsyncRead, AsyncWrite};
use proto::{Http1Transaction, MessageHead};
@@ -108,7 +108,7 @@ where
}
}
pub fn parse<S: Http1Transaction>(&mut self, cx: &mut task::Context) -> Poll<MessageHead<S::Incoming>, ::Error> {
pub fn parse<S: Http1Transaction>(&mut self) -> Poll<MessageHead<S::Incoming>, ::Error> {
loop {
match try!(S::parse(&mut self.read_buf)) {
Some((head, len)) => {
@@ -122,7 +122,7 @@ where
}
},
}
match try_ready!(self.read_from_io(cx)) {
match try_ready!(self.read_from_io()) {
0 => {
trace!("parse eof");
return Err(::Error::Incomplete);
@@ -132,21 +132,21 @@ where
}
}
pub fn read_from_io(&mut self, cx: &mut task::Context) -> Poll<usize, io::Error> {
pub fn read_from_io(&mut self) -> Poll<usize, io::Error> {
use bytes::BufMut;
self.read_blocked = false;
if self.read_buf.remaining_mut() < INIT_BUFFER_SIZE {
self.read_buf.reserve(INIT_BUFFER_SIZE);
}
read_buf(&mut self.io, cx, &mut self.read_buf).map(|ok| {
self.io.read_buf(&mut self.read_buf).map(|ok| {
match ok {
Async::Ready(n) => {
debug!("read {} bytes", n);
Async::Ready(n)
},
Async::Pending => {
Async::NotReady => {
self.read_blocked = true;
Async::Pending
Async::NotReady
}
}
})
@@ -164,14 +164,14 @@ where
self.read_blocked
}
pub fn flush(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
pub fn flush(&mut self) -> Poll<(), io::Error> {
if self.flush_pipeline && !self.read_buf.is_empty() {
//Ok(())
} else if self.write_buf.remaining() == 0 {
try_ready!(self.io.poll_flush(cx));
try_nb!(self.io.flush());
} else {
loop {
let n = try_ready!(self.write_buf.poll_flush_into(&mut self.io, cx));
let n = try_ready!(self.io.write_buf(&mut self.write_buf.auto()));
debug!("flushed {} bytes", n);
if self.write_buf.remaining() == 0 {
break;
@@ -180,33 +180,14 @@ where
return Err(io::ErrorKind::WriteZero.into())
}
}
try_ready!(self.io.poll_flush(cx))
try_nb!(self.io.flush())
}
Ok(Async::Ready(()))
}
}
fn read_buf<I: AsyncRead, B: BufMut>(io: &mut I, cx: &mut task::Context, buf: &mut B) -> Poll<usize, io::Error> {
if !buf.has_remaining_mut() {
return Ok(Async::Ready(0));
}
unsafe {
let n = {
let b = buf.bytes_mut();
io.initializer().initialize(b);
try_ready!(io.poll_read(cx, b))
};
buf.advance_mut(n);
Ok(Async::Ready(n))
}
}
pub trait MemRead {
fn read_mem(&mut self, cx: &mut task::Context, len: usize) -> Poll<Bytes, io::Error>;
fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error>;
}
impl<T, B> MemRead for Buffered<T, B>
@@ -214,12 +195,12 @@ where
T: AsyncRead + AsyncWrite,
B: Buf,
{
fn read_mem(&mut self, cx: &mut task::Context, len: usize) -> Poll<Bytes, io::Error> {
fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> {
if !self.read_buf.is_empty() {
let n = ::std::cmp::min(len, self.read_buf.len());
Ok(Async::Ready(self.read_buf.split_to(n).freeze()))
} else {
let n = try_ready!(self.read_from_io(cx));
let n = try_ready!(self.read_from_io());
Ok(Async::Ready(self.read_buf.split_to(::std::cmp::min(len, n)).freeze()))
}
}
@@ -313,6 +294,11 @@ where
self.strategy = strategy;
}
#[inline]
fn auto(&mut self) -> WriteBufAuto<B> {
WriteBufAuto::new(self)
}
fn buffer(&mut self, buf: B) {
match self.strategy {
Strategy::Flatten => {
@@ -357,48 +343,6 @@ where
unreachable!("head_buf just pushed on back");
}
}
fn poll_flush_into<I: AsyncWrite>(&mut self, io: &mut I, cx: &mut task::Context) -> Poll<usize, io::Error> {
if !self.has_remaining() {
return Ok(Async::Ready(0));
}
let (num_bufs_avail, num_bytes_written, len_first_buf) = {
static PLACEHOLDER: &[u8] = &[0];
let mut bufs = [From::from(PLACEHOLDER); 64];
let num_bufs_avail = self.bytes_vec(&mut bufs[..]);
let num_bytes_written = try_ready!(io.poll_vectored_write(cx, &bufs[..num_bufs_avail]));
(num_bufs_avail, num_bytes_written, bufs[0].len())
};
self.advance(num_bytes_written);
if let Strategy::Auto = self.strategy {
if num_bufs_avail > 1 {
// If there's more than one IoVec available, attempt to
// determine the best buffering strategy based on whether
// the underlying AsyncWrite object supports vectored I/O.
if num_bytes_written == len_first_buf {
// If only the first of many IoVec was written, we can assume
// with some certainty that vectored I/O _is not_ supported.
//
// Switch to a flattening strategy for buffering data.
trace!("detected no usage of vectored write, flattening");
let mut vec = Vec::new();
vec.put(&mut self.buf);
self.buf.bufs.push_back(VecOrBuf::Vec(Cursor::new(vec)));
self.strategy = Strategy::Flatten;
} else if num_bytes_written > len_first_buf {
// If more than the first IoVec was written, we can assume
// with some certainty that vectored I/O _is_ supported.
//
// Switch to a queuing strategy for buffering data.
self.strategy = Strategy::Queue;
}
}
}
Ok(Async::Ready(num_bytes_written))
}
}
impl<B: Buf> fmt::Debug for WriteBuf<B> {
@@ -432,6 +376,65 @@ impl<B: Buf> Buf for WriteBuf<B> {
}
}
/// Detects when wrapped `WriteBuf` is used for vectored IO, and
/// adjusts the `WriteBuf` strategy if not.
struct WriteBufAuto<'a, B: Buf + 'a> {
bytes_called: Cell<bool>,
bytes_vec_called: Cell<bool>,
inner: &'a mut WriteBuf<B>,
}
impl<'a, B: Buf> WriteBufAuto<'a, B> {
fn new(inner: &'a mut WriteBuf<B>) -> WriteBufAuto<'a, B> {
WriteBufAuto {
bytes_called: Cell::new(false),
bytes_vec_called: Cell::new(false),
inner: inner,
}
}
}
impl<'a, B: Buf> Buf for WriteBufAuto<'a, B> {
#[inline]
fn remaining(&self) -> usize {
self.inner.remaining()
}
#[inline]
fn bytes(&self) -> &[u8] {
self.bytes_called.set(true);
self.inner.bytes()
}
#[inline]
fn advance(&mut self, cnt: usize) {
self.inner.advance(cnt)
}
#[inline]
fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize {
self.bytes_vec_called.set(true);
self.inner.bytes_vec(dst)
}
}
impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> {
fn drop(&mut self) {
if let Strategy::Auto = self.inner.strategy {
if self.bytes_vec_called.get() {
self.inner.strategy = Strategy::Queue;
} else if self.bytes_called.get() {
trace!("detected no usage of vectored write, flattening");
self.inner.strategy = Strategy::Flatten;
let mut vec = Vec::new();
vec.put(&mut self.inner.buf);
self.inner.buf.bufs.push_back(VecOrBuf::Vec(Cursor::new(vec)));
}
}
}
}
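Aside (not part of the commit): `WriteBufAuto` replaces the removed `poll_flush_into` heuristic with a simpler probe: it records in `Cell`s which `Buf` method the writer actually called, then commits to a strategy in `Drop`. The same observe-then-decide-on-drop pattern in miniature, with illustrative names:

use std::cell::Cell;

struct Probe<'a> {
    vectored_called: Cell<bool>,
    strategy: &'a mut &'static str,
}

impl<'a> Probe<'a> {
    fn bytes(&self) -> &[u8] {
        // flat, single-buffer path
        b"data"
    }

    fn bytes_vec(&self) -> usize {
        // vectored path
        self.vectored_called.set(true);
        1
    }
}

impl<'a> Drop for Probe<'a> {
    fn drop(&mut self) {
        // Decide the long-term strategy from what the writer actually used.
        *self.strategy = if self.vectored_called.get() { "queue" } else { "flatten" };
    }
}

fn main() {
    let mut strategy = "auto";
    {
        let probe = Probe { vectored_called: Cell::new(false), strategy: &mut strategy };
        let _ = probe.bytes(); // the writer only used the flat path
    }
    assert_eq!(strategy, "flatten");

    {
        let probe = Probe { vectored_called: Cell::new(false), strategy: &mut strategy };
        let _ = probe.bytes_vec(); // the writer used the vectored path
    }
    assert_eq!(strategy, "queue");
}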
#[derive(Debug)]
enum Strategy {
Auto,
@@ -565,68 +568,51 @@ impl<T: Buf> Buf for BufDeque<T> {
mod tests {
use super::*;
use std::io::Read;
use futures::task;
use futures::future;
use futures::executor::block_on;
use futures::io::AsyncRead;
use mock::AsyncIo;
#[cfg(test)]
impl<T: Read> MemRead for ::mock::AsyncIo<T> {
fn read_mem(&mut self, cx: &mut task::Context, len: usize) -> Poll<Bytes, io::Error> {
fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> {
let mut v = vec![0; len];
let n = try_ready!(self.poll_read(cx, v.as_mut_slice()));
let n = try_nb!(self.read(v.as_mut_slice()));
Ok(Async::Ready(BytesMut::from(&v[..n]).freeze()))
}
}
#[test]
fn iobuf_write_empty_slice() {
block_on(future::lazy(|cx| {
let mut mock = AsyncIo::new_buf(vec![], 256);
mock.error(io::Error::new(io::ErrorKind::Other, "logic error"));
let mut mock = AsyncIo::new_buf(vec![], 256);
mock.error(io::Error::new(io::ErrorKind::Other, "logic error"));
let mut io_buf = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
let mut io_buf = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
// underlying io will return the logic error upon write,
// so we are testing that the io_buf does not trigger a write
// when there is nothing to flush
io_buf.flush(cx).expect("should short-circuit flush");
Ok::<_, ()>(())
})).unwrap()
// underlying io will return the logic error upon write,
// so we are testing that the io_buf does not trigger a write
// when there is nothing to flush
io_buf.flush().expect("should short-circuit flush");
}
#[test]
fn parse_reads_until_blocked() {
block_on(future::lazy(|cx| {
// missing last line ending
let raw = "HTTP/1.1 200 OK\r\n";
// missing last line ending
let raw = "HTTP/1.1 200 OK\r\n";
let mock = AsyncIo::new_buf(raw, raw.len());
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
assert_eq!(buffered.parse::<::proto::ClientTransaction>(cx).unwrap(), Async::Pending);
assert!(buffered.io.blocked());
Ok::<_, ()>(())
})).unwrap()
let mock = AsyncIo::new_buf(raw, raw.len());
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
assert_eq!(buffered.parse::<::proto::ClientTransaction>().unwrap(), Async::NotReady);
assert!(buffered.io.blocked());
}
#[test]
fn write_buf_skips_empty_bufs() {
block_on(future::lazy(|cx| {
let mut mock = AsyncIo::new_buf(vec![], 1024);
mock.max_read_vecs(0); // disable vectored IO
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
let mut mock = AsyncIo::new_buf(vec![], 1024);
mock.max_read_vecs(0); // disable vectored IO
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
buffered.buffer(Cursor::new(Vec::new()));
buffered.buffer(Cursor::new(b"hello".to_vec()));
buffered.flush(cx).unwrap();
assert_eq!(buffered.io, b"hello");
Ok::<_, ()>(())
})).unwrap()
buffered.buffer(Cursor::new(Vec::new()));
buffered.buffer(Cursor::new(b"hello".to_vec()));
buffered.flush().unwrap();
assert_eq!(buffered.io, b"hello");
}
#[test]
@@ -634,22 +620,17 @@ mod tests {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();
block_on(future::lazy(|cx| {
let mock = AsyncIo::new_buf(vec![], 1024);
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
let mock = AsyncIo::new_buf(vec![], 1024);
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
buffered.write_buf_mut().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.write_buf_mut().extend(b"it's ");
buffered.buffer(Cursor::new(b"hyper!".to_vec()));
assert_eq!(buffered.io.num_writes(), 0);
buffered.flush(cx).unwrap();
buffered.write_buf_mut().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.write_buf_mut().extend(b"it's ");
buffered.buffer(Cursor::new(b"hyper!".to_vec()));
buffered.flush().unwrap();
assert_eq!(buffered.io, b"hello world, it's hyper!");
assert_eq!(buffered.io.num_writes(), 1);
Ok::<_, ()>(())
})).unwrap()
assert_eq!(buffered.io, b"hello world, it's hyper!");
assert_eq!(buffered.io.num_writes(), 1);
}
#[test]
@@ -657,31 +638,27 @@ mod tests {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();
block_on(future::lazy(|cx| {
let mock = AsyncIo::new_buf(vec![], 1024);
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
let mock = AsyncIo::new_buf(vec![], 1024);
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
buffered.write_buf_mut().extend(b"hello ");
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
buffered.write_buf_mut().extend(b"world, ");
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
buffered.write_buf_mut().extend(b"hello ");
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
buffered.write_buf_mut().extend(b"world, ");
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
// after flushing, reclaim the Vec
buffered.flush(cx).unwrap();
assert_eq!(buffered.write_buf.remaining(), 0);
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
// after flushing, reclaim the Vec
buffered.flush().unwrap();
assert_eq!(buffered.write_buf.remaining(), 0);
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
// add a user buf in the way
buffered.buffer(Cursor::new(b"it's ".to_vec()));
// and then add more hyper bytes
buffered.write_buf_mut().extend(b"hyper!");
buffered.flush(cx).unwrap();
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
// add a user buf in the way
buffered.buffer(Cursor::new(b"it's ".to_vec()));
// and then add more hyper bytes
buffered.write_buf_mut().extend(b"hyper!");
buffered.flush().unwrap();
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
assert_eq!(buffered.io, b"hello world, it's hyper!");
Ok::<_, ()>(())
})).unwrap()
assert_eq!(buffered.io, b"hello world, it's hyper!");
}
#[test]
@@ -689,25 +666,21 @@ mod tests {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();
block_on(future::lazy(|cx| {
let mock = AsyncIo::new_buf(vec![], 1024);
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
buffered.write_buf.set_strategy(Strategy::Flatten);
let mock = AsyncIo::new_buf(vec![], 1024);
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
buffered.write_buf.set_strategy(Strategy::Flatten);
buffered.write_buf_mut().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.write_buf_mut().extend(b"it's ");
buffered.buffer(Cursor::new(b"hyper!".to_vec()));
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
buffered.write_buf_mut().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.write_buf_mut().extend(b"it's ");
buffered.buffer(Cursor::new(b"hyper!".to_vec()));
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
buffered.flush(cx).unwrap();
buffered.flush().unwrap();
assert_eq!(buffered.io, b"hello world, it's hyper!");
assert_eq!(buffered.io.num_writes(), 1);
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
Ok::<_, ()>(())
})).unwrap()
assert_eq!(buffered.io, b"hello world, it's hyper!");
assert_eq!(buffered.io.num_writes(), 1);
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
}
#[test]
@@ -715,26 +688,22 @@ mod tests {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();
block_on(future::lazy(|cx| {
let mut mock = AsyncIo::new_buf(vec![], 1024);
mock.max_read_vecs(0); // disable vectored IO
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
let mut mock = AsyncIo::new_buf(vec![], 1024);
mock.max_read_vecs(0); // disable vectored IO
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
// we have 4 buffers, but hope to detect that vectored IO isn't
// being used, and switch to flattening automatically,
// resulting in only 2 writes
buffered.write_buf_mut().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.write_buf_mut().extend(b"it's hyper!");
//buffered.buffer(Cursor::new(b"hyper!".to_vec()));
buffered.flush(cx).unwrap();
// we have 4 buffers, but hope to detect that vectored IO isn't
// being used, and switch to flattening automatically,
// resulting in only 2 writes
buffered.write_buf_mut().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.write_buf_mut().extend(b"it's hyper!");
//buffered.buffer(Cursor::new(b"hyper!".to_vec()));
buffered.flush().unwrap();
assert_eq!(buffered.io, b"hello world, it's hyper!");
assert_eq!(buffered.io.num_writes(), 2);
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
Ok::<_, ()>(())
})).unwrap()
assert_eq!(buffered.io, b"hello world, it's hyper!");
assert_eq!(buffered.io.num_writes(), 2);
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
}
#[test]
@@ -742,24 +711,20 @@ mod tests {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();
block_on(future::lazy(move |cx| {
let mut mock = AsyncIo::new_buf(vec![], 1024);
mock.max_read_vecs(0); // disable vectored IO
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
buffered.write_buf.set_strategy(Strategy::Queue);
let mut mock = AsyncIo::new_buf(vec![], 1024);
mock.max_read_vecs(0); // disable vectored IO
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
buffered.write_buf.set_strategy(Strategy::Queue);
// we have 4 buffers, and vec IO disabled, but explicitly said
// don't try to auto detect (via setting strategy above)
buffered.write_buf_mut().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.write_buf_mut().extend(b"it's ");
buffered.buffer(Cursor::new(b"hyper!".to_vec()));
buffered.flush(cx).unwrap();
// we have 4 buffers, and vec IO disabled, but explicitly said
// don't try to auto detect (via setting strategy above)
buffered.write_buf_mut().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.write_buf_mut().extend(b"it's ");
buffered.buffer(Cursor::new(b"hyper!".to_vec()));
buffered.flush().unwrap();
assert_eq!(buffered.io, b"hello world, it's hyper!");
assert_eq!(buffered.io.num_writes(), 4);
Ok::<_, ()>(())
})).unwrap()
assert_eq!(buffered.io, b"hello world, it's hyper!");
assert_eq!(buffered.io.num_writes(), 4);
}
}

View File

@@ -12,8 +12,7 @@ use std::fmt;
use bytes::Bytes;
use futures::{Future, Poll};
use futures::task;
use futures::io::{AsyncRead, AsyncWrite};
use tokio_io::{AsyncRead, AsyncWrite};
use proto;
use proto::body::{Body, Entity};
@@ -90,8 +89,8 @@ where S: Service<Request = Request<Body>, Response = Response<B>, Error = ::Erro
/// upgrade. Once the upgrade is completed, the connection would be "done",
/// but it is not desired to actually shutdown the IO object. Instead you
/// would take it back using `into_parts`.
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
try_ready!(self.conn.poll_without_shutdown(cx));
pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> {
try_ready!(self.conn.poll_without_shutdown());
Ok(().into())
}
}
@@ -104,8 +103,8 @@ where S: Service<Request = Request<Body>, Response = Response<B>, Error = ::Erro
type Item = ();
type Error = ::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
self.conn.poll(cx)
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.conn.poll()
}
}

View File

@@ -4,6 +4,7 @@
//! them off to a `Service`.
pub mod conn;
mod service;
use std::fmt;
use std::io;
@@ -12,15 +13,16 @@ use std::net::{SocketAddr, TcpListener as StdTcpListener};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;
use futures::task::{self, Waker};
use futures::task::{self, Task};
use futures::future::{self};
use futures::{Future, FutureExt, Stream, StreamExt, Poll, Async};
use futures::io::{AsyncRead, AsyncWrite};
use futures::executor::spawn;
use futures::{Future, Stream, Poll, Async};
use futures_timer::Delay;
use http::{Request, Response};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::spawn;
use tokio::reactor::Handle;
use tokio::net::TcpListener;
pub use tokio_service::{NewService, Service};
use proto::body::{Body, Entity};
use proto;
@@ -28,8 +30,7 @@ use self::addr_stream::AddrStream;
use self::hyper_service::HyperService;
pub use self::conn::Connection;
pub use super::service::{NewService, Service};
pub use super::service::{const_service, service_fn};
pub use self::service::{const_service, service_fn};
/// A configuration of the HTTP protocol.
///
@@ -254,10 +255,11 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
/// # extern crate futures;
/// # extern crate hyper;
/// # extern crate tokio;
/// # use futures::FutureExt;
/// # use futures::io::{AsyncRead, AsyncWrite};
/// # extern crate tokio_io;
/// # use futures::Future;
/// # use hyper::{Body, Request, Response};
/// # use hyper::server::{Http, Service};
/// # use tokio_io::{AsyncRead, AsyncWrite};
/// # use tokio::reactor::Handle;
/// # fn run<I, S>(some_io: I, some_service: S)
/// # where
@@ -270,9 +272,9 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
///
/// let fut = conn
/// .map(|_| ())
/// .map_err(|e| panic!("server connection error: {}", e));
/// .map_err(|e| eprintln!("server connection error: {}", e));
///
/// tokio::spawn2(fut);
/// tokio::spawn(fut);
/// # }
/// # fn main() {}
/// ```
@@ -332,8 +334,8 @@ impl Future for Run {
type Item = ();
type Error = ::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
self.0.poll(cx)
fn poll(&mut self) -> Poll<(), ::Error> {
self.0.poll()
}
}
@@ -410,21 +412,17 @@ impl<S, B> Server<S, B>
let addr = socket.remote_addr;
debug!("accepted new connection ({})", addr);
let service = match new_service.new_service() {
Ok(service) => service,
Err(err) => return future::err(err).left()
};
let service = new_service.new_service()?;
let s = NotifyService {
inner: service,
info: Arc::downgrade(&info_cloned),
};
info_cloned.lock().unwrap().active += 1;
let fut = protocol.serve_connection(socket, s).recover(move |err| {
error!("server connection error: ({}) {}", addr, err);
});
spawn(fut).map_err(|err| err.never_into()).right()
let fut = protocol.serve_connection(socket, s)
.map(|_| ())
.map_err(move |err| error!("server connection error: ({}) {}", addr, err));
spawn(fut);
Ok(())
});
// for now, we don't care if the shutdown signal succeeds or errors
@@ -440,11 +438,8 @@ impl<S, B> Server<S, B>
// stop accepting incoming connections.
let main_execution = shutdown_signal.select(srv).then(move |result| {
match result {
Ok(_) => {},
Err(future::Either::Left((e, _other))) =>
return future::Either::Left(future::err(e)),
Err(future::Either::Right((e, _other))) =>
return future::Either::Left(future::err(e.into())),
Ok(((), _incoming)) => {},
Err((e, _other)) => return future::Either::A(future::err(e.into()))
}
// Ok we've stopped accepting new connections at this point, but we want
@@ -456,11 +451,10 @@ impl<S, B> Server<S, B>
// here have been destroyed.
let timeout = Delay::new(shutdown_timeout);
let wait = WaitUntilZero { info: info.clone() };
future::Either::Right(wait.select(timeout).then(|result| {
future::Either::B(wait.select(timeout).then(|result| {
match result {
Ok(_) => Ok(()),
Err(future::Either::Left((e, _))) => Err(e.into()),
Err(future::Either::Right((e, _))) => Err(e.into())
Err((e, _)) => Err(e.into())
}
}))
});
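Aside (not part of the commit): the hunk above drops the futures 0.2 `future::Either::Left`/`Right` error variants because in futures 0.1 `select` reports success and failure as plain `(value, other_future)` / `(error, other_future)` tuples. A minimal example of that shape:

extern crate futures; // futures 0.1

use futures::{future, Future};

fn main() {
    let a = future::ok::<u32, &'static str>(1);
    let b = future::err::<u32, &'static str>("boom");

    // `select` resolves with whichever future finishes first and hands back
    // the other one, so both arms destructure a tuple.
    let combined = a.select(b).then(|result| match result {
        Ok((value, _other)) => Ok(value),
        Err((err, _other)) => Err(err),
    });

    assert_eq!(combined.wait(), Ok(1));
}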
@@ -511,8 +505,8 @@ where
type Item = Connection<I::Item, S::Instance>;
type Error = ::Error;
fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(io) = try_ready!(self.incoming.poll_next(cx)) {
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(io) = try_ready!(self.incoming.poll()) {
let service = self.new_service.new_service()?;
Ok(Async::Ready(Some(self.protocol.serve_connection(io, service))))
} else {
@@ -592,17 +586,17 @@ impl Stream for AddrIncoming {
type Item = AddrStream;
type Error = ::std::io::Error;
fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// Check if a previous timeout is active that was set by IO errors.
if let Some(ref mut to) = self.timeout {
match to.poll(cx).expect("timeout never fails") {
match to.poll().expect("timeout never fails") {
Async::Ready(_) => {}
Async::Pending => return Ok(Async::Pending),
Async::NotReady => return Ok(Async::NotReady),
}
}
self.timeout = None;
loop {
match self.listener.poll_accept2(cx) {
match self.listener.poll_accept() {
Ok(Async::Ready((socket, addr))) => {
if let Some(dur) = self.keep_alive_timeout {
if let Err(e) = socket.set_keepalive(Some(dur)) {
@@ -611,7 +605,7 @@ impl Stream for AddrIncoming {
}
return Ok(Async::Ready(Some(AddrStream::new(socket, addr))));
},
Ok(Async::Pending) => return Ok(Async::Pending),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(ref e) if self.sleep_on_errors => {
// Connection errors can be ignored directly, continue by
// accepting the next request.
@@ -623,13 +617,13 @@ impl Stream for AddrIncoming {
debug!("accept error: {}; sleeping {:?}",
e, delay);
let mut timeout = Delay::new(delay);
let result = timeout.poll(cx)
let result = timeout.poll()
.expect("timeout never fails");
match result {
Async::Ready(()) => continue,
Async::Pending => {
Async::NotReady => {
self.timeout = Some(timeout);
return Ok(Async::Pending);
return Ok(Async::NotReady);
}
}
},
@@ -653,13 +647,12 @@ fn connection_error(e: &io::Error) -> bool {
}
mod addr_stream {
use std::io;
use std::io::{self, Read, Write};
use std::net::SocketAddr;
use bytes::{Buf, BufMut};
use futures::Poll;
use futures::task;
use futures::io::{AsyncRead, AsyncWrite, Initializer};
use iovec::IoVec;
use tokio::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite};
#[derive(Debug)]
@@ -677,42 +670,46 @@ mod addr_stream {
}
}
impl Read for AddrStream {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.read(buf)
}
}
impl Write for AddrStream {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.inner.write(buf)
}
#[inline]
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}
impl AsyncRead for AddrStream {
#[inline]
fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8]) -> Poll<usize, io::Error> {
self.inner.poll_read(cx, buf)
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}
#[inline]
unsafe fn initializer(&self) -> Initializer {
AsyncRead::initializer(&self.inner)
}
#[inline]
fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec]) -> Poll<usize, io::Error> {
self.inner.poll_vectored_read(cx, vec)
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.read_buf(buf)
}
}
impl AsyncWrite for AddrStream {
#[inline]
fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8]) -> Poll<usize, io::Error> {
self.inner.poll_write(cx, buf)
fn shutdown(&mut self) -> Poll<(), io::Error> {
AsyncWrite::shutdown(&mut self.inner)
}
#[inline]
fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
self.inner.poll_flush(cx)
}
#[inline]
fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
self.inner.poll_close(cx)
}
#[inline]
fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec]) -> Poll<usize, io::Error> {
self.inner.poll_vectored_write(cx, vec)
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.write_buf(buf)
}
}
}
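After the revert, `AddrStream` implements tokio-io 0.1's `AsyncRead`/`AsyncWrite`, which sit on top of the blocking-style `Read`/`Write` supertraits; only `shutdown` has no default. A hedged sketch of the same delegation pattern on a made-up wrapper type (assumptions: futures 0.1 and tokio-io 0.1; `Logged` is not from hyper):

extern crate futures;
extern crate tokio_io;

use std::io::{self, Read, Write};

use futures::Poll;
use tokio_io::{AsyncRead, AsyncWrite};

// A transparent wrapper that forwards every I/O call to the inner stream.
struct Logged<T>(T);

impl<T: Read> Read for Logged<T> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.read(buf)
    }
}

impl<T: Write> Write for Logged<T> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0.flush()
    }
}

// Every AsyncRead method has a default, so delegation comes for free.
impl<T: AsyncRead> AsyncRead for Logged<T> {}

impl<T: AsyncWrite> AsyncWrite for Logged<T> {
    // `shutdown` is the only required method; the rest default.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        self.0.shutdown()
    }
}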
@@ -730,7 +727,7 @@ struct WaitUntilZero {
struct Info {
active: usize,
blocker: Option<Waker>,
blocker: Option<Task>,
}
impl<S: Service> Service for NotifyService<S> {
@@ -753,8 +750,8 @@ impl<S> Drop for NotifyService<S> {
let mut info = info.lock().unwrap();
info.active -= 1;
if info.active == 0 {
if let Some(waker) = info.blocker.take() {
waker.wake();
if let Some(task) = info.blocker.take() {
task.notify();
}
}
}
@@ -764,13 +761,13 @@ impl Future for WaitUntilZero {
type Item = ();
type Error = io::Error;
fn poll(&mut self, cx: &mut task::Context) -> Poll<(), io::Error> {
fn poll(&mut self) -> Poll<(), io::Error> {
let mut info = self.info.lock().unwrap();
if info.active == 0 {
Ok(().into())
} else {
info.blocker = Some(cx.waker().clone());
Ok(Async::Pending)
info.blocker = Some(task::current());
Ok(Async::NotReady)
}
}
}
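These last hunks trade the futures 0.2 `Waker` pulled out of `task::Context` for futures 0.1's implicit task: a future that is not ready stashes `task::current()`, and whoever flips the shared state calls `Task::notify()` so the executor polls it again. A minimal sketch of that pattern with illustrative names (futures 0.1 assumed; this is not hyper's code):

extern crate futures;

use std::sync::{Arc, Mutex};

use futures::{task, Async, Future, Poll};

struct Shared {
    done: bool,
    blocker: Option<task::Task>,
}

struct WaitDone {
    shared: Arc<Mutex<Shared>>,
}

impl Future for WaitDone {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        let mut shared = self.shared.lock().unwrap();
        if shared.done {
            Ok(Async::Ready(()))
        } else {
            // Register the task currently polling us; `task::current()` must be
            // called from inside a poll, just like in `WaitUntilZero::poll`.
            shared.blocker = Some(task::current());
            Ok(Async::NotReady)
        }
    }
}

// The counterpart of `Drop for NotifyService`: flip the shared state, then
// wake the parked task so the executor polls `WaitDone` again.
fn finish(shared: &Arc<Mutex<Shared>>) {
    let mut shared = shared.lock().unwrap();
    shared.done = true;
    if let Some(task) = shared.blocker.take() {
        task.notify();
    }
}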

src/server/service.rs

@@ -0,0 +1,64 @@
use std::marker::PhantomData;
use std::sync::Arc;
use futures::IntoFuture;
use tokio_service::{NewService, Service};
/// Create a `Service` from a function.
pub fn service_fn<F, R, S>(f: F) -> ServiceFn<F, R>
where
F: Fn(R) -> S,
S: IntoFuture,
{
ServiceFn {
f: f,
_req: PhantomData,
}
}
/// Create a `NewService` by sharing references of `service`.
pub fn const_service<S>(service: S) -> ConstService<S> {
ConstService {
svc: Arc::new(service),
}
}
#[derive(Debug)]
pub struct ServiceFn<F, R> {
f: F,
_req: PhantomData<fn() -> R>,
}
impl<F, R, S> Service for ServiceFn<F, R>
where
F: Fn(R) -> S,
S: IntoFuture,
{
type Request = R;
type Response = S::Item;
type Error = S::Error;
type Future = S::Future;
fn call(&self, req: Self::Request) -> Self::Future {
(self.f)(req).into_future()
}
}
#[derive(Debug)]
pub struct ConstService<S> {
svc: Arc<S>,
}
impl<S> NewService for ConstService<S>
where
S: Service,
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Instance = Arc<S>;
fn new_service(&self) -> ::std::io::Result<Self::Instance> {
Ok(self.svc.clone())
}
}
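A hedged usage sketch of the two helpers in this restored file; the request and response types below are placeholders rather than hyper's, and it assumes `service_fn` and `const_service` from the module above are in scope:

extern crate tokio_service;

use std::io;

use tokio_service::{NewService, Service};
// Assumed imports: `service_fn` and `const_service` as defined in the module above.

fn demo() {
    // A Service from a closure: the return type only needs IntoFuture,
    // so a plain Result is enough.
    let svc = service_fn(|name: String| -> Result<String, io::Error> {
        Ok(format!("hello, {}", name))
    });

    // Wrap the service once; each connection gets an Arc'd clone.
    let factory = const_service(svc);
    let instance = factory.new_service().expect("const_service never fails");
    let _response_future = instance.call("world".to_string());
}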


@@ -1,116 +0,0 @@
use std::marker::PhantomData;
use std::sync::Arc;
use futures::{Future, IntoFuture};
/// An asynchronous function from `Request` to a `Response`.
pub trait Service {
/// Requests handled by the service.
type Request;
/// Responses given by the service.
type Response;
/// Errors produced by the service.
type Error;
/// The future response value.
type Future: Future<Item = Self::Response, Error = Self::Error>;
/// Process the request and return the response asynchronously.
fn call(&self, req: Self::Request) -> Self::Future;
}
impl<S: Service + ?Sized> Service for Arc<S> {
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn call(&self, request: S::Request) -> S::Future {
(**self).call(request)
}
}
/// Creates new `Service` values.
pub trait NewService {
/// Requests handled by the service.
type Request;
/// Responses given by the service.
type Response;
/// Errors produced by the service.
type Error;
/// The `Service` value created by this factory.
type Instance: Service<Request = Self::Request, Response = Self::Response, Error = Self::Error>;
/// Create and return a new service value.
fn new_service(&self) -> ::std::io::Result<Self::Instance>;
}
impl<F, R> NewService for F
where F: Fn() -> ::std::io::Result<R>,
R: Service,
{
type Request = R::Request;
type Response = R::Response;
type Error = R::Error;
type Instance = R;
fn new_service(&self) -> ::std::io::Result<R> {
(*self)()
}
}
/// Create a `Service` from a function.
pub fn service_fn<F, R, S>(f: F) -> ServiceFn<F, R>
where
F: Fn(R) -> S,
S: IntoFuture,
{
ServiceFn {
f: f,
_req: PhantomData,
}
}
/// Create a `NewService` by sharing references of `service`.
pub fn const_service<S>(service: S) -> ConstService<S> {
ConstService {
svc: Arc::new(service),
}
}
#[derive(Debug)]
pub struct ServiceFn<F, R> {
f: F,
_req: PhantomData<fn() -> R>,
}
impl<F, R, S> Service for ServiceFn<F, R>
where
F: Fn(R) -> S,
S: IntoFuture,
{
type Request = R;
type Response = S::Item;
type Error = S::Error;
type Future = S::Future;
fn call(&self, req: Self::Request) -> Self::Future {
(self.f)(req).into_future()
}
}
#[derive(Debug)]
pub struct ConstService<S> {
svc: Arc<S>,
}
impl<S> NewService for ConstService<S>
where
S: Service,
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Instance = Arc<S>;
fn new_service(&self) -> ::std::io::Result<Self::Instance> {
Ok(self.svc.clone())
}
}
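Both versions of this module describe the same service shape; after the revert it is tokio-service's trait rather than hyper's own copy above. A self-contained sketch of implementing that trait by hand (tokio-service 0.1 assumed; `Echo` is illustrative, not from hyper):

extern crate futures;
extern crate tokio_service;

use std::io;

use futures::future::{self, FutureResult};
use tokio_service::Service;

// A service that answers every request with the request itself.
struct Echo;

impl Service for Echo {
    type Request = String;
    type Response = String;
    type Error = io::Error;
    type Future = FutureResult<String, io::Error>;

    fn call(&self, req: String) -> Self::Future {
        future::ok(req)
    }
}

fn main() {
    use futures::Future;

    let reply = Echo.call("hi".to_string()).wait().unwrap();
    assert_eq!(reply, "hi");
}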