feat(body): Update Payload to be a trait alias of http_body::Body (#1908)

This commit is contained in:
Lucio Franco
2019-08-22 14:13:27 -07:00
committed by Sean McArthur
parent 49b12c415d
commit 79c32f8953
9 changed files with 84 additions and 100 deletions

View File

@@ -27,7 +27,7 @@ futures-core-preview = { version = "0.3.0-alpha.18" }
futures-channel-preview = { version = "0.3.0-alpha.18" } futures-channel-preview = { version = "0.3.0-alpha.18" }
futures-util-preview = { version = "0.3.0-alpha.18" } futures-util-preview = { version = "0.3.0-alpha.18" }
http = "0.1.15" http = "0.1.15"
http-body = "0.1" http-body = { git = "https://github.com/hyperium/http-body" }
httparse = "1.0" httparse = "1.0"
h2 = { git = "https://github.com/hyperium/h2" } h2 = { git = "https://github.com/hyperium/h2" }
iovec = "0.1" iovec = "0.1"
@@ -37,7 +37,6 @@ net2 = { version = "0.2.32", optional = true }
pin-utils = "0.1.0-alpha.4" pin-utils = "0.1.0-alpha.4"
time = "0.1" time = "0.1"
tokio = { version = "0.2.0-alpha.2", optional = true, default-features = false, features = ["rt-full"] } tokio = { version = "0.2.0-alpha.2", optional = true, default-features = false, features = ["rt-full"] }
#tokio-buf = "0.2.0-alpha.1"
tower-service = "=0.3.0-alpha.1" tower-service = "=0.3.0-alpha.1"
tokio-executor = "0.2.0-alpha.2" tokio-executor = "0.2.0-alpha.2"
tokio-io = "0.2.0-alpha.2" tokio-io = "0.2.0-alpha.2"

View File

@@ -6,7 +6,7 @@ use bytes::Bytes;
use futures_core::{Stream, TryStream}; use futures_core::{Stream, TryStream};
use futures_channel::{mpsc, oneshot}; use futures_channel::{mpsc, oneshot};
use futures_util::TryStreamExt; use futures_util::TryStreamExt;
//use tokio_buf::SizeHint; use http_body::{SizeHint, Body as HttpBody};
use http::HeaderMap; use http::HeaderMap;
use crate::common::{Future, Never, Pin, Poll, task}; use crate::common::{Future, Never, Pin, Poll, task};
@@ -296,7 +296,7 @@ impl Default for Body {
} }
} }
impl Payload for Body { impl HttpBody for Body {
type Data = Chunk; type Data = Chunk;
type Error = crate::Error; type Error = crate::Error;
@@ -304,14 +304,13 @@ impl Payload for Body {
self.poll_eof(cx) self.poll_eof(cx)
} }
fn poll_trailers(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Result<HeaderMap, Self::Error>>> { fn poll_trailers(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
match self.kind { match self.kind {
Kind::H2 { recv: ref mut h2, .. } => match ready!(h2.poll_trailers(cx)) { Kind::H2 { recv: ref mut h2, .. } => match ready!(h2.poll_trailers(cx)) {
Ok(Some(t)) => Poll::Ready(Some(Ok(t))), Ok(t) => Poll::Ready(Ok(t)),
Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))), Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
Ok(None) => Poll::Ready(None),
}, },
_ => Poll::Ready(None), _ => Poll::Ready(Ok(None)),
} }
} }
@@ -324,21 +323,26 @@ impl Payload for Body {
} }
} }
fn content_length(&self) -> Option<u64> { fn size_hint(&self) -> SizeHint {
match self.kind { match self.kind {
Kind::Once(Some(ref val)) => Some(val.len() as u64), Kind::Once(Some(ref val)) => {
Kind::Once(None) => Some(0), let mut hint = SizeHint::default();
Kind::Wrapped(..) => None, hint.set_exact(val.len() as u64);
Kind::Chan { content_length, .. } | Kind::H2 { content_length, .. } => content_length, hint
} },
} Kind::Once(None) => {
SizeHint::default()
},
Kind::Wrapped(..) => SizeHint::default(),
Kind::Chan { content_length, .. } | Kind::H2 { content_length, .. } => {
let mut hint = SizeHint::default();
// We can improve the performance of `Body` when we know it is a Once kind. if let Some(content_length) = content_length {
#[doc(hidden)] hint.set_exact(content_length as u64);
fn __hyper_full_data(&mut self, _: FullDataArg) -> FullDataRet<Self::Data> { }
match self.kind {
Kind::Once(ref mut val) => FullDataRet(val.take()), hint
_ => FullDataRet(None), },
} }
} }
} }
@@ -363,43 +367,11 @@ impl fmt::Debug for Body {
} }
} }
/*
impl ::http_body::Body for Body {
type Data = Chunk;
type Error = crate::Error;
fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
<Self as Payload>::poll_data(self)
}
fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, Self::Error> {
<Self as Payload>::poll_trailers(self)
}
fn is_end_stream(&self) -> bool {
<Self as Payload>::is_end_stream(self)
}
fn size_hint(&self) -> SizeHint {
let mut hint = SizeHint::default();
let content_length = <Self as Payload>::content_length(self);
if let Some(size) = content_length {
hint.set_upper(size);
hint.set_lower(size)
}
hint
}
}
*/
impl Stream for Body { impl Stream for Body {
type Item = crate::Result<Chunk>; type Item = crate::Result<Chunk>;
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_data(cx) HttpBody::poll_data(self, cx)
} }
} }

View File

@@ -5,12 +5,13 @@ use http::HeaderMap;
use crate::common::{Pin, Poll, task}; use crate::common::{Pin, Poll, task};
use super::internal::{FullDataArg, FullDataRet}; use super::internal::{FullDataArg, FullDataRet};
use http_body::{Body as HttpBody, SizeHint};
/// This trait represents a streaming body of a `Request` or `Response`. /// This trait represents a streaming body of a `Request` or `Response`.
/// ///
/// The built-in implementation of this trait is [`Body`](::Body), in case you /// The built-in implementation of this trait is [`Body`](::Body), in case you
/// don't need to customize a send stream for your own application. /// don't need to customize a send stream for your own application.
pub trait Payload: Send + 'static { pub trait Payload: sealed::Sealed + Send + 'static {
/// A buffer of bytes representing a single chunk of a body. /// A buffer of bytes representing a single chunk of a body.
type Data: Buf + Send; type Data: Buf + Send;
@@ -28,9 +29,9 @@ pub trait Payload: Send + 'static {
/// This should **only** be called after `poll_data` has ended. /// This should **only** be called after `poll_data` has ended.
/// ///
/// Note: Trailers aren't currently used for HTTP/1, only for HTTP/2. /// Note: Trailers aren't currently used for HTTP/1, only for HTTP/2.
fn poll_trailers(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Result<HeaderMap, Self::Error>>> { fn poll_trailers(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
drop(cx); drop(cx);
Poll::Ready(None) Poll::Ready(Ok(None))
} }
/// A hint that the `Body` is complete, and doesn't need to be polled more. /// A hint that the `Body` is complete, and doesn't need to be polled more.
@@ -48,27 +49,56 @@ pub trait Payload: Send + 'static {
false false
} }
/// Return a length of the total bytes that will be streamed, if known. /// Returns a `SizeHint` providing an upper and lower bound on the possible size.
/// ///
/// If an exact size of bytes is known, this would allow hyper to send a /// If there is an exact size of bytes known, this would allow hyper to
/// `Content-Length` header automatically, not needing to fall back to /// send a `Content-Length` header automatically, not needing to fall back to
/// `Transfer-Encoding: chunked`. /// `TransferEncoding: chunked`.
/// ///
/// This does not need to be kept updated after polls, it will only be /// This does not need to be kept updated after polls, it will only be
/// called once to create the headers. /// called once to create the headers.
fn content_length(&self) -> Option<u64> { fn size_hint(&self) -> SizeHint {
None SizeHint::default()
}
}
impl<T> Payload for T
where
T: HttpBody + Send + 'static,
T::Data: Send,
T::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Data = T::Data;
type Error = T::Error;
fn poll_data(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Result<Self::Data, Self::Error>>> {
HttpBody::poll_data(self, cx)
} }
// This API is unstable, and is impossible to use outside of hyper. Some fn poll_trailers(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
// form of it may become stable in a later version. HttpBody::poll_trailers(self, cx)
//
// The only thing a user *could* do is reference the method, but DON'T
// DO THAT! :)
#[doc(hidden)]
fn __hyper_full_data(&mut self, _: FullDataArg) -> FullDataRet<Self::Data> {
FullDataRet(None)
} }
fn is_end_stream(&self) -> bool {
HttpBody::is_end_stream(self)
}
fn size_hint(&self) -> SizeHint {
HttpBody::size_hint(self)
}
}
impl<T> sealed::Sealed for T
where
T: HttpBody + Send + 'static,
T::Data: Send,
T::Error: Into<Box<dyn StdError + Send + Sync>>,
{
}
mod sealed {
pub trait Sealed {}
} }
/* /*

View File

@@ -251,19 +251,11 @@ where
if let Some(msg) = ready!(self.dispatch.poll_msg(cx)) { if let Some(msg) = ready!(self.dispatch.poll_msg(cx)) {
let (head, mut body) = msg.map_err(crate::Error::new_user_service)?; let (head, mut body) = msg.map_err(crate::Error::new_user_service)?;
// Check if the body knows its full data immediately.
//
// If so, we can skip a bit of bookkeeping that streaming
// bodies need to do.
if let Some(full) = body.__hyper_full_data(FullDataArg(())).0 {
self.conn.write_full_msg(head, full);
return Poll::Ready(Ok(()));
}
let body_type = if body.is_end_stream() { let body_type = if body.is_end_stream() {
self.body_rx.set(None); self.body_rx.set(None);
None None
} else { } else {
let btype = body.content_length() let btype = body.size_hint().exact()
.map(BodyLength::Known) .map(BodyLength::Known)
.or_else(|| Some(BodyLength::Unknown)); .or_else(|| Some(BodyLength::Unknown));
self.body_rx.set(Some(body)); self.body_rx.set(Some(body));

View File

@@ -125,7 +125,7 @@ where
let (head, body) = req.into_parts(); let (head, body) = req.into_parts();
let mut req = ::http::Request::from_parts(head, ()); let mut req = ::http::Request::from_parts(head, ());
super::strip_connection_headers(req.headers_mut(), true); super::strip_connection_headers(req.headers_mut(), true);
if let Some(len) = body.content_length() { if let Some(len) = body.size_hint().exact() {
headers::set_content_length_if_missing(req.headers_mut(), len); headers::set_content_length_if_missing(req.headers_mut(), len);
} }
let eos = body.is_end_stream(); let eos = body.is_end_stream();

View File

@@ -178,17 +178,17 @@ where
} }
match ready!(Pin::new(&mut self.stream).poll_trailers(cx)) { match ready!(Pin::new(&mut self.stream).poll_trailers(cx)) {
Some(Ok(trailers)) => { Ok(Some(trailers)) => {
self.body_tx self.body_tx
.send_trailers(trailers) .send_trailers(trailers)
.map_err(crate::Error::new_body_write)?; .map_err(crate::Error::new_body_write)?;
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
} }
Some(Err(e)) => return Poll::Ready(Err(self.on_user_err(e))), Ok(None) => {
None => {
// There were no trailers, so send an empty DATA frame... // There were no trailers, so send an empty DATA frame...
return Poll::Ready(self.send_eos_frame()); return Poll::Ready(self.send_eos_frame());
} }
Err(e) => return Poll::Ready(Err(self.on_user_err(e))),
} }
} }
} }

View File

@@ -289,19 +289,10 @@ where
} }
// automatically set Content-Length from body... // automatically set Content-Length from body...
if let Some(len) = body.content_length() { if let Some(len) = body.size_hint().exact() {
headers::set_content_length_if_missing(res.headers_mut(), len); headers::set_content_length_if_missing(res.headers_mut(), len);
} }
if let Some(full) = body.__hyper_full_data(FullDataArg(())).0 {
let mut body_tx = reply!(false);
let buf = SendBuf(Some(full));
body_tx
.send_data(buf, true)
.map_err(crate::Error::new_body_write)?;
return Poll::Ready(Ok(()));
}
if !body.is_end_stream() { if !body.is_end_stream() {
let body_tx = reply!(false); let body_tx = reply!(false);
H2StreamState::Body(PipeToSendStream::new(body, body_tx)) H2StreamState::Body(PipeToSendStream::new(body, body_tx))

View File

@@ -1710,7 +1710,7 @@ mod conn {
.unwrap(); .unwrap();
let res = client.send_request(req).and_then(move |mut res| { let res = client.send_request(req).and_then(move |mut res| {
assert_eq!(res.status(), hyper::StatusCode::OK); assert_eq!(res.status(), hyper::StatusCode::OK);
assert_eq!(res.body().content_length(), Some(5)); assert_eq!(res.body().size_hint().exact(), Some(5));
assert!(!res.body().is_end_stream()); assert!(!res.body().is_end_stream());
poll_fn(move |ctx| Pin::new(res.body_mut()).poll_data(ctx)).map(Option::unwrap) poll_fn(move |ctx| Pin::new(res.body_mut()).poll_data(ctx)).map(Option::unwrap)
}); });

View File

@@ -298,7 +298,7 @@ mod response_body_lengths {
#[test] #[test]
fn http2_auto_response_with_known_length() { fn http2_auto_response_with_known_length() {
use hyper::body::Payload; use http_body::Body;
let server = serve(); let server = serve();
let addr_str = format!("http://{}", server.addr()); let addr_str = format!("http://{}", server.addr());
@@ -317,7 +317,7 @@ mod response_body_lengths {
.get(uri) .get(uri)
.map_ok(|res| { .map_ok(|res| {
assert_eq!(res.headers().get("content-length").unwrap(), "13"); assert_eq!(res.headers().get("content-length").unwrap(), "13");
assert_eq!(res.body().content_length(), Some(13)); assert_eq!(res.body().size_hint().exact(), Some(13));
() ()
}) })
.map_err(|_e| ()) .map_err(|_e| ())
@@ -326,7 +326,7 @@ mod response_body_lengths {
#[test] #[test]
fn http2_auto_response_with_conflicting_lengths() { fn http2_auto_response_with_conflicting_lengths() {
use hyper::body::Payload; use http_body::Body;
let server = serve(); let server = serve();
let addr_str = format!("http://{}", server.addr()); let addr_str = format!("http://{}", server.addr());
@@ -348,7 +348,7 @@ mod response_body_lengths {
.get(uri) .get(uri)
.map_ok(|res| { .map_ok(|res| {
assert_eq!(res.headers().get("content-length").unwrap(), "10"); assert_eq!(res.headers().get("content-length").unwrap(), "10");
assert_eq!(res.body().content_length(), Some(10)); assert_eq!(res.body().size_hint().exact(), Some(10));
() ()
}) })
.map_err(|_e| ()) .map_err(|_e| ())