feat(body): implement HttpBody for Request and Response

When the body type of a `Request` or `Response` implements `HttpBody`,
the `Request` or `Response` itself now implements `HttpBody`.

This allows writing things like `hyper::body::aggregate(req)` instead of
`hyper::body::aggregate(req.into_body())`.

Closes #2067
This commit is contained in:
Sean McArthur
2019-12-13 10:19:10 -08:00
parent bfda390617
commit 4b6099c7aa
11 changed files with 21 additions and 21 deletions

View File

@@ -25,7 +25,7 @@ futures-core = { version = "0.3", default-features = false }
futures-channel = "0.3" futures-channel = "0.3"
futures-util = { version = "0.3", default-features = false } futures-util = { version = "0.3", default-features = false }
http = "0.2" http = "0.2"
http-body = "0.3" http-body = "0.3.1"
httparse = "1.0" httparse = "1.0"
h2 = "0.2.1" h2 = "0.2.1"
itoa = "0.4.1" itoa = "0.4.1"

View File

@@ -42,7 +42,7 @@ async fn fetch_url(url: hyper::Uri) -> Result<()> {
// Stream the body, writing each chunk to stdout as we get it // Stream the body, writing each chunk to stdout as we get it
// (instead of buffering and printing at the end). // (instead of buffering and printing at the end).
while let Some(next) = res.body_mut().data().await { while let Some(next) = res.data().await {
let chunk = next?; let chunk = next?;
io::stdout().write_all(&chunk).await?; io::stdout().write_all(&chunk).await?;
} }

View File

@@ -30,7 +30,7 @@ async fn fetch_json(url: hyper::Uri) -> Result<Vec<User>> {
let res = client.get(url).await?; let res = client.get(url).await?;
// asynchronously aggregate the chunks of the body // asynchronously aggregate the chunks of the body
let body = hyper::body::aggregate(res.into_body()).await?; let body = hyper::body::aggregate(res).await?;
// try to parse as json with serde_json // try to parse as json with serde_json
let users = serde_json::from_reader(body.reader())?; let users = serde_json::from_reader(body.reader())?;

View File

@@ -17,7 +17,7 @@ async fn param_example(req: Request<Body>) -> Result<Response<Body>, hyper::Erro
(&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(INDEX.into())), (&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(INDEX.into())),
(&Method::POST, "/post") => { (&Method::POST, "/post") => {
// Concatenate the body... // Concatenate the body...
let b = hyper::body::to_bytes(req.into_body()).await?; let b = hyper::body::to_bytes(req).await?;
// Parse the request body. form_urlencoded::parse // Parse the request body. form_urlencoded::parse
// always succeeds, but in general parsing may // always succeeds, but in general parsing may
// fail (for example, an invalid post of json), so // fail (for example, an invalid post of json), so

View File

@@ -40,7 +40,7 @@ async fn client_request_response(client: &Client<HttpConnector>) -> Result<Respo
async fn api_post_response(req: Request<Body>) -> Result<Response<Body>> { async fn api_post_response(req: Request<Body>) -> Result<Response<Body>> {
// Aggregate the body... // Aggregate the body...
let whole_body = hyper::body::aggregate(req.into_body()).await?; let whole_body = hyper::body::aggregate(req).await?;
// Decode as JSON... // Decode as JSON...
let mut data: serde_json::Value = serde_json::from_reader(whole_body.reader())?; let mut data: serde_json::Value = serde_json::from_reader(whole_body.reader())?;
// Change the JSON... // Change the JSON...

View File

@@ -18,7 +18,7 @@ type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
/// A stream of `Bytes`, used when receiving bodies. /// A stream of `Bytes`, used when receiving bodies.
/// ///
/// A good default [`HttpBody`](crates::body::HttpBody) to use in many /// A good default [`HttpBody`](crate::body::HttpBody) to use in many
/// applications. /// applications.
#[must_use = "streams do nothing unless polled"] #[must_use = "streams do nothing unless polled"]
pub struct Body { pub struct Body {

View File

@@ -5,7 +5,7 @@ use super::HttpBody;
/// Concatenate the buffers from a body into a single `Bytes` asynchronously. /// Concatenate the buffers from a body into a single `Bytes` asynchronously.
/// ///
/// This may require copying the data into a single buffer. If you don't need /// This may require copying the data into a single buffer. If you don't need
/// a contiguous buffer, prefer the [`aggregate`](crate::body::aggregate) /// a contiguous buffer, prefer the [`aggregate`](crate::body::aggregate())
/// function. /// function.
pub async fn to_bytes<T>(body: T) -> Result<Bytes, T::Error> pub async fn to_bytes<T>(body: T) -> Result<Bytes, T::Error>
where where

View File

@@ -40,7 +40,7 @@
//! println!("status: {}", res.status()); //! println!("status: {}", res.status());
//! //!
//! // Concatenate the body stream into a single buffer... //! // Concatenate the body stream into a single buffer...
//! let buf = hyper::body::to_bytes(res.into_body()).await?; //! let buf = hyper::body::to_bytes(res).await?;
//! //!
//! println!("body: {:?}", buf); //! println!("body: {:?}", buf);
//! # Ok(()) //! # Ok(())

View File

@@ -146,7 +146,7 @@ macro_rules! test {
); );
)* )*
let body = rt.block_on(concat(res.into_body())) let body = rt.block_on(concat(res))
.expect("body concat wait"); .expect("body concat wait");
let expected_res_body = Option::<&[u8]>::from($response_body) let expected_res_body = Option::<&[u8]>::from($response_body)
@@ -1065,7 +1065,7 @@ mod dispatch_impl {
.request(req) .request(req)
.and_then(move |res| { .and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::OK); assert_eq!(res.status(), hyper::StatusCode::OK);
concat(res.into_body()) concat(res)
}) })
.map_ok(|_| ()) .map_ok(|_| ())
}; };
@@ -1128,7 +1128,7 @@ mod dispatch_impl {
.unwrap(); .unwrap();
let res = client.request(req).and_then(move |res| { let res = client.request(req).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::OK); assert_eq!(res.status(), hyper::StatusCode::OK);
concat(res.into_body()) concat(res)
}); });
let rx = rx1.expect("thread panicked"); let rx = rx1.expect("thread panicked");
@@ -1296,7 +1296,7 @@ mod dispatch_impl {
.unwrap(); .unwrap();
let res = client.request(req).and_then(move |res| { let res = client.request(req).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::OK); assert_eq!(res.status(), hyper::StatusCode::OK);
concat(res.into_body()) concat(res)
}); });
let rx = rx1.expect("thread panicked"); let rx = rx1.expect("thread panicked");
@@ -1342,7 +1342,7 @@ mod dispatch_impl {
.unwrap(); .unwrap();
let res = client.request(req).and_then(move |res| { let res = client.request(req).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::OK); assert_eq!(res.status(), hyper::StatusCode::OK);
concat(res.into_body()) concat(res)
}); });
let rx = rx1.expect("thread panicked"); let rx = rx1.expect("thread panicked");
@@ -2098,7 +2098,7 @@ mod conn {
let res = client.send_request(req).and_then(move |res| { let res = client.send_request(req).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::OK); assert_eq!(res.status(), hyper::StatusCode::OK);
concat(res.into_body()) concat(res)
}); });
let rx = rx1.expect("thread panicked"); let rx = rx1.expect("thread panicked");
let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200))); let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200)));
@@ -2144,7 +2144,7 @@ mod conn {
let res = client.send_request(req).and_then(move |res| { let res = client.send_request(req).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::OK); assert_eq!(res.status(), hyper::StatusCode::OK);
concat(res.into_body()) concat(res)
}); });
let rx = rx1.expect("thread panicked"); let rx = rx1.expect("thread panicked");
let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200))); let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200)));
@@ -2184,7 +2184,7 @@ mod conn {
.unwrap(); .unwrap();
let res1 = client.send_request(req).and_then(move |res| { let res1 = client.send_request(req).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::OK); assert_eq!(res.status(), hyper::StatusCode::OK);
concat(res.into_body()) concat(res)
}); });
// pipelined request will hit NotReady, and thus should return an Error::Cancel // pipelined request will hit NotReady, and thus should return an Error::Cancel
@@ -2258,7 +2258,7 @@ mod conn {
let res = client.send_request(req).and_then(move |res| { let res = client.send_request(req).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::SWITCHING_PROTOCOLS); assert_eq!(res.status(), hyper::StatusCode::SWITCHING_PROTOCOLS);
assert_eq!(res.headers()["Upgrade"], "foobar"); assert_eq!(res.headers()["Upgrade"], "foobar");
concat(res.into_body()) concat(res)
}); });
let rx = rx1.expect("thread panicked"); let rx = rx1.expect("thread panicked");
@@ -2348,7 +2348,7 @@ mod conn {
.send_request(req) .send_request(req)
.and_then(move |res| { .and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::OK); assert_eq!(res.status(), hyper::StatusCode::OK);
concat(res.into_body()) concat(res)
}) })
.map_ok(|body| { .map_ok(|body| {
assert_eq!(body.as_ref(), b""); assert_eq!(body.as_ref(), b"");

View File

@@ -1887,7 +1887,7 @@ impl tower_service::Service<Request<Body>> for TestService {
let replies = self.reply.clone(); let replies = self.reply.clone();
Box::pin(async move { Box::pin(async move {
while let Some(chunk) = req.body_mut().data().await { while let Some(chunk) = req.data().await {
match chunk { match chunk {
Ok(chunk) => { Ok(chunk) => {
tx.send(Msg::Chunk(chunk.to_vec())).unwrap(); tx.send(Msg::Chunk(chunk.to_vec())).unwrap();

View File

@@ -355,7 +355,7 @@ async fn async_test(cfg: __TestConfig) {
func(&req.headers()); func(&req.headers());
} }
let sbody = sreq.body; let sbody = sreq.body;
hyper::body::to_bytes(req.into_body()).map_ok(move |body| { hyper::body::to_bytes(req).map_ok(move |body| {
assert_eq!(body.as_ref(), sbody.as_slice(), "client body"); assert_eq!(body.as_ref(), sbody.as_slice(), "client body");
let mut res = Response::builder() let mut res = Response::builder()
@@ -410,7 +410,7 @@ async fn async_test(cfg: __TestConfig) {
for func in &cheaders { for func in &cheaders {
func(&res.headers()); func(&res.headers());
} }
hyper::body::to_bytes(res.into_body()) hyper::body::to_bytes(res)
}) })
.map_ok(move |body| { .map_ok(move |body| {
assert_eq!(body.as_ref(), cbody.as_slice(), "server body"); assert_eq!(body.as_ref(), cbody.as_slice(), "server body");