From 79c32f89530e47735155eb9bd19466bcb6aec90d Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Thu, 22 Aug 2019 14:13:27 -0700 Subject: [PATCH] feat(body): Update `Payload` to be a trait alias of `http_body::Body` (#1908) --- Cargo.toml | 3 +- src/body/body.rs | 78 +++++++++++++--------------------------- src/body/payload.rs | 64 ++++++++++++++++++++++++--------- src/proto/h1/dispatch.rs | 10 +----- src/proto/h2/client.rs | 2 +- src/proto/h2/mod.rs | 6 ++-- src/proto/h2/server.rs | 11 +----- tests/client.rs | 2 +- tests/server.rs | 8 ++--- 9 files changed, 84 insertions(+), 100 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3568f31a..54903ebb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ futures-core-preview = { version = "0.3.0-alpha.18" } futures-channel-preview = { version = "0.3.0-alpha.18" } futures-util-preview = { version = "0.3.0-alpha.18" } http = "0.1.15" -http-body = "0.1" +http-body = { git = "https://github.com/hyperium/http-body" } httparse = "1.0" h2 = { git = "https://github.com/hyperium/h2" } iovec = "0.1" @@ -37,7 +37,6 @@ net2 = { version = "0.2.32", optional = true } pin-utils = "0.1.0-alpha.4" time = "0.1" tokio = { version = "0.2.0-alpha.2", optional = true, default-features = false, features = ["rt-full"] } -#tokio-buf = "0.2.0-alpha.1" tower-service = "=0.3.0-alpha.1" tokio-executor = "0.2.0-alpha.2" tokio-io = "0.2.0-alpha.2" diff --git a/src/body/body.rs b/src/body/body.rs index d147cf69..e6e9f12c 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -6,7 +6,7 @@ use bytes::Bytes; use futures_core::{Stream, TryStream}; use futures_channel::{mpsc, oneshot}; use futures_util::TryStreamExt; -//use tokio_buf::SizeHint; +use http_body::{SizeHint, Body as HttpBody}; use http::HeaderMap; use crate::common::{Future, Never, Pin, Poll, task}; @@ -296,7 +296,7 @@ impl Default for Body { } } -impl Payload for Body { +impl HttpBody for Body { type Data = Chunk; type Error = crate::Error; @@ -304,14 +304,13 @@ impl Payload for Body { self.poll_eof(cx) } - fn poll_trailers(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll>> { + fn poll_trailers(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll, Self::Error>> { match self.kind { Kind::H2 { recv: ref mut h2, .. } => match ready!(h2.poll_trailers(cx)) { - Ok(Some(t)) => Poll::Ready(Some(Ok(t))), - Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))), - Ok(None) => Poll::Ready(None), + Ok(t) => Poll::Ready(Ok(t)), + Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))), }, - _ => Poll::Ready(None), + _ => Poll::Ready(Ok(None)), } } @@ -324,21 +323,26 @@ impl Payload for Body { } } - fn content_length(&self) -> Option { + fn size_hint(&self) -> SizeHint { match self.kind { - Kind::Once(Some(ref val)) => Some(val.len() as u64), - Kind::Once(None) => Some(0), - Kind::Wrapped(..) => None, - Kind::Chan { content_length, .. } | Kind::H2 { content_length, .. } => content_length, - } - } + Kind::Once(Some(ref val)) => { + let mut hint = SizeHint::default(); + hint.set_exact(val.len() as u64); + hint + }, + Kind::Once(None) => { + SizeHint::default() + }, + Kind::Wrapped(..) => SizeHint::default(), + Kind::Chan { content_length, .. } | Kind::H2 { content_length, .. } => { + let mut hint = SizeHint::default(); - // We can improve the performance of `Body` when we know it is a Once kind. - #[doc(hidden)] - fn __hyper_full_data(&mut self, _: FullDataArg) -> FullDataRet { - match self.kind { - Kind::Once(ref mut val) => FullDataRet(val.take()), - _ => FullDataRet(None), + if let Some(content_length) = content_length { + hint.set_exact(content_length as u64); + } + + hint + }, } } } @@ -363,43 +367,11 @@ impl fmt::Debug for Body { } } -/* -impl ::http_body::Body for Body { - type Data = Chunk; - type Error = crate::Error; - - fn poll_data(&mut self) -> Poll, Self::Error> { - ::poll_data(self) - } - - fn poll_trailers(&mut self) -> Poll, Self::Error> { - ::poll_trailers(self) - } - - fn is_end_stream(&self) -> bool { - ::is_end_stream(self) - } - - fn size_hint(&self) -> SizeHint { - let mut hint = SizeHint::default(); - - let content_length = ::content_length(self); - - if let Some(size) = content_length { - hint.set_upper(size); - hint.set_lower(size) - } - - hint - } -} -*/ - impl Stream for Body { type Item = crate::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - self.poll_data(cx) + HttpBody::poll_data(self, cx) } } diff --git a/src/body/payload.rs b/src/body/payload.rs index fe770121..88c2d741 100644 --- a/src/body/payload.rs +++ b/src/body/payload.rs @@ -5,12 +5,13 @@ use http::HeaderMap; use crate::common::{Pin, Poll, task}; use super::internal::{FullDataArg, FullDataRet}; +use http_body::{Body as HttpBody, SizeHint}; /// This trait represents a streaming body of a `Request` or `Response`. /// /// The built-in implementation of this trait is [`Body`](::Body), in case you /// don't need to customize a send stream for your own application. -pub trait Payload: Send + 'static { +pub trait Payload: sealed::Sealed + Send + 'static { /// A buffer of bytes representing a single chunk of a body. type Data: Buf + Send; @@ -28,9 +29,9 @@ pub trait Payload: Send + 'static { /// This should **only** be called after `poll_data` has ended. /// /// Note: Trailers aren't currently used for HTTP/1, only for HTTP/2. - fn poll_trailers(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll>> { + fn poll_trailers(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll, Self::Error>> { drop(cx); - Poll::Ready(None) + Poll::Ready(Ok(None)) } /// A hint that the `Body` is complete, and doesn't need to be polled more. @@ -48,27 +49,56 @@ pub trait Payload: Send + 'static { false } - /// Return a length of the total bytes that will be streamed, if known. + /// Returns a `SizeHint` providing an upper and lower bound on the possible size. /// - /// If an exact size of bytes is known, this would allow hyper to send a - /// `Content-Length` header automatically, not needing to fall back to - /// `Transfer-Encoding: chunked`. + /// If there is an exact size of bytes known, this would allow hyper to + /// send a `Content-Length` header automatically, not needing to fall back to + /// `TransferEncoding: chunked`. /// /// This does not need to be kept updated after polls, it will only be /// called once to create the headers. - fn content_length(&self) -> Option { - None + fn size_hint(&self) -> SizeHint { + SizeHint::default() + } +} + + +impl Payload for T +where + T: HttpBody + Send + 'static, + T::Data: Send, + T::Error: Into>, +{ + type Data = T::Data; + type Error = T::Error; + + fn poll_data(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll>> { + HttpBody::poll_data(self, cx) } - // This API is unstable, and is impossible to use outside of hyper. Some - // form of it may become stable in a later version. - // - // The only thing a user *could* do is reference the method, but DON'T - // DO THAT! :) - #[doc(hidden)] - fn __hyper_full_data(&mut self, _: FullDataArg) -> FullDataRet { - FullDataRet(None) + fn poll_trailers(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll, Self::Error>> { + HttpBody::poll_trailers(self, cx) } + + fn is_end_stream(&self) -> bool { + HttpBody::is_end_stream(self) + } + + fn size_hint(&self) -> SizeHint { + HttpBody::size_hint(self) + } +} + +impl sealed::Sealed for T +where + T: HttpBody + Send + 'static, + T::Data: Send, + T::Error: Into>, +{ +} + +mod sealed { + pub trait Sealed {} } /* diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 28c5a5d3..3e56c68b 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -251,19 +251,11 @@ where if let Some(msg) = ready!(self.dispatch.poll_msg(cx)) { let (head, mut body) = msg.map_err(crate::Error::new_user_service)?; - // Check if the body knows its full data immediately. - // - // If so, we can skip a bit of bookkeeping that streaming - // bodies need to do. - if let Some(full) = body.__hyper_full_data(FullDataArg(())).0 { - self.conn.write_full_msg(head, full); - return Poll::Ready(Ok(())); - } let body_type = if body.is_end_stream() { self.body_rx.set(None); None } else { - let btype = body.content_length() + let btype = body.size_hint().exact() .map(BodyLength::Known) .or_else(|| Some(BodyLength::Unknown)); self.body_rx.set(Some(body)); diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index aa338363..d05dd1c3 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -125,7 +125,7 @@ where let (head, body) = req.into_parts(); let mut req = ::http::Request::from_parts(head, ()); super::strip_connection_headers(req.headers_mut(), true); - if let Some(len) = body.content_length() { + if let Some(len) = body.size_hint().exact() { headers::set_content_length_if_missing(req.headers_mut(), len); } let eos = body.is_end_stream(); diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index 696fb015..a4d1a509 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -178,17 +178,17 @@ where } match ready!(Pin::new(&mut self.stream).poll_trailers(cx)) { - Some(Ok(trailers)) => { + Ok(Some(trailers)) => { self.body_tx .send_trailers(trailers) .map_err(crate::Error::new_body_write)?; return Poll::Ready(Ok(())); } - Some(Err(e)) => return Poll::Ready(Err(self.on_user_err(e))), - None => { + Ok(None) => { // There were no trailers, so send an empty DATA frame... return Poll::Ready(self.send_eos_frame()); } + Err(e) => return Poll::Ready(Err(self.on_user_err(e))), } } } diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 8f0dc526..702073c1 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -289,19 +289,10 @@ where } // automatically set Content-Length from body... - if let Some(len) = body.content_length() { + if let Some(len) = body.size_hint().exact() { headers::set_content_length_if_missing(res.headers_mut(), len); } - if let Some(full) = body.__hyper_full_data(FullDataArg(())).0 { - let mut body_tx = reply!(false); - let buf = SendBuf(Some(full)); - body_tx - .send_data(buf, true) - .map_err(crate::Error::new_body_write)?; - return Poll::Ready(Ok(())); - } - if !body.is_end_stream() { let body_tx = reply!(false); H2StreamState::Body(PipeToSendStream::new(body, body_tx)) diff --git a/tests/client.rs b/tests/client.rs index c88c6f8d..92682532 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1710,7 +1710,7 @@ mod conn { .unwrap(); let res = client.send_request(req).and_then(move |mut res| { assert_eq!(res.status(), hyper::StatusCode::OK); - assert_eq!(res.body().content_length(), Some(5)); + assert_eq!(res.body().size_hint().exact(), Some(5)); assert!(!res.body().is_end_stream()); poll_fn(move |ctx| Pin::new(res.body_mut()).poll_data(ctx)).map(Option::unwrap) }); diff --git a/tests/server.rs b/tests/server.rs index 5f379473..cb201d96 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -298,7 +298,7 @@ mod response_body_lengths { #[test] fn http2_auto_response_with_known_length() { - use hyper::body::Payload; + use http_body::Body; let server = serve(); let addr_str = format!("http://{}", server.addr()); @@ -317,7 +317,7 @@ mod response_body_lengths { .get(uri) .map_ok(|res| { assert_eq!(res.headers().get("content-length").unwrap(), "13"); - assert_eq!(res.body().content_length(), Some(13)); + assert_eq!(res.body().size_hint().exact(), Some(13)); () }) .map_err(|_e| ()) @@ -326,7 +326,7 @@ mod response_body_lengths { #[test] fn http2_auto_response_with_conflicting_lengths() { - use hyper::body::Payload; + use http_body::Body; let server = serve(); let addr_str = format!("http://{}", server.addr()); @@ -348,7 +348,7 @@ mod response_body_lengths { .get(uri) .map_ok(|res| { assert_eq!(res.headers().get("content-length").unwrap(), "10"); - assert_eq!(res.body().content_length(), Some(10)); + assert_eq!(res.body().size_hint().exact(), Some(10)); () }) .map_err(|_e| ())