From 5356776834715a17879cbe6203d209000935e6f7 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 10 Sep 2019 11:39:50 -0700 Subject: [PATCH] refine async API - Converted `Response::text` and `Response::json` to `async fn` - Added `Response::bytes` async fn as a counterpat to `text`. - Added `Response::chunk` async fn to stream chunks of the response body. - Added `From for Body` to allow piping a response as a request body. - Removed `Decoder` from public API - Removed body accessor methods from `Response` - Removed `Chunk` type, replaced with `bytes::Bytes`. - Removed public `impl Stream for Body`. --- examples/simple.rs | 2 +- src/async_impl/body.rs | 207 +++++------------------- src/async_impl/decoder.rs | 41 ++--- src/async_impl/mod.rs | 4 +- src/async_impl/multipart.rs | 31 ++-- src/async_impl/response.rs | 303 +++++++++++++++++++++--------------- src/blocking/response.rs | 17 +- src/lib.rs | 6 +- tests/blocking.rs | 12 +- tests/client.rs | 82 ++++++++-- tests/timeouts.rs | 2 +- 11 files changed, 337 insertions(+), 370 deletions(-) diff --git a/examples/simple.rs b/examples/simple.rs index c42f83e..0652cbf 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -2,7 +2,7 @@ #[tokio::main] async fn main() -> Result<(), reqwest::Error> { - let mut res = reqwest::Client::new() + let res = reqwest::Client::new() .get("https://hyper.rs") .send() .await?; diff --git a/src/async_impl/body.rs b/src/async_impl/body.rs index 9afbc25..faa182f 100644 --- a/src/async_impl/body.rs +++ b/src/async_impl/body.rs @@ -1,4 +1,4 @@ -use bytes::{Buf, Bytes}; +use bytes::Bytes; use futures::Stream; use hyper::body::Payload; use std::fmt; @@ -7,11 +7,14 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio::timer::Delay; -/// An asynchronous `Stream`. +/// An asynchronous request body. pub struct Body { inner: Inner, } +// The `Stream` trait isn't stable, so the impl isn't public. +pub(crate) struct ImplStream(Body); + enum Inner { Reusable(Bytes), Hyper { @@ -21,13 +24,6 @@ enum Inner { } impl Body { - pub(crate) fn content_length(&self) -> Option { - match self.inner { - Inner::Reusable(ref bytes) => Some(bytes.len() as u64), - Inner::Hyper { ref body, .. } => body.size_hint().exact(), - } - } - /// Wrap a futures `Stream` in a box inside `Body`. /// /// # Example @@ -56,14 +52,12 @@ impl Body { Body::wrap(hyper::body::Body::wrap_stream(stream)) } - #[inline] pub(crate) fn response(body: hyper::Body, timeout: Option) -> Body { Body { inner: Inner::Hyper { body, timeout }, } } - #[inline] pub(crate) fn wrap(body: hyper::Body) -> Body { Body { inner: Inner::Hyper { @@ -73,19 +67,16 @@ impl Body { } } - #[inline] pub(crate) fn empty() -> Body { Body::wrap(hyper::Body::empty()) } - #[inline] pub(crate) fn reusable(chunk: Bytes) -> Body { Body { inner: Inner::Reusable(chunk), } } - #[inline] pub(crate) fn into_hyper(self) -> (Option, hyper::Body) { match self.inner { Inner::Reusable(chunk) => (Some(chunk.clone()), chunk.into()), @@ -96,44 +87,15 @@ impl Body { } } - fn inner(self: Pin<&mut Self>) -> Pin<&mut Inner> { - unsafe { Pin::map_unchecked_mut(self, |x| &mut x.inner) } + pub(crate) fn into_stream(self) -> ImplStream { + ImplStream(self) } -} -impl Stream for Body { - type Item = Result; - - #[inline] - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let opt_try_chunk = match self.inner().get_mut() { - Inner::Hyper { - ref mut body, - ref mut timeout, - } => { - if let Some(ref mut timeout) = timeout { - if let Poll::Ready(()) = Pin::new(timeout).poll(cx) { - return Poll::Ready(Some(Err(crate::error::timedout(None)))); - } - } - futures::ready!(Pin::new(body).poll_data(cx)).map(|opt_chunk| { - opt_chunk - .map(|c| Chunk { inner: c }) - .map_err(crate::error::from) - }) - } - Inner::Reusable(ref mut bytes) => { - if bytes.is_empty() { - None - } else { - let chunk = Chunk::from_chunk(bytes.clone()); - *bytes = Bytes::new(); - Some(Ok(chunk)) - } - } - }; - - Poll::Ready(opt_try_chunk) + pub(crate) fn content_length(&self) -> Option { + match self.inner { + Inner::Reusable(ref bytes) => Some(bytes.len() as u64), + Inner::Hyper { ref body, .. } => body.size_hint().exact(), + } } } @@ -172,126 +134,41 @@ impl From<&'static str> for Body { } } -/// A chunk of bytes for a `Body`. -/// -/// A `Chunk` can be treated like `&[u8]`. -#[derive(Default)] -pub struct Chunk { - inner: hyper::Chunk, -} - -impl Chunk { - #[inline] - pub(crate) fn from_chunk(chunk: Bytes) -> Chunk { - Chunk { - inner: hyper::Chunk::from(chunk), - } - } -} - -impl Buf for Chunk { - fn bytes(&self) -> &[u8] { - self.inner.bytes() - } - - fn remaining(&self) -> usize { - self.inner.remaining() - } - - fn advance(&mut self, n: usize) { - self.inner.advance(n); - } -} - -impl AsRef<[u8]> for Chunk { - #[inline] - fn as_ref(&self) -> &[u8] { - &*self - } -} - -impl std::ops::Deref for Chunk { - type Target = [u8]; - #[inline] - fn deref(&self) -> &Self::Target { - self.inner.as_ref() - } -} - -impl Extend for Chunk { - fn extend(&mut self, iter: T) - where - T: IntoIterator, - { - self.inner.extend(iter) - } -} - -impl IntoIterator for Chunk { - type Item = u8; - //XXX: exposing type from hyper! - type IntoIter = ::IntoIter; - fn into_iter(self) -> Self::IntoIter { - self.inner.into_iter() - } -} - -impl From> for Chunk { - fn from(v: Vec) -> Chunk { - Chunk { inner: v.into() } - } -} - -impl From<&'static [u8]> for Chunk { - fn from(slice: &'static [u8]) -> Chunk { - Chunk { - inner: slice.into(), - } - } -} - -impl From for Chunk { - fn from(s: String) -> Chunk { - Chunk { inner: s.into() } - } -} - -impl From<&'static str> for Chunk { - fn from(slice: &'static str) -> Chunk { - Chunk { - inner: slice.into(), - } - } -} - -impl From for Chunk { - fn from(bytes: Bytes) -> Chunk { - Chunk { - inner: bytes.into(), - } - } -} - -impl From for Bytes { - fn from(chunk: Chunk) -> Bytes { - chunk.inner.into() - } -} - -impl From for hyper::Chunk { - fn from(val: Chunk) -> hyper::Chunk { - val.inner - } -} - impl fmt::Debug for Body { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Body").finish() } } -impl fmt::Debug for Chunk { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Debug::fmt(&self.inner, f) +// ===== impl ImplStream ===== + +impl Stream for ImplStream { + type Item = Result; + + #[inline] + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let opt_try_chunk = match self.0.inner { + Inner::Hyper { + ref mut body, + ref mut timeout, + } => { + if let Some(ref mut timeout) = timeout { + if let Poll::Ready(()) = Pin::new(timeout).poll(cx) { + return Poll::Ready(Some(Err(crate::error::timedout(None)))); + } + } + futures::ready!(Pin::new(body).poll_data(cx)) + .map(|opt_chunk| opt_chunk.map(Into::into).map_err(crate::error::from)) + } + Inner::Reusable(ref mut bytes) => { + if bytes.is_empty() { + None + } else { + Some(Ok(std::mem::replace(bytes, Bytes::new()))) + } + } + }; + + Poll::Ready(opt_try_chunk) } } diff --git a/src/async_impl/decoder.rs b/src/async_impl/decoder.rs index ff56c4e..b1eb814 100644 --- a/src/async_impl/decoder.rs +++ b/src/async_impl/decoder.rs @@ -17,36 +17,38 @@ use std::mem; use std::pin::Pin; use std::task::{Context, Poll}; +use async_compression::stream::GzipDecoder; use bytes::Bytes; +use futures::stream::Peekable; use futures::Stream; use hyper::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; use hyper::HeaderMap; use log::warn; -use super::{Body, Chunk}; +use super::Body; use crate::error; /// A response decompressor over a non-blocking stream of chunks. /// /// The inner decoder may be constructed asynchronously. -pub struct Decoder { +pub(crate) struct Decoder { inner: Inner, } enum Inner { /// A `PlainText` decoder just returns the response content as is. - PlainText(Body), + PlainText(super::body::ImplStream), /// A `Gzip` decoder will uncompress the gzipped response content before returning it. - Gzip(async_compression::stream::GzipDecoder>), + Gzip(GzipDecoder>), /// A decoder that doesn't have a value yet. Pending(Pending), } /// A future attempt to poll the response body for EOF so we know whether to use gzip or not. -struct Pending(futures::stream::Peekable); +struct Pending(Peekable); -struct BodyBytes(Body); +struct IoStream(super::body::ImplStream); impl fmt::Debug for Decoder { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -59,9 +61,9 @@ impl Decoder { /// /// This decoder will produce a single 0 byte chunk. #[inline] - pub fn empty() -> Decoder { + pub(crate) fn empty() -> Decoder { Decoder { - inner: Inner::PlainText(Body::empty()), + inner: Inner::PlainText(Body::empty().into_stream()), } } @@ -70,7 +72,7 @@ impl Decoder { /// This decoder will emit the underlying chunks as-is. fn plain_text(body: Body) -> Decoder { Decoder { - inner: Inner::PlainText(body), + inner: Inner::PlainText(body.into_stream()), } } @@ -81,7 +83,7 @@ impl Decoder { use futures::stream::StreamExt; Decoder { - inner: Inner::Pending(Pending(BodyBytes(body).peekable())), + inner: Inner::Pending(Pending(IoStream(body.into_stream()).peekable())), } } @@ -128,7 +130,7 @@ impl Decoder { } impl Stream for Decoder { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { // Do a read or poll for a pending decoder value. @@ -141,7 +143,7 @@ impl Stream for Decoder { Inner::PlainText(ref mut body) => return Pin::new(body).poll_next(cx), Inner::Gzip(ref mut decoder) => { return match futures::ready!(Pin::new(decoder).poll_next(cx)) { - Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.into()))), + Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))), Some(Err(err)) => Poll::Ready(Some(Err(crate::error::from_io(err)))), None => Poll::Ready(None), } @@ -169,22 +171,23 @@ impl Future for Pending { .expect("just peeked Some") .unwrap_err())); } - None => return Poll::Ready(Ok(Inner::PlainText(Body::empty()))), + None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))), }; - let body = mem::replace(&mut self.0, BodyBytes(Body::empty()).peekable()); - Poll::Ready(Ok(Inner::Gzip( - async_compression::stream::GzipDecoder::new(body), - ))) + let body = mem::replace( + &mut self.0, + IoStream(Body::empty().into_stream()).peekable(), + ); + Poll::Ready(Ok(Inner::Gzip(GzipDecoder::new(body)))) } } -impl Stream for BodyBytes { +impl Stream for IoStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { match futures::ready!(Pin::new(&mut self.0).poll_next(cx)) { - Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk.into()))), + Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk))), Some(Err(err)) => Poll::Ready(Some(Err(err.into_io()))), None => Poll::Ready(None), } diff --git a/src/async_impl/mod.rs b/src/async_impl/mod.rs index 8218d83..9f3cd00 100644 --- a/src/async_impl/mod.rs +++ b/src/async_impl/mod.rs @@ -1,6 +1,6 @@ -pub use self::body::{Body, Chunk}; +pub use self::body::Body; pub use self::client::{Client, ClientBuilder}; -pub use self::decoder::Decoder; +pub(crate) use self::decoder::Decoder; pub use self::request::{Request, RequestBuilder}; pub use self::response::{Response, ResponseBuilderExt}; diff --git a/src/async_impl/multipart.rs b/src/async_impl/multipart.rs index 96c9c94..b360035 100644 --- a/src/async_impl/multipart.rs +++ b/src/async_impl/multipart.rs @@ -7,7 +7,7 @@ use mime_guess::Mime; use percent_encoding::{self, AsciiSet, NON_ALPHANUMERIC}; use uuid::Uuid; -use futures::{Stream, StreamExt}; +use futures::StreamExt; use super::Body; @@ -137,7 +137,7 @@ impl Form { hyper::Body::wrap_stream( boundary .chain(header) - .chain(hyper::Body::wrap_stream(part.value)) + .chain(hyper::Body::wrap_stream(part.value.into_stream())) .chain(hyper::Body::from("\r\n".to_owned())), ) } @@ -190,15 +190,8 @@ impl Part { } /// Makes a new parameter from an arbitrary stream. - pub fn stream(value: T) -> Part - where - T: Stream> + Send + Sync + 'static, - E: std::error::Error + Send + Sync + 'static, - hyper::Chunk: std::convert::From, - { - Part::new(Body::wrap(hyper::Body::wrap_stream( - value.map(|chunk| chunk.into()), - ))) + pub fn stream>(value: T) -> Part { + Part::new(value.into()) } fn new(value: Body) -> Part { @@ -500,21 +493,17 @@ mod tests { let mut form = Form::new() .part( "reader1", - Part::stream(futures::stream::once(futures::future::ready::< - Result, - >(Ok( - hyper::Chunk::from("part1".to_owned()), - )))), + Part::stream(Body::wrap_stream(futures::stream::once( + futures::future::ready::>(Ok("part1".to_owned())), + ))), ) .part("key1", Part::text("value1")) .part("key2", Part::text("value2").mime(mime::IMAGE_BMP)) .part( "reader2", - Part::stream(futures::stream::once(futures::future::ready::< - Result, - >(Ok( - hyper::Chunk::from("part2".to_owned()), - )))), + Part::stream(Body::wrap_stream(futures::stream::once( + futures::future::ready::>(Ok("part2".to_owned())), + ))), ) .part("key3", Part::text("value3").file_name("filename")); form.inner.boundary = "boundary".to_string(); diff --git a/src/async_impl/response.rs b/src/async_impl/response.rs index 4f7e39c..f7d285f 100644 --- a/src/async_impl/response.rs +++ b/src/async_impl/response.rs @@ -1,13 +1,10 @@ use std::borrow::Cow; use std::fmt; -use std::marker::PhantomData; -use std::mem; use std::net::SocketAddr; -use std::pin::Pin; -use std::task::{Context, Poll}; +use bytes::Bytes; use encoding_rs::{Encoding, UTF_8}; -use futures::{Future, FutureExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use http; use hyper::client::connect::HttpInfo; use hyper::header::CONTENT_LENGTH; @@ -21,12 +18,8 @@ use url::Url; use super::body::Body; use super::Decoder; -use crate::async_impl::Chunk; use crate::cookie; -/// https://github.com/rust-lang-nursery/futures-rs/issues/1812 -type ConcatDecoder = Pin> + Send>>; - /// A Response to a submitted `Request`. pub struct Response { status: StatusCode, @@ -71,6 +64,12 @@ impl Response { self.status } + /// Get the HTTP `Version` of this `Response`. + #[inline] + pub fn version(&self) -> Version { + self.version + } + /// Get the `Headers` of this `Response`. #[inline] pub fn headers(&self) -> &HeaderMap { @@ -83,6 +82,20 @@ impl Response { &mut self.headers } + /// Get the content-length of this response, if known. + /// + /// Reasons it may not be known: + /// + /// - The server didn't send a `content-length` header. + /// - The response is gzipped and automatically decoded (thus changing + /// the actual decoded length). + pub fn content_length(&self) -> Option { + self.headers() + .get(CONTENT_LENGTH) + .and_then(|ct_len| ct_len.to_str().ok()) + .and_then(|ct_len| ct_len.parse().ok()) + } + /// Retrieve the cookies contained in the response. /// /// Note that invalid 'Set-Cookie' headers will be ignored. @@ -103,57 +116,55 @@ impl Response { .map(|info| info.remote_addr()) } - /// Get the content-length of this response, if known. + // body methods + + /// Get the full response text. /// - /// Reasons it may not be known: + /// This method decodes the response body with BOM sniffing + /// and with malformed sequences replaced with the REPLACEMENT CHARACTER. + /// Encoding is determinated from the `charset` parameter of `Content-Type` header, + /// and defaults to `utf-8` if not presented. /// - /// - The server didn't send a `content-length` header. - /// - The response is gzipped and automatically decoded (thus changing - /// the actual decoded length). - pub fn content_length(&self) -> Option { - self.headers() - .get(CONTENT_LENGTH) - .and_then(|ct_len| ct_len.to_str().ok()) - .and_then(|ct_len| ct_len.parse().ok()) - } - - /// Consumes the response, returning the body - pub fn into_body(self) -> Decoder { - self.body - } - - /// Get a reference to the response body. - #[inline] - pub fn body(&self) -> &Decoder { - &self.body - } - - /// Get a mutable reference to the response body. + /// # Example /// - /// The chunks from the body may be decoded, depending on the `gzip` - /// option on the `ClientBuilder`. - #[inline] - pub fn body_mut(&mut self) -> &mut Decoder { - &mut self.body + /// ``` + /// # async fn run() -> Result<(), Box> { + /// let content = reqwest::get("http://httpbin.org/range/26") + /// .await? + /// .text() + /// .await?; + /// + /// println!("text: {:?}", content); + /// # Ok(()) + /// # } + /// ``` + pub async fn text(self) -> crate::Result { + self.text_with_charset("utf-8").await } - /// Get the HTTP `Version` of this `Response`. - #[inline] - pub fn version(&self) -> Version { - self.version - } - - /// Get the response text - pub fn text(&mut self) -> impl Future> { - self.text_with_charset("utf-8") - } - - /// Get the response text given a specific encoding - pub fn text_with_charset( - &mut self, - default_encoding: &str, - ) -> impl Future> { - let body = mem::replace(&mut self.body, Decoder::empty()); + /// Get the full response text given a specific encoding. + /// + /// This method decodes the response body with BOM sniffing + /// and with malformed sequences replaced with the REPLACEMENT CHARACTER. + /// You can provide a default encoding for decoding the raw message, while the + /// `charset` parameter of `Content-Type` header is still prioritized. For more information + /// about the possible encoding name, please go to + /// https://docs.rs/encoding_rs/0.8.17/encoding_rs/#relationship-with-windows-code-pages + /// + /// # Example + /// + /// ``` + /// # async fn run() -> Result<(), Box> { + /// let content = reqwest::get("http://httpbin.org/range/26") + /// .await? + /// .text_with_charset("utf-8") + /// .await?; + /// + /// println!("text: {:?}", content); + /// # Ok(()) + /// # } + /// ``` + pub async fn text_with_charset(self, default_encoding: &str) -> crate::Result { let content_type = self .headers .get(crate::header::CONTENT_TYPE) @@ -164,23 +175,106 @@ impl Response { .and_then(|mime| mime.get_param("charset").map(|charset| charset.as_str())) .unwrap_or(default_encoding); let encoding = Encoding::for_label(encoding_name.as_bytes()).unwrap_or(UTF_8); - Text { - concat: body.try_concat().boxed(), - encoding, + + let full = self.bytes().await?; + + let (text, _, _) = encoding.decode(&full); + if let Cow::Owned(s) = text { + return Ok(s); + } + unsafe { + // decoding returned Cow::Borrowed, meaning these bytes + // are already valid utf8 + Ok(String::from_utf8_unchecked(full.to_vec())) } } - /// Try to deserialize the response body as JSON using `serde`. - #[inline] - pub fn json(&mut self) -> impl Future> { - let body = mem::replace(&mut self.body, Decoder::empty()); + /// Try to deserialize the response body as JSON. + /// + /// # Examples + /// + /// ``` + /// # extern crate reqwest; + /// # extern crate serde; + /// # + /// # use reqwest::Error; + /// # use serde::Deserialize; + /// # + /// #[derive(Deserialize)] + /// struct Ip { + /// origin: String, + /// } + /// + /// # async fn run() -> Result<(), Error> { + /// let ip = reqwest::get("http://httpbin.org/ip") + /// .await? + /// .json::() + /// .await?; + /// + /// println!("ip: {}", ip.origin); + /// # Ok(()) + /// # } + /// # + /// # fn main() { } + /// ``` + /// + /// # Errors + /// + /// This method fails whenever the response body is not in JSON format + /// or it cannot be properly deserialized to target type `T`. For more + /// details please see [`serde_json::from_reader`]. + /// [`serde_json::from_reader`]: https://docs.serde.rs/serde_json/fn.from_reader.html + pub async fn json(self) -> crate::Result { + let full = self.bytes().await?; - Json { - concat: body.try_concat().boxed(), - _marker: PhantomData, + serde_json::from_slice(&full).map_err(crate::error::from) + } + + /// Get the full response body as `Bytes`. + /// + /// # Example + /// + /// ``` + /// # async fn run() -> Result<(), Box> { + /// let bytes = reqwest::get("http://httpbin.org/ip") + /// .await? + /// .bytes() + /// .await?; + /// + /// println!("bytes: {:?}", bytes); + /// # Ok(()) + /// # } + /// ``` + pub async fn bytes(self) -> crate::Result { + self.body.try_concat().await + } + + /// Stream a chunk of the response body. + /// + /// When the response body has been exhausted, this will return `None`. + /// + /// # Example + /// + /// ``` + /// # async fn run() -> Result<(), Box> { + /// let mut res = reqwest::get("https://hyper.rs").await?; + /// + /// while let Some(chunk) = res.chunk().await? { + /// println!("Chunk: {:?}", chunk); + /// } + /// # Ok(()) + /// # } + /// ``` + pub async fn chunk(&mut self) -> crate::Result> { + if let Some(item) = self.body.next().await { + Ok(Some(item?)) + } else { + Ok(None) } } + // util methods + /// Turn a response into an error if the server returned an error. /// /// # Example @@ -202,7 +296,6 @@ impl Response { /// } /// # fn main() {} /// ``` - #[inline] pub fn error_for_status(self) -> crate::Result { if self.status.is_client_error() || self.status.is_server_error() { Err(crate::error::status_code(*self.url, self.status)) @@ -232,7 +325,6 @@ impl Response { /// } /// # fn main() {} /// ``` - #[inline] pub fn error_for_status_ref(&self) -> crate::Result<&Self> { if self.status.is_client_error() || self.status.is_server_error() { Err(crate::error::status_code(*self.url.clone(), self.status)) @@ -240,6 +332,17 @@ impl Response { Ok(self) } } + + // private + + // The Response's body is an implementation detail. + // You no longer need to get a reference to it, there are async methods + // on the `Response` itself. + // + // This method is just used by the blocking API. + pub(crate) fn body_mut(&mut self) -> &mut Decoder { + &mut self.body + } } impl fmt::Debug for Response { @@ -273,68 +376,10 @@ impl> From> for Response { } } -/// A JSON object. -struct Json { - concat: ConcatDecoder, - _marker: PhantomData, -} - -impl Json { - fn concat(self: Pin<&mut Self>) -> Pin<&mut ConcatDecoder> { - unsafe { Pin::map_unchecked_mut(self, |x| &mut x.concat) } - } -} - -impl Future for Json { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match futures::ready!(self.concat().as_mut().poll(cx)) { - Err(e) => Poll::Ready(Err(e)), - Ok(chunk) => { - let t = serde_json::from_slice(&chunk).map_err(crate::error::from); - Poll::Ready(t) - } - } - } -} - -impl fmt::Debug for Json { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Json").finish() - } -} - -//#[derive(Debug)] -struct Text { - concat: ConcatDecoder, - encoding: &'static Encoding, -} - -impl Text { - fn concat(self: Pin<&mut Self>) -> Pin<&mut ConcatDecoder> { - unsafe { Pin::map_unchecked_mut(self, |x| &mut x.concat) } - } -} - -impl Future for Text { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match futures::ready!(self.as_mut().concat().as_mut().poll(cx)) { - Err(e) => Poll::Ready(Err(e)), - Ok(chunk) => { - let (text, _, _) = self.as_mut().encoding.decode(&chunk); - if let Cow::Owned(s) = text { - return Poll::Ready(Ok(s)); - } - unsafe { - // decoding returned Cow::Borrowed, meaning these bytes - // are already valid utf8 - Poll::Ready(Ok(String::from_utf8_unchecked(chunk.to_vec()))) - } - } - } +/// A `Response` can be piped as the `Body` of another request. +impl From for Body { + fn from(r: Response) -> Body { + Body::wrap_stream(r.body) } } diff --git a/src/blocking/response.rs b/src/blocking/response.rs index 798a622..49a5b22 100644 --- a/src/blocking/response.rs +++ b/src/blocking/response.rs @@ -203,8 +203,7 @@ impl Response { /// or it cannot be properly deserialized to target type `T`. For more /// details please see [`serde_json::from_reader`]. /// [`serde_json::from_reader`]: https://docs.serde.rs/serde_json/fn.from_reader.html - #[inline] - pub fn json(&mut self) -> crate::Result { + pub fn json(self) -> crate::Result { wait::timeout(self.inner.json(), self.timeout).map_err(|e| match e { wait::Waited::TimedOut => crate::error::timedout(None), wait::Waited::Executor(e) => crate::error::from(e), @@ -228,12 +227,7 @@ impl Response { /// # Ok(()) /// # } /// ``` - /// - /// # Note - /// - /// This consumes the body. Trying to read more, or use of `response.json()` - /// will return empty values. - pub fn text(&mut self) -> crate::Result { + pub fn text(self) -> crate::Result { self.text_with_charset("utf-8") } @@ -256,12 +250,7 @@ impl Response { /// # Ok(()) /// # } /// ``` - /// - /// # Note - /// - /// This consumes the body. Trying to read more, or use of `response.json()` - /// will return empty values. - pub fn text_with_charset(&mut self, default_encoding: &str) -> crate::Result { + pub fn text_with_charset(self, default_encoding: &str) -> crate::Result { wait::timeout(self.inner.text_with_charset(default_encoding), self.timeout).map_err(|e| { match e { wait::Waited::TimedOut => crate::error::timedout(None), diff --git a/src/lib.rs b/src/lib.rs index 524ec34..c45ae78 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -187,8 +187,8 @@ pub use hyper::{StatusCode, Version}; pub use url::ParseError as UrlError; pub use url::Url; -pub use self::r#async::{ - multipart, Body, Client, ClientBuilder, Decoder, Request, RequestBuilder, Response, +pub use self::async_impl::{ + multipart, Body, Client, ClientBuilder, Request, RequestBuilder, Response, }; //pub use self::body::Body; //pub use self::client::{Client, ClientBuilder}; @@ -223,7 +223,7 @@ mod tls; #[deprecated(note = "types moved to top of crate")] pub mod r#async { pub use crate::async_impl::{ - multipart, Body, Chunk, Client, ClientBuilder, Decoder, Request, RequestBuilder, Response, + multipart, Body, Client, ClientBuilder, Request, RequestBuilder, Response, }; } diff --git a/tests/blocking.rs b/tests/blocking.rs index d41c0f4..646cd6e 100644 --- a/tests/blocking.rs +++ b/tests/blocking.rs @@ -22,7 +22,7 @@ fn test_response_text() { }; let url = format!("http://{}/text", server.addr()); - let mut res = reqwest::blocking::get(&url).unwrap(); + let res = reqwest::blocking::get(&url).unwrap(); assert_eq!(res.url().as_str(), &url); assert_eq!(res.status(), reqwest::StatusCode::OK); assert_eq!(res.headers().get(reqwest::header::SERVER).unwrap(), &"test"); @@ -57,7 +57,7 @@ fn test_response_non_utf_8_text() { }; let url = format!("http://{}/text", server.addr()); - let mut res = reqwest::blocking::get(&url).unwrap(); + let res = reqwest::blocking::get(&url).unwrap(); assert_eq!(res.url().as_str(), &url); assert_eq!(res.status(), reqwest::StatusCode::OK); assert_eq!(res.headers().get(reqwest::header::SERVER).unwrap(), &"test"); @@ -92,7 +92,7 @@ fn test_response_json() { }; let url = format!("http://{}/json", server.addr()); - let mut res = reqwest::blocking::get(&url).unwrap(); + let res = reqwest::blocking::get(&url).unwrap(); assert_eq!(res.url().as_str(), &url); assert_eq!(res.status(), reqwest::StatusCode::OK); assert_eq!(res.headers().get(reqwest::header::SERVER).unwrap(), &"test"); @@ -126,7 +126,7 @@ fn test_response_copy_to() { }; let url = format!("http://{}/1", server.addr()); - let mut res = reqwest::blocking::get(&url).unwrap(); + let res = reqwest::blocking::get(&url).unwrap(); assert_eq!(res.url().as_str(), &url); assert_eq!(res.status(), reqwest::StatusCode::OK); assert_eq!(res.headers().get(reqwest::header::SERVER).unwrap(), &"test"); @@ -158,7 +158,7 @@ fn test_get() { }; let url = format!("http://{}/1", server.addr()); - let mut res = reqwest::blocking::get(&url).unwrap(); + let res = reqwest::blocking::get(&url).unwrap(); assert_eq!(res.url().as_str(), &url); assert_eq!(res.status(), reqwest::StatusCode::OK); @@ -194,7 +194,7 @@ fn test_post() { }; let url = format!("http://{}/2", server.addr()); - let mut res = reqwest::blocking::Client::new() + let res = reqwest::blocking::Client::new() .post(&url) .body("Hello") .send() diff --git a/tests/client.rs b/tests/client.rs index 598e812..aaa1621 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -4,8 +4,6 @@ mod support; use std::io::Write; use std::time::Duration; -use futures::TryStreamExt; - use reqwest::multipart::{Form, Part}; use reqwest::{Body, Client}; @@ -44,7 +42,7 @@ async fn response_text() { let client = Client::new(); - let mut res = client + let res = client .get(&format!("http://{}/text", server.addr())) .send() .await @@ -76,7 +74,7 @@ async fn response_json() { let client = Client::new(); - let mut res = client + let res = client .get(&format!("http://{}/json", server.addr())) .send() .await @@ -89,9 +87,12 @@ async fn response_json() { async fn multipart() { let _ = env_logger::try_init(); - let stream = futures::stream::once(futures::future::ready::>(Ok( - hyper::Chunk::from("part1 part2".to_owned()), - ))); + let stream = reqwest::Body::wrap_stream(futures::stream::once(futures::future::ready(Ok::< + _, + reqwest::Error, + >( + "part1 part2".to_owned(), + )))); let part = Part::stream(stream); let form = Form::new().text("foo", "bar").part("part_stream", part); @@ -221,7 +222,7 @@ async fn response_timeout() { let url = format!("http://{}/slow", server.addr()); let res = client.get(&url).send().await.expect("Failed to get"); - let body: Result<_, _> = res.into_body().try_concat().await; + let body = res.text().await; let err = body.unwrap_err(); @@ -269,7 +270,7 @@ async fn gzip_case(response_size: usize, chunk_size: usize) { let client = Client::new(); - let mut res = client + let res = client .get(&format!("http://{}/gzip", server.addr())) .send() .await @@ -323,3 +324,66 @@ async fn body_stream() { assert_eq!(res.url().as_str(), &url); assert_eq!(res.status(), reqwest::StatusCode::OK); } + +#[tokio::test] +async fn body_pipe_response() { + let _ = env_logger::try_init(); + + let server = server! { + request: b"\ + GET /get HTTP/1.1\r\n\ + user-agent: $USERAGENT\r\n\ + accept: */*\r\n\ + accept-encoding: gzip\r\n\ + host: $HOST\r\n\ + \r\n\ + ", + response: b"\ + HTTP/1.1 200 OK\r\n\ + Server: pipe\r\n\ + Content-Length: 7\r\n\ + \r\n\ + pipe me\ + "; + + request: b"\ + POST /pipe HTTP/1.1\r\n\ + user-agent: $USERAGENT\r\n\ + accept: */*\r\n\ + accept-encoding: gzip\r\n\ + host: $HOST\r\n\ + transfer-encoding: chunked\r\n\ + \r\n\ + 7\r\n\ + pipe me\r\n\ + 0\r\n\r\n\ + ", + response: b"\ + HTTP/1.1 200 OK\r\n\ + Server: pipe\r\n\ + Content-Length: 0\r\n\ + \r\n\ + " + }; + + let client = Client::new(); + + let res1 = client + .get(&format!("http://{}/get", server.addr())) + .send() + .await + .expect("get1"); + + assert_eq!(res1.status(), reqwest::StatusCode::OK); + assert_eq!(res1.content_length(), Some(7)); + + // and now ensure we can "pipe" the response to another request + let res2 = client + .post(&format!("http://{}/pipe", server.addr())) + .body(res1) + .send() + .await + .expect("res2"); + + assert_eq!(res2.status(), reqwest::StatusCode::OK); +} diff --git a/tests/timeouts.rs b/tests/timeouts.rs index 541c183..94d714c 100644 --- a/tests/timeouts.rs +++ b/tests/timeouts.rs @@ -142,7 +142,7 @@ fn test_read_timeout() { }; let url = format!("http://{}/read-timeout", server.addr()); - let mut res = reqwest::blocking::Client::builder() + let res = reqwest::blocking::Client::builder() .timeout(Duration::from_millis(500)) .build() .unwrap()