feat(lib): update to std::future::Future

BREAKING CHANGE: All usage of async traits (`Future`, `Stream`,
`AsyncRead`, `AsyncWrite`, etc) are updated to newer versions.
This commit is contained in:
Sean McArthur
2019-07-09 15:37:43 -07:00
parent da9b0319ef
commit 8f4b05ae78
37 changed files with 1526 additions and 1548 deletions

View File

@@ -1,13 +1,12 @@
use std::error::Error as StdError;
use bytes::{Buf, Bytes};
use futures::{Async, Future, Poll, Stream};
use http::{Request, Response, StatusCode};
use tokio_io::{AsyncRead, AsyncWrite};
use crate::body::{Body, Payload};
use crate::body::internal::FullDataArg;
use crate::common::{Never, YieldNow};
use crate::common::{Future, Never, Poll, Pin, Unpin, task};
use crate::proto::{BodyLength, DecodedLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, ResponseHead};
use super::Http1Transaction;
use crate::service::Service;
@@ -16,12 +15,8 @@ pub(crate) struct Dispatcher<D, Bs: Payload, I, T> {
conn: Conn<I, Bs::Data, T>,
dispatch: D,
body_tx: Option<crate::body::Sender>,
body_rx: Option<Bs>,
body_rx: Pin<Box<Option<Bs>>>,
is_closing: bool,
/// If the poll loop reaches its max spin count, it will yield by notifying
/// the task immediately. This will cache that `Task`, since it usually is
/// the same one.
yield_now: YieldNow,
}
pub(crate) trait Dispatch {
@@ -29,14 +24,14 @@ pub(crate) trait Dispatch {
type PollBody;
type PollError;
type RecvItem;
fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Self::PollError>;
fn poll_msg(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;
fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>;
fn poll_ready(&mut self) -> Poll<(), ()>;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>;
fn should_poll(&self) -> bool;
}
pub struct Server<S: Service> {
in_flight: Option<S::Future>,
in_flight: Pin<Box<Option<S::Future>>>,
pub(crate) service: S,
}
@@ -49,10 +44,10 @@ type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>
impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
where
D: Dispatch<PollItem=MessageHead<T::Outgoing>, PollBody=Bs, RecvItem=MessageHead<T::Incoming>>,
D: Dispatch<PollItem=MessageHead<T::Outgoing>, PollBody=Bs, RecvItem=MessageHead<T::Incoming>> + Unpin,
D::PollError: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite,
T: Http1Transaction,
I: AsyncRead + AsyncWrite + Unpin,
T: Http1Transaction + Unpin,
Bs: Payload,
{
pub fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self {
@@ -60,9 +55,8 @@ where
conn: conn,
dispatch: dispatch,
body_tx: None,
body_rx: None,
body_rx: Box::pin(None),
is_closing: false,
yield_now: YieldNow::new(),
}
}
@@ -80,55 +74,74 @@ where
///
/// This is useful for old-style HTTP upgrades, but ignores
/// newer-style upgrade API.
pub fn poll_without_shutdown(&mut self) -> Poll<(), crate::Error> {
self.poll_catch(false)
.map(|x| {
x.map(|ds| if let Dispatched::Upgrade(pending) = ds {
pending.manual();
})
})
pub(crate) fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>
where
Self: Unpin,
{
Pin::new(self).poll_catch(cx, false).map_ok(|ds| {
if let Dispatched::Upgrade(pending) = ds {
pending.manual();
}
})
}
fn poll_catch(&mut self, should_shutdown: bool) -> Poll<Dispatched, crate::Error> {
self.poll_inner(should_shutdown).or_else(|e| {
fn poll_catch(&mut self, cx: &mut task::Context<'_>, should_shutdown: bool) -> Poll<crate::Result<Dispatched>> {
Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| {
// An error means we're shutting down either way.
// We just try to give the error to the user,
// and close the connection with an Ok. If we
// cannot give it to the user, then return the Err.
self.dispatch.recv_msg(Err(e))?;
Ok(Async::Ready(Dispatched::Shutdown))
})
Ok(Dispatched::Shutdown)
}))
}
fn poll_inner(&mut self, should_shutdown: bool) -> Poll<Dispatched, crate::Error> {
fn poll_inner(&mut self, cx: &mut task::Context<'_>, should_shutdown: bool) -> Poll<crate::Result<Dispatched>> {
T::update_date();
try_ready!(self.poll_loop());
ready!(self.poll_loop(cx))?;
loop {
self.poll_read(cx)?;
self.poll_write(cx)?;
self.poll_flush(cx)?;
// This could happen if reading paused before blocking on IO,
// such as getting to the end of a framed message, but then
// writing/flushing set the state back to Init. In that case,
// if the read buffer still had bytes, we'd want to try poll_read
// again, or else we wouldn't ever be woken up again.
//
// Using this instead of task::current() and notify() inside
// the Conn is noticeably faster in pipelined benchmarks.
if !self.conn.wants_read_again() {
break;
}
}
if self.is_done() {
if let Some(pending) = self.conn.pending_upgrade() {
self.conn.take_error()?;
return Ok(Async::Ready(Dispatched::Upgrade(pending)));
return Poll::Ready(Ok(Dispatched::Upgrade(pending)));
} else if should_shutdown {
try_ready!(self.conn.shutdown().map_err(crate::Error::new_shutdown));
ready!(self.conn.poll_shutdown(cx)).map_err(crate::Error::new_shutdown)?;
}
self.conn.take_error()?;
Ok(Async::Ready(Dispatched::Shutdown))
Poll::Ready(Ok(Dispatched::Shutdown))
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
fn poll_loop(&mut self) -> Poll<(), crate::Error> {
fn poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
// Limit the looping on this connection, in case it is ready far too
// often, so that other futures don't starve.
//
// 16 was chosen arbitrarily, as that is number of pipelined requests
// benchmarks often use. Perhaps it should be a config option instead.
for _ in 0..16 {
self.poll_read()?;
self.poll_write()?;
self.poll_flush()?;
self.poll_read(cx)?;
self.poll_write(cx)?;
self.poll_flush(cx)?;
// This could happen if reading paused before blocking on IO,
// such as getting to the end of a framed message, but then
@@ -140,45 +153,39 @@ where
// the Conn is noticeably faster in pipelined benchmarks.
if !self.conn.wants_read_again() {
//break;
return Ok(Async::Ready(()));
return Poll::Ready(Ok(()));
}
}
trace!("poll_loop yielding (self = {:p})", self);
match self.yield_now.poll_yield() {
Ok(Async::NotReady) => Ok(Async::NotReady),
// maybe with `!` this can be cleaner...
// but for now, just doing this to eliminate branches
Ok(Async::Ready(never)) |
Err(never) => match never {}
}
task::yield_now(cx).map(|never| match never {})
}
fn poll_read(&mut self) -> Poll<(), crate::Error> {
fn poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
loop {
if self.is_closing {
return Ok(Async::Ready(()));
return Poll::Ready(Ok(()));
} else if self.conn.can_read_head() {
try_ready!(self.poll_read_head());
ready!(self.poll_read_head(cx))?;
} else if let Some(mut body) = self.body_tx.take() {
if self.conn.can_read_body() {
match body.poll_ready() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => {
match body.poll_ready(cx) {
Poll::Ready(Ok(())) => (),
Poll::Pending => {
self.body_tx = Some(body);
return Ok(Async::NotReady);
return Poll::Pending;
},
Err(_canceled) => {
Poll::Ready(Err(_canceled)) => {
// user doesn't care about the body
// so we should stop reading
trace!("body receiver dropped before eof, closing");
self.conn.close_read();
return Ok(Async::Ready(()));
return Poll::Ready(Ok(()));
}
}
match self.conn.read_body() {
Ok(Async::Ready(Some(chunk))) => {
match self.conn.poll_read_body(cx) {
Poll::Ready(Some(Ok(chunk))) => {
match body.send_data(chunk) {
Ok(()) => {
self.body_tx = Some(body);
@@ -191,14 +198,14 @@ where
}
}
},
Ok(Async::Ready(None)) => {
Poll::Ready(None) => {
// just drop, the body will close automatically
},
Ok(Async::NotReady) => {
Poll::Pending => {
self.body_tx = Some(body);
return Ok(Async::NotReady);
return Poll::Pending;
}
Err(e) => {
Poll::Ready(Some(Err(e))) => {
body.send_error(crate::Error::new_body(e));
}
}
@@ -206,25 +213,24 @@ where
// just drop, the body will close automatically
}
} else {
return self.conn.read_keep_alive();
return self.conn.poll_read_keep_alive(cx);
}
}
}
fn poll_read_head(&mut self) -> Poll<(), crate::Error> {
fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
// can dispatch receive, or does it still care about, an incoming message?
match self.dispatch.poll_ready() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady), // service might not be ready
match ready!(self.dispatch.poll_ready(cx)) {
Ok(()) => (),
Err(()) => {
trace!("dispatch no longer receiving messages");
self.close();
return Ok(Async::Ready(()));
return Poll::Ready(Ok(()));
}
}
// dispatch is ready for a message, try to read one
match self.conn.read_head() {
Ok(Async::Ready(Some((head, body_len, wants_upgrade)))) => {
match ready!(self.conn.poll_read_head(cx)) {
Some(Ok((head, body_len, wants_upgrade))) => {
let mut body = match body_len {
DecodedLength::ZERO => Body::empty(),
other => {
@@ -237,67 +243,78 @@ where
body.set_on_upgrade(self.conn.on_upgrade());
}
self.dispatch.recv_msg(Ok((head, body)))?;
Ok(Async::Ready(()))
}
Ok(Async::Ready(None)) => {
// read eof, conn will start to shutdown automatically
Ok(Async::Ready(()))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => {
Poll::Ready(Ok(()))
},
Some(Err(err)) => {
debug!("read_head error: {}", err);
self.dispatch.recv_msg(Err(err))?;
// if here, the dispatcher gave the user the error
// somewhere else. we still need to shutdown, but
// not as a second error.
Ok(Async::Ready(()))
Poll::Ready(Ok(()))
},
None => {
// read eof, conn will start to shutdown automatically
Poll::Ready(Ok(()))
}
}
}
fn poll_write(&mut self) -> Poll<(), crate::Error> {
fn poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
loop {
if self.is_closing {
return Ok(Async::Ready(()));
return Poll::Ready(Ok(()));
} else if self.body_rx.is_none() && self.conn.can_write_head() && self.dispatch.should_poll() {
if let Some((head, mut body)) = try_ready!(self.dispatch.poll_msg().map_err(crate::Error::new_user_service)) {
if let Some(msg) = ready!(self.dispatch.poll_msg(cx)) {
let (head, mut body) = msg.map_err(crate::Error::new_user_service)?;
// Check if the body knows its full data immediately.
//
// If so, we can skip a bit of bookkeeping that streaming
// bodies need to do.
if let Some(full) = body.__hyper_full_data(FullDataArg(())).0 {
self.conn.write_full_msg(head, full);
return Ok(Async::Ready(()));
return Poll::Ready(Ok(()));
}
let body_type = if body.is_end_stream() {
self.body_rx = None;
self.body_rx.set(None);
None
} else {
let btype = body.content_length()
.map(BodyLength::Known)
.or_else(|| Some(BodyLength::Unknown));
self.body_rx = Some(body);
self.body_rx.set(Some(body));
btype
};
self.conn.write_head(head, body_type);
} else {
self.close();
return Ok(Async::Ready(()));
return Poll::Ready(Ok(()));
}
} else if !self.conn.can_buffer_body() {
try_ready!(self.poll_flush());
} else if let Some(mut body) = self.body_rx.take() {
if !self.conn.can_write_body() {
trace!(
"no more write body allowed, user body is_end_stream = {}",
body.is_end_stream(),
);
continue;
}
match body.poll_data().map_err(crate::Error::new_user_body)? {
Async::Ready(Some(chunk)) => {
ready!(self.poll_flush(cx))?;
} else {
// A new scope is needed :(
if let (Some(mut body), clear_body) = OptGuard::new(self.body_rx.as_mut()).guard_mut() {
debug_assert!(!*clear_body, "opt guard defaults to keeping body");
if !self.conn.can_write_body() {
trace!(
"no more write body allowed, user body is_end_stream = {}",
body.is_end_stream(),
);
*clear_body = true;
continue;
}
let item = ready!(body.as_mut().poll_data(cx));
if let Some(item) = item {
let chunk = item.map_err(|e| {
*clear_body = true;
crate::Error::new_user_body(e)
})?;
let eos = body.is_end_stream();
if eos {
*clear_body = true;
if chunk.remaining() == 0 {
trace!("discarding empty chunk");
self.conn.end_body();
@@ -305,30 +322,25 @@ where
self.conn.write_body_and_end(chunk);
}
} else {
self.body_rx = Some(body);
if chunk.remaining() == 0 {
trace!("discarding empty chunk");
continue;
}
self.conn.write_body(chunk);
}
},
Async::Ready(None) => {
} else {
*clear_body = true;
self.conn.end_body();
},
Async::NotReady => {
self.body_rx = Some(body);
return Ok(Async::NotReady);
}
} else {
return Poll::Pending;
}
} else {
return Ok(Async::NotReady);
}
}
}
fn poll_flush(&mut self) -> Poll<(), crate::Error> {
self.conn.flush().map_err(|err| {
fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
self.conn.poll_flush(cx).map_err(|err| {
debug!("error writing: {}", err);
crate::Error::new_body_write(err)
})
@@ -360,35 +372,65 @@ where
impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T>
where
D: Dispatch<PollItem=MessageHead<T::Outgoing>, PollBody=Bs, RecvItem=MessageHead<T::Incoming>>,
D: Dispatch<PollItem=MessageHead<T::Outgoing>, PollBody=Bs, RecvItem=MessageHead<T::Incoming>> + Unpin,
D::PollError: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite,
T: Http1Transaction,
I: AsyncRead + AsyncWrite + Unpin,
T: Http1Transaction + Unpin,
Bs: Payload,
{
type Item = Dispatched;
type Error = crate::Error;
type Output = crate::Result<Dispatched>;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.poll_catch(true)
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
self.poll_catch(cx, true)
}
}
// ===== impl OptGuard =====
/// A drop guard to allow a mutable borrow of an Option while being able to
/// set whether the `Option` should be cleared on drop.
struct OptGuard<'a, T>(Pin<&'a mut Option<T>>, bool);
impl<'a, T> OptGuard<'a, T> {
fn new(pin: Pin<&'a mut Option<T>>) -> Self {
OptGuard(pin, false)
}
fn guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool) {
(self.0.as_mut().as_pin_mut(), &mut self.1)
}
}
impl<'a, T> Drop for OptGuard<'a, T> {
fn drop(&mut self) {
if self.1 {
self.0.set(None);
}
}
}
// ===== impl Server =====
impl<S> Server<S> where S: Service {
impl<S> Server<S>
where
S: Service,
{
pub fn new(service: S) -> Server<S> {
Server {
in_flight: None,
in_flight: Box::pin(None),
service: service,
}
}
pub fn into_service(self) -> S {
self.service
}
}
// Service is never pinned
impl<S: Service> Unpin for Server<S> {}
impl<S, Bs> Dispatch for Server<S>
where
S: Service<ReqBody=Body, ResBody=Bs>,
@@ -400,25 +442,23 @@ where
type PollError = S::Error;
type RecvItem = RequestHead;
fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Self::PollError> {
if let Some(mut fut) = self.in_flight.take() {
let resp = match fut.poll()? {
Async::Ready(res) => res,
Async::NotReady => {
self.in_flight = Some(fut);
return Ok(Async::NotReady);
}
};
fn poll_msg(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> {
let ret = if let Some(ref mut fut) = self.in_flight.as_mut().as_pin_mut() {
let resp = ready!(fut.as_mut().poll(cx)?);
let (parts, body) = resp.into_parts();
let head = MessageHead {
version: parts.version,
subject: parts.status,
headers: parts.headers,
};
Ok(Async::Ready(Some((head, body))))
Poll::Ready(Some(Ok((head, body))))
} else {
unreachable!("poll_msg shouldn't be called if no inflight");
}
};
// Since in_flight finished, remove it
self.in_flight.set(None);
ret
}
fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
@@ -428,15 +468,16 @@ where
*req.uri_mut() = msg.subject.1;
*req.headers_mut() = msg.headers;
*req.version_mut() = msg.version;
self.in_flight = Some(self.service.call(req));
let fut = self.service.call(req);
self.in_flight.set(Some(fut));
Ok(())
}
fn poll_ready(&mut self) -> Poll<(), ()> {
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
if self.in_flight.is_some() {
Ok(Async::NotReady)
Poll::Pending
} else {
self.service.poll_ready()
self.service.poll_ready(cx)
.map_err(|_e| {
// FIXME: return error value.
trace!("service closed");
@@ -470,16 +511,16 @@ where
type PollError = Never;
type RecvItem = ResponseHead;
fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Never> {
match self.rx.poll() {
Ok(Async::Ready(Some((req, mut cb)))) => {
fn poll_msg(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Never>>> {
match self.rx.poll_next(cx) {
Poll::Ready(Some((req, mut cb))) => {
// check that future hasn't been canceled already
match cb.poll_cancel().expect("poll_cancel cannot error") {
Async::Ready(()) => {
match cb.poll_cancel(cx) {
Poll::Ready(()) => {
trace!("request canceled");
Ok(Async::Ready(None))
Poll::Ready(None)
},
Async::NotReady => {
Poll::Pending => {
let (parts, body) = req.into_parts();
let head = RequestHead {
version: parts.version,
@@ -487,17 +528,16 @@ where
headers: parts.headers,
};
self.callback = Some(cb);
Ok(Async::Ready(Some((head, body))))
Poll::Ready(Some(Ok((head, body))))
}
}
},
Ok(Async::Ready(None)) => {
Poll::Ready(None) => {
trace!("client tx closed");
// user has dropped sender handle
Ok(Async::Ready(None))
Poll::Ready(None)
},
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(never) => match never {},
Poll::Pending => Poll::Pending,
}
}
@@ -522,30 +562,32 @@ where
if let Some(cb) = self.callback.take() {
let _ = cb.send(Err((err, None)));
Ok(())
} else if let Ok(Async::Ready(Some((req, cb)))) = self.rx.poll() {
trace!("canceling queued request with connection error: {}", err);
// in this case, the message was never even started, so it's safe to tell
// the user that the request was completely canceled
let _ = cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
Ok(())
} else {
Err(err)
self.rx.close();
if let Some((req, cb)) = self.rx.try_recv() {
trace!("canceling queued request with connection error: {}", err);
// in this case, the message was never even started, so it's safe to tell
// the user that the request was completely canceled
let _ = cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
Ok(())
} else {
Err(err)
}
}
}
}
}
fn poll_ready(&mut self) -> Poll<(), ()> {
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
match self.callback {
Some(ref mut cb) => match cb.poll_cancel() {
Ok(Async::Ready(())) => {
Some(ref mut cb) => match cb.poll_cancel(cx) {
Poll::Ready(()) => {
trace!("callback receiver has dropped");
Err(())
Poll::Ready(Err(()))
},
Ok(Async::NotReady) => Ok(Async::Ready(())),
Err(_) => unreachable!("oneshot poll_cancel cannot error"),
Poll::Pending => Poll::Ready(Ok(())),
},
None => Err(()),
None => Poll::Ready(Err(())),
}
}