Update tokio and hyper alphas

This commit is contained in:
Sean McArthur
2019-09-25 11:11:03 -07:00
parent f71227d968
commit 6413a4349e
7 changed files with 240 additions and 80 deletions

View File

@@ -1,10 +1,12 @@
use bytes::Bytes;
use futures_core::Stream;
use hyper::body::Payload;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures_core::Stream;
use futures_util::TryStreamExt;
use http_body::Body as HttpBody;
use tokio::timer::Delay;
/// An asynchronous request body.
@@ -17,12 +19,22 @@ pub(crate) struct ImplStream(Body);
enum Inner {
Reusable(Bytes),
Hyper {
body: hyper::Body,
Streaming {
body: Pin<
Box<
dyn HttpBody<Data = hyper::Chunk, Error = Box<dyn std::error::Error + Send + Sync>>
+ Send
+ Sync,
>,
>,
timeout: Option<Delay>,
},
}
struct WrapStream<S>(S);
struct WrapHyper(hyper::Body);
impl Body {
/// Wrap a futures `Stream` in a box inside `Body`.
///
@@ -49,27 +61,38 @@ impl Body {
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
hyper::Chunk: From<S::Ok>,
{
Body::wrap(hyper::body::Body::wrap_stream(stream))
}
pub(crate) fn response(body: hyper::Body, timeout: Option<Delay>) -> Body {
let body = Box::pin(WrapStream(
stream.map_ok(hyper::Chunk::from).map_err(Into::into),
));
Body {
inner: Inner::Hyper { body, timeout },
}
}
pub(crate) fn wrap(body: hyper::Body) -> Body {
Body {
inner: Inner::Hyper {
inner: Inner::Streaming {
body,
timeout: None,
},
}
}
#[cfg(any(feature = "blocking", feature = "gzip",))]
pub(crate) fn response(body: hyper::Body, timeout: Option<Delay>) -> Body {
Body {
inner: Inner::Streaming {
body: Box::pin(WrapHyper(body)),
timeout,
},
}
}
#[cfg(feature = "blocking")]
pub(crate) fn wrap(body: hyper::Body) -> Body {
Body {
inner: Inner::Streaming {
body: Box::pin(WrapHyper(body)),
timeout: None,
},
}
}
pub(crate) fn empty() -> Body {
Body::wrap(hyper::Body::empty())
Body::reusable(Bytes::new())
}
pub(crate) fn reusable(chunk: Bytes) -> Body {
@@ -78,14 +101,13 @@ impl Body {
}
}
pub(crate) fn into_hyper(self) -> (Option<Bytes>, hyper::Body) {
match self.inner {
Inner::Reusable(chunk) => (Some(chunk.clone()), chunk.into()),
Inner::Hyper { body, timeout } => {
debug_assert!(timeout.is_none());
(None, body)
}
}
pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) {
let reuse = match self.inner {
Inner::Reusable(ref chunk) => Some(chunk.clone()),
Inner::Streaming { .. } => None,
};
(reuse, self)
}
pub(crate) fn into_stream(self) -> ImplStream {
@@ -95,7 +117,7 @@ impl Body {
pub(crate) fn content_length(&self) -> Option<u64> {
match self.inner {
Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
Inner::Hyper { ref body, .. } => body.size_hint().exact(),
Inner::Streaming { ref body, .. } => body.size_hint().exact(),
}
}
}
@@ -143,13 +165,71 @@ impl fmt::Debug for Body {
// ===== impl ImplStream =====
impl HttpBody for ImplStream {
type Data = hyper::Chunk;
type Error = crate::Error;
fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let opt_try_chunk = match self.0.inner {
Inner::Streaming {
ref mut body,
ref mut timeout,
} => {
if let Some(ref mut timeout) = timeout {
if let Poll::Ready(()) = Pin::new(timeout).poll(cx) {
return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
}
}
futures_core::ready!(Pin::new(body).poll_data(cx))
.map(|opt_chunk| opt_chunk.map(Into::into).map_err(crate::error::body))
}
Inner::Reusable(ref mut bytes) => {
if bytes.is_empty() {
None
} else {
Some(Ok(std::mem::replace(bytes, Bytes::new()).into()))
}
}
};
Poll::Ready(opt_try_chunk)
}
fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
fn is_end_stream(&self) -> bool {
match self.0.inner {
Inner::Streaming { ref body, .. } => body.is_end_stream(),
Inner::Reusable(ref bytes) => bytes.is_empty(),
}
}
fn size_hint(&self) -> http_body::SizeHint {
match self.0.inner {
Inner::Streaming { ref body, .. } => body.size_hint(),
Inner::Reusable(ref bytes) => {
let mut hint = http_body::SizeHint::default();
hint.set_exact(bytes.len() as u64);
hint
}
}
}
}
impl Stream for ImplStream {
type Item = Result<Bytes, crate::Error>;
#[inline]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let opt_try_chunk = match self.0.inner {
Inner::Hyper {
Inner::Streaming {
ref mut body,
ref mut timeout,
} => {
@@ -173,3 +253,67 @@ impl Stream for ImplStream {
Poll::Ready(opt_try_chunk)
}
}
// ===== impl WrapStream =====
impl<S, D, E> HttpBody for WrapStream<S>
where
S: Stream<Item = Result<D, E>>,
D: Into<hyper::Chunk>,
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
type Data = hyper::Chunk;
type Error = E;
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
// safe pin projection
let item =
futures_core::ready!(
unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().0) }.poll_next(cx)?
);
Poll::Ready(item.map(|val| Ok(val.into())))
}
fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}
// ===== impl WrapHyper =====
impl HttpBody for WrapHyper {
type Data = hyper::Chunk;
type Error = Box<dyn std::error::Error + Send + Sync>;
fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
// safe pin projection
Pin::new(&mut self.0)
.poll_data(cx)
.map(|opt| opt.map(|res| res.map_err(Into::into)))
}
fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
fn is_end_stream(&self) -> bool {
self.0.is_end_stream()
}
fn size_hint(&self) -> http_body::SizeHint {
self.0.size_hint()
}
}