refine async API

- Converted `Response::text` and `Response::json` to `async fn`
- Added `Response::bytes` async fn as a counterpat to `text`.
- Added `Response::chunk` async fn to stream chunks of the response body.
- Added `From<Response> for Body` to allow piping a response as a request body.
- Removed `Decoder` from public API
- Removed body accessor methods from `Response`
- Removed `Chunk` type, replaced with `bytes::Bytes`.
- Removed public `impl Stream for Body`.
This commit is contained in:
Sean McArthur
2019-09-10 11:39:50 -07:00
parent 87a09322d6
commit 5356776834
11 changed files with 337 additions and 370 deletions

View File

@@ -2,7 +2,7 @@
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
let mut res = reqwest::Client::new()
let res = reqwest::Client::new()
.get("https://hyper.rs")
.send()
.await?;

View File

@@ -1,4 +1,4 @@
use bytes::{Buf, Bytes};
use bytes::Bytes;
use futures::Stream;
use hyper::body::Payload;
use std::fmt;
@@ -7,11 +7,14 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::timer::Delay;
/// An asynchronous `Stream`.
/// An asynchronous request body.
pub struct Body {
inner: Inner,
}
// The `Stream` trait isn't stable, so the impl isn't public.
pub(crate) struct ImplStream(Body);
enum Inner {
Reusable(Bytes),
Hyper {
@@ -21,13 +24,6 @@ enum Inner {
}
impl Body {
pub(crate) fn content_length(&self) -> Option<u64> {
match self.inner {
Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
Inner::Hyper { ref body, .. } => body.size_hint().exact(),
}
}
/// Wrap a futures `Stream` in a box inside `Body`.
///
/// # Example
@@ -56,14 +52,12 @@ impl Body {
Body::wrap(hyper::body::Body::wrap_stream(stream))
}
#[inline]
pub(crate) fn response(body: hyper::Body, timeout: Option<Delay>) -> Body {
Body {
inner: Inner::Hyper { body, timeout },
}
}
#[inline]
pub(crate) fn wrap(body: hyper::Body) -> Body {
Body {
inner: Inner::Hyper {
@@ -73,19 +67,16 @@ impl Body {
}
}
#[inline]
pub(crate) fn empty() -> Body {
Body::wrap(hyper::Body::empty())
}
#[inline]
pub(crate) fn reusable(chunk: Bytes) -> Body {
Body {
inner: Inner::Reusable(chunk),
}
}
#[inline]
pub(crate) fn into_hyper(self) -> (Option<Bytes>, hyper::Body) {
match self.inner {
Inner::Reusable(chunk) => (Some(chunk.clone()), chunk.into()),
@@ -96,44 +87,15 @@ impl Body {
}
}
fn inner(self: Pin<&mut Self>) -> Pin<&mut Inner> {
unsafe { Pin::map_unchecked_mut(self, |x| &mut x.inner) }
pub(crate) fn into_stream(self) -> ImplStream {
ImplStream(self)
}
}
impl Stream for Body {
type Item = Result<Chunk, crate::Error>;
#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let opt_try_chunk = match self.inner().get_mut() {
Inner::Hyper {
ref mut body,
ref mut timeout,
} => {
if let Some(ref mut timeout) = timeout {
if let Poll::Ready(()) = Pin::new(timeout).poll(cx) {
return Poll::Ready(Some(Err(crate::error::timedout(None))));
}
}
futures::ready!(Pin::new(body).poll_data(cx)).map(|opt_chunk| {
opt_chunk
.map(|c| Chunk { inner: c })
.map_err(crate::error::from)
})
}
Inner::Reusable(ref mut bytes) => {
if bytes.is_empty() {
None
} else {
let chunk = Chunk::from_chunk(bytes.clone());
*bytes = Bytes::new();
Some(Ok(chunk))
}
}
};
Poll::Ready(opt_try_chunk)
pub(crate) fn content_length(&self) -> Option<u64> {
match self.inner {
Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
Inner::Hyper { ref body, .. } => body.size_hint().exact(),
}
}
}
@@ -172,126 +134,41 @@ impl From<&'static str> for Body {
}
}
/// A chunk of bytes for a `Body`.
///
/// A `Chunk` can be treated like `&[u8]`.
#[derive(Default)]
pub struct Chunk {
inner: hyper::Chunk,
}
impl Chunk {
#[inline]
pub(crate) fn from_chunk(chunk: Bytes) -> Chunk {
Chunk {
inner: hyper::Chunk::from(chunk),
}
}
}
impl Buf for Chunk {
fn bytes(&self) -> &[u8] {
self.inner.bytes()
}
fn remaining(&self) -> usize {
self.inner.remaining()
}
fn advance(&mut self, n: usize) {
self.inner.advance(n);
}
}
impl AsRef<[u8]> for Chunk {
#[inline]
fn as_ref(&self) -> &[u8] {
&*self
}
}
impl std::ops::Deref for Chunk {
type Target = [u8];
#[inline]
fn deref(&self) -> &Self::Target {
self.inner.as_ref()
}
}
impl Extend<u8> for Chunk {
fn extend<T>(&mut self, iter: T)
where
T: IntoIterator<Item = u8>,
{
self.inner.extend(iter)
}
}
impl IntoIterator for Chunk {
type Item = u8;
//XXX: exposing type from hyper!
type IntoIter = <hyper::Chunk as IntoIterator>::IntoIter;
fn into_iter(self) -> Self::IntoIter {
self.inner.into_iter()
}
}
impl From<Vec<u8>> for Chunk {
fn from(v: Vec<u8>) -> Chunk {
Chunk { inner: v.into() }
}
}
impl From<&'static [u8]> for Chunk {
fn from(slice: &'static [u8]) -> Chunk {
Chunk {
inner: slice.into(),
}
}
}
impl From<String> for Chunk {
fn from(s: String) -> Chunk {
Chunk { inner: s.into() }
}
}
impl From<&'static str> for Chunk {
fn from(slice: &'static str) -> Chunk {
Chunk {
inner: slice.into(),
}
}
}
impl From<Bytes> for Chunk {
fn from(bytes: Bytes) -> Chunk {
Chunk {
inner: bytes.into(),
}
}
}
impl From<Chunk> for Bytes {
fn from(chunk: Chunk) -> Bytes {
chunk.inner.into()
}
}
impl From<Chunk> for hyper::Chunk {
fn from(val: Chunk) -> hyper::Chunk {
val.inner
}
}
impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Body").finish()
}
}
impl fmt::Debug for Chunk {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.inner, f)
// ===== impl ImplStream =====
impl Stream for ImplStream {
type Item = Result<Bytes, crate::Error>;
#[inline]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let opt_try_chunk = match self.0.inner {
Inner::Hyper {
ref mut body,
ref mut timeout,
} => {
if let Some(ref mut timeout) = timeout {
if let Poll::Ready(()) = Pin::new(timeout).poll(cx) {
return Poll::Ready(Some(Err(crate::error::timedout(None))));
}
}
futures::ready!(Pin::new(body).poll_data(cx))
.map(|opt_chunk| opt_chunk.map(Into::into).map_err(crate::error::from))
}
Inner::Reusable(ref mut bytes) => {
if bytes.is_empty() {
None
} else {
Some(Ok(std::mem::replace(bytes, Bytes::new())))
}
}
};
Poll::Ready(opt_try_chunk)
}
}

View File

@@ -17,36 +17,38 @@ use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use async_compression::stream::GzipDecoder;
use bytes::Bytes;
use futures::stream::Peekable;
use futures::Stream;
use hyper::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
use hyper::HeaderMap;
use log::warn;
use super::{Body, Chunk};
use super::Body;
use crate::error;
/// A response decompressor over a non-blocking stream of chunks.
///
/// The inner decoder may be constructed asynchronously.
pub struct Decoder {
pub(crate) struct Decoder {
inner: Inner,
}
enum Inner {
/// A `PlainText` decoder just returns the response content as is.
PlainText(Body),
PlainText(super::body::ImplStream),
/// A `Gzip` decoder will uncompress the gzipped response content before returning it.
Gzip(async_compression::stream::GzipDecoder<futures::stream::Peekable<BodyBytes>>),
Gzip(GzipDecoder<Peekable<IoStream>>),
/// A decoder that doesn't have a value yet.
Pending(Pending),
}
/// A future attempt to poll the response body for EOF so we know whether to use gzip or not.
struct Pending(futures::stream::Peekable<BodyBytes>);
struct Pending(Peekable<IoStream>);
struct BodyBytes(Body);
struct IoStream(super::body::ImplStream);
impl fmt::Debug for Decoder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -59,9 +61,9 @@ impl Decoder {
///
/// This decoder will produce a single 0 byte chunk.
#[inline]
pub fn empty() -> Decoder {
pub(crate) fn empty() -> Decoder {
Decoder {
inner: Inner::PlainText(Body::empty()),
inner: Inner::PlainText(Body::empty().into_stream()),
}
}
@@ -70,7 +72,7 @@ impl Decoder {
/// This decoder will emit the underlying chunks as-is.
fn plain_text(body: Body) -> Decoder {
Decoder {
inner: Inner::PlainText(body),
inner: Inner::PlainText(body.into_stream()),
}
}
@@ -81,7 +83,7 @@ impl Decoder {
use futures::stream::StreamExt;
Decoder {
inner: Inner::Pending(Pending(BodyBytes(body).peekable())),
inner: Inner::Pending(Pending(IoStream(body.into_stream()).peekable())),
}
}
@@ -128,7 +130,7 @@ impl Decoder {
}
impl Stream for Decoder {
type Item = Result<Chunk, error::Error>;
type Item = Result<Bytes, error::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// Do a read or poll for a pending decoder value.
@@ -141,7 +143,7 @@ impl Stream for Decoder {
Inner::PlainText(ref mut body) => return Pin::new(body).poll_next(cx),
Inner::Gzip(ref mut decoder) => {
return match futures::ready!(Pin::new(decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.into()))),
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::from_io(err)))),
None => Poll::Ready(None),
}
@@ -169,22 +171,23 @@ impl Future for Pending {
.expect("just peeked Some")
.unwrap_err()));
}
None => return Poll::Ready(Ok(Inner::PlainText(Body::empty()))),
None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))),
};
let body = mem::replace(&mut self.0, BodyBytes(Body::empty()).peekable());
Poll::Ready(Ok(Inner::Gzip(
async_compression::stream::GzipDecoder::new(body),
)))
let body = mem::replace(
&mut self.0,
IoStream(Body::empty().into_stream()).peekable(),
);
Poll::Ready(Ok(Inner::Gzip(GzipDecoder::new(body))))
}
}
impl Stream for BodyBytes {
impl Stream for IoStream {
type Item = Result<Bytes, std::io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match futures::ready!(Pin::new(&mut self.0).poll_next(cx)) {
Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk.into()))),
Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk))),
Some(Err(err)) => Poll::Ready(Some(Err(err.into_io()))),
None => Poll::Ready(None),
}

View File

@@ -1,6 +1,6 @@
pub use self::body::{Body, Chunk};
pub use self::body::Body;
pub use self::client::{Client, ClientBuilder};
pub use self::decoder::Decoder;
pub(crate) use self::decoder::Decoder;
pub use self::request::{Request, RequestBuilder};
pub use self::response::{Response, ResponseBuilderExt};

View File

@@ -7,7 +7,7 @@ use mime_guess::Mime;
use percent_encoding::{self, AsciiSet, NON_ALPHANUMERIC};
use uuid::Uuid;
use futures::{Stream, StreamExt};
use futures::StreamExt;
use super::Body;
@@ -137,7 +137,7 @@ impl Form {
hyper::Body::wrap_stream(
boundary
.chain(header)
.chain(hyper::Body::wrap_stream(part.value))
.chain(hyper::Body::wrap_stream(part.value.into_stream()))
.chain(hyper::Body::from("\r\n".to_owned())),
)
}
@@ -190,15 +190,8 @@ impl Part {
}
/// Makes a new parameter from an arbitrary stream.
pub fn stream<T, I, E>(value: T) -> Part
where
T: Stream<Item = Result<I, E>> + Send + Sync + 'static,
E: std::error::Error + Send + Sync + 'static,
hyper::Chunk: std::convert::From<I>,
{
Part::new(Body::wrap(hyper::Body::wrap_stream(
value.map(|chunk| chunk.into()),
)))
pub fn stream<T: Into<Body>>(value: T) -> Part {
Part::new(value.into())
}
fn new(value: Body) -> Part {
@@ -500,21 +493,17 @@ mod tests {
let mut form = Form::new()
.part(
"reader1",
Part::stream(futures::stream::once(futures::future::ready::<
Result<hyper::Chunk, hyper::Error>,
>(Ok(
hyper::Chunk::from("part1".to_owned()),
)))),
Part::stream(Body::wrap_stream(futures::stream::once(
futures::future::ready::<Result<String, crate::Error>>(Ok("part1".to_owned())),
))),
)
.part("key1", Part::text("value1"))
.part("key2", Part::text("value2").mime(mime::IMAGE_BMP))
.part(
"reader2",
Part::stream(futures::stream::once(futures::future::ready::<
Result<hyper::Chunk, hyper::Error>,
>(Ok(
hyper::Chunk::from("part2".to_owned()),
)))),
Part::stream(Body::wrap_stream(futures::stream::once(
futures::future::ready::<Result<String, crate::Error>>(Ok("part2".to_owned())),
))),
)
.part("key3", Part::text("value3").file_name("filename"));
form.inner.boundary = "boundary".to_string();

View File

@@ -1,13 +1,10 @@
use std::borrow::Cow;
use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::Bytes;
use encoding_rs::{Encoding, UTF_8};
use futures::{Future, FutureExt, TryStreamExt};
use futures::{StreamExt, TryStreamExt};
use http;
use hyper::client::connect::HttpInfo;
use hyper::header::CONTENT_LENGTH;
@@ -21,12 +18,8 @@ use url::Url;
use super::body::Body;
use super::Decoder;
use crate::async_impl::Chunk;
use crate::cookie;
/// https://github.com/rust-lang-nursery/futures-rs/issues/1812
type ConcatDecoder = Pin<Box<dyn Future<Output = Result<Chunk, crate::Error>> + Send>>;
/// A Response to a submitted `Request`.
pub struct Response {
status: StatusCode,
@@ -71,6 +64,12 @@ impl Response {
self.status
}
/// Get the HTTP `Version` of this `Response`.
#[inline]
pub fn version(&self) -> Version {
self.version
}
/// Get the `Headers` of this `Response`.
#[inline]
pub fn headers(&self) -> &HeaderMap {
@@ -83,6 +82,20 @@ impl Response {
&mut self.headers
}
/// Get the content-length of this response, if known.
///
/// Reasons it may not be known:
///
/// - The server didn't send a `content-length` header.
/// - The response is gzipped and automatically decoded (thus changing
/// the actual decoded length).
pub fn content_length(&self) -> Option<u64> {
self.headers()
.get(CONTENT_LENGTH)
.and_then(|ct_len| ct_len.to_str().ok())
.and_then(|ct_len| ct_len.parse().ok())
}
/// Retrieve the cookies contained in the response.
///
/// Note that invalid 'Set-Cookie' headers will be ignored.
@@ -103,57 +116,55 @@ impl Response {
.map(|info| info.remote_addr())
}
/// Get the content-length of this response, if known.
// body methods
/// Get the full response text.
///
/// Reasons it may not be known:
/// This method decodes the response body with BOM sniffing
/// and with malformed sequences replaced with the REPLACEMENT CHARACTER.
/// Encoding is determinated from the `charset` parameter of `Content-Type` header,
/// and defaults to `utf-8` if not presented.
///
/// - The server didn't send a `content-length` header.
/// - The response is gzipped and automatically decoded (thus changing
/// the actual decoded length).
pub fn content_length(&self) -> Option<u64> {
self.headers()
.get(CONTENT_LENGTH)
.and_then(|ct_len| ct_len.to_str().ok())
.and_then(|ct_len| ct_len.parse().ok())
}
/// Consumes the response, returning the body
pub fn into_body(self) -> Decoder {
self.body
}
/// Get a reference to the response body.
#[inline]
pub fn body(&self) -> &Decoder {
&self.body
}
/// Get a mutable reference to the response body.
/// # Example
///
/// The chunks from the body may be decoded, depending on the `gzip`
/// option on the `ClientBuilder`.
#[inline]
pub fn body_mut(&mut self) -> &mut Decoder {
&mut self.body
/// ```
/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
/// let content = reqwest::get("http://httpbin.org/range/26")
/// .await?
/// .text()
/// .await?;
///
/// println!("text: {:?}", content);
/// # Ok(())
/// # }
/// ```
pub async fn text(self) -> crate::Result<String> {
self.text_with_charset("utf-8").await
}
/// Get the HTTP `Version` of this `Response`.
#[inline]
pub fn version(&self) -> Version {
self.version
}
/// Get the response text
pub fn text(&mut self) -> impl Future<Output = Result<String, crate::Error>> {
self.text_with_charset("utf-8")
}
/// Get the response text given a specific encoding
pub fn text_with_charset(
&mut self,
default_encoding: &str,
) -> impl Future<Output = Result<String, crate::Error>> {
let body = mem::replace(&mut self.body, Decoder::empty());
/// Get the full response text given a specific encoding.
///
/// This method decodes the response body with BOM sniffing
/// and with malformed sequences replaced with the REPLACEMENT CHARACTER.
/// You can provide a default encoding for decoding the raw message, while the
/// `charset` parameter of `Content-Type` header is still prioritized. For more information
/// about the possible encoding name, please go to
/// https://docs.rs/encoding_rs/0.8.17/encoding_rs/#relationship-with-windows-code-pages
///
/// # Example
///
/// ```
/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
/// let content = reqwest::get("http://httpbin.org/range/26")
/// .await?
/// .text_with_charset("utf-8")
/// .await?;
///
/// println!("text: {:?}", content);
/// # Ok(())
/// # }
/// ```
pub async fn text_with_charset(self, default_encoding: &str) -> crate::Result<String> {
let content_type = self
.headers
.get(crate::header::CONTENT_TYPE)
@@ -164,23 +175,106 @@ impl Response {
.and_then(|mime| mime.get_param("charset").map(|charset| charset.as_str()))
.unwrap_or(default_encoding);
let encoding = Encoding::for_label(encoding_name.as_bytes()).unwrap_or(UTF_8);
Text {
concat: body.try_concat().boxed(),
encoding,
let full = self.bytes().await?;
let (text, _, _) = encoding.decode(&full);
if let Cow::Owned(s) = text {
return Ok(s);
}
unsafe {
// decoding returned Cow::Borrowed, meaning these bytes
// are already valid utf8
Ok(String::from_utf8_unchecked(full.to_vec()))
}
}
/// Try to deserialize the response body as JSON using `serde`.
#[inline]
pub fn json<T: DeserializeOwned>(&mut self) -> impl Future<Output = Result<T, crate::Error>> {
let body = mem::replace(&mut self.body, Decoder::empty());
/// Try to deserialize the response body as JSON.
///
/// # Examples
///
/// ```
/// # extern crate reqwest;
/// # extern crate serde;
/// #
/// # use reqwest::Error;
/// # use serde::Deserialize;
/// #
/// #[derive(Deserialize)]
/// struct Ip {
/// origin: String,
/// }
///
/// # async fn run() -> Result<(), Error> {
/// let ip = reqwest::get("http://httpbin.org/ip")
/// .await?
/// .json::<Ip>()
/// .await?;
///
/// println!("ip: {}", ip.origin);
/// # Ok(())
/// # }
/// #
/// # fn main() { }
/// ```
///
/// # Errors
///
/// This method fails whenever the response body is not in JSON format
/// or it cannot be properly deserialized to target type `T`. For more
/// details please see [`serde_json::from_reader`].
/// [`serde_json::from_reader`]: https://docs.serde.rs/serde_json/fn.from_reader.html
pub async fn json<T: DeserializeOwned>(self) -> crate::Result<T> {
let full = self.bytes().await?;
Json {
concat: body.try_concat().boxed(),
_marker: PhantomData,
serde_json::from_slice(&full).map_err(crate::error::from)
}
/// Get the full response body as `Bytes`.
///
/// # Example
///
/// ```
/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
/// let bytes = reqwest::get("http://httpbin.org/ip")
/// .await?
/// .bytes()
/// .await?;
///
/// println!("bytes: {:?}", bytes);
/// # Ok(())
/// # }
/// ```
pub async fn bytes(self) -> crate::Result<Bytes> {
self.body.try_concat().await
}
/// Stream a chunk of the response body.
///
/// When the response body has been exhausted, this will return `None`.
///
/// # Example
///
/// ```
/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
/// let mut res = reqwest::get("https://hyper.rs").await?;
///
/// while let Some(chunk) = res.chunk().await? {
/// println!("Chunk: {:?}", chunk);
/// }
/// # Ok(())
/// # }
/// ```
pub async fn chunk(&mut self) -> crate::Result<Option<Bytes>> {
if let Some(item) = self.body.next().await {
Ok(Some(item?))
} else {
Ok(None)
}
}
// util methods
/// Turn a response into an error if the server returned an error.
///
/// # Example
@@ -202,7 +296,6 @@ impl Response {
/// }
/// # fn main() {}
/// ```
#[inline]
pub fn error_for_status(self) -> crate::Result<Self> {
if self.status.is_client_error() || self.status.is_server_error() {
Err(crate::error::status_code(*self.url, self.status))
@@ -232,7 +325,6 @@ impl Response {
/// }
/// # fn main() {}
/// ```
#[inline]
pub fn error_for_status_ref(&self) -> crate::Result<&Self> {
if self.status.is_client_error() || self.status.is_server_error() {
Err(crate::error::status_code(*self.url.clone(), self.status))
@@ -240,6 +332,17 @@ impl Response {
Ok(self)
}
}
// private
// The Response's body is an implementation detail.
// You no longer need to get a reference to it, there are async methods
// on the `Response` itself.
//
// This method is just used by the blocking API.
pub(crate) fn body_mut(&mut self) -> &mut Decoder {
&mut self.body
}
}
impl fmt::Debug for Response {
@@ -273,68 +376,10 @@ impl<T: Into<Body>> From<http::Response<T>> for Response {
}
}
/// A JSON object.
struct Json<T> {
concat: ConcatDecoder,
_marker: PhantomData<T>,
}
impl<T> Json<T> {
fn concat(self: Pin<&mut Self>) -> Pin<&mut ConcatDecoder> {
unsafe { Pin::map_unchecked_mut(self, |x| &mut x.concat) }
}
}
impl<T: DeserializeOwned> Future for Json<T> {
type Output = Result<T, crate::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match futures::ready!(self.concat().as_mut().poll(cx)) {
Err(e) => Poll::Ready(Err(e)),
Ok(chunk) => {
let t = serde_json::from_slice(&chunk).map_err(crate::error::from);
Poll::Ready(t)
}
}
}
}
impl<T> fmt::Debug for Json<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Json").finish()
}
}
//#[derive(Debug)]
struct Text {
concat: ConcatDecoder,
encoding: &'static Encoding,
}
impl Text {
fn concat(self: Pin<&mut Self>) -> Pin<&mut ConcatDecoder> {
unsafe { Pin::map_unchecked_mut(self, |x| &mut x.concat) }
}
}
impl Future for Text {
type Output = Result<String, crate::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match futures::ready!(self.as_mut().concat().as_mut().poll(cx)) {
Err(e) => Poll::Ready(Err(e)),
Ok(chunk) => {
let (text, _, _) = self.as_mut().encoding.decode(&chunk);
if let Cow::Owned(s) = text {
return Poll::Ready(Ok(s));
}
unsafe {
// decoding returned Cow::Borrowed, meaning these bytes
// are already valid utf8
Poll::Ready(Ok(String::from_utf8_unchecked(chunk.to_vec())))
}
}
}
/// A `Response` can be piped as the `Body` of another request.
impl From<Response> for Body {
fn from(r: Response) -> Body {
Body::wrap_stream(r.body)
}
}

View File

@@ -203,8 +203,7 @@ impl Response {
/// or it cannot be properly deserialized to target type `T`. For more
/// details please see [`serde_json::from_reader`].
/// [`serde_json::from_reader`]: https://docs.serde.rs/serde_json/fn.from_reader.html
#[inline]
pub fn json<T: DeserializeOwned>(&mut self) -> crate::Result<T> {
pub fn json<T: DeserializeOwned>(self) -> crate::Result<T> {
wait::timeout(self.inner.json(), self.timeout).map_err(|e| match e {
wait::Waited::TimedOut => crate::error::timedout(None),
wait::Waited::Executor(e) => crate::error::from(e),
@@ -228,12 +227,7 @@ impl Response {
/// # Ok(())
/// # }
/// ```
///
/// # Note
///
/// This consumes the body. Trying to read more, or use of `response.json()`
/// will return empty values.
pub fn text(&mut self) -> crate::Result<String> {
pub fn text(self) -> crate::Result<String> {
self.text_with_charset("utf-8")
}
@@ -256,12 +250,7 @@ impl Response {
/// # Ok(())
/// # }
/// ```
///
/// # Note
///
/// This consumes the body. Trying to read more, or use of `response.json()`
/// will return empty values.
pub fn text_with_charset(&mut self, default_encoding: &str) -> crate::Result<String> {
pub fn text_with_charset(self, default_encoding: &str) -> crate::Result<String> {
wait::timeout(self.inner.text_with_charset(default_encoding), self.timeout).map_err(|e| {
match e {
wait::Waited::TimedOut => crate::error::timedout(None),

View File

@@ -187,8 +187,8 @@ pub use hyper::{StatusCode, Version};
pub use url::ParseError as UrlError;
pub use url::Url;
pub use self::r#async::{
multipart, Body, Client, ClientBuilder, Decoder, Request, RequestBuilder, Response,
pub use self::async_impl::{
multipart, Body, Client, ClientBuilder, Request, RequestBuilder, Response,
};
//pub use self::body::Body;
//pub use self::client::{Client, ClientBuilder};
@@ -223,7 +223,7 @@ mod tls;
#[deprecated(note = "types moved to top of crate")]
pub mod r#async {
pub use crate::async_impl::{
multipart, Body, Chunk, Client, ClientBuilder, Decoder, Request, RequestBuilder, Response,
multipart, Body, Client, ClientBuilder, Request, RequestBuilder, Response,
};
}

View File

@@ -22,7 +22,7 @@ fn test_response_text() {
};
let url = format!("http://{}/text", server.addr());
let mut res = reqwest::blocking::get(&url).unwrap();
let res = reqwest::blocking::get(&url).unwrap();
assert_eq!(res.url().as_str(), &url);
assert_eq!(res.status(), reqwest::StatusCode::OK);
assert_eq!(res.headers().get(reqwest::header::SERVER).unwrap(), &"test");
@@ -57,7 +57,7 @@ fn test_response_non_utf_8_text() {
};
let url = format!("http://{}/text", server.addr());
let mut res = reqwest::blocking::get(&url).unwrap();
let res = reqwest::blocking::get(&url).unwrap();
assert_eq!(res.url().as_str(), &url);
assert_eq!(res.status(), reqwest::StatusCode::OK);
assert_eq!(res.headers().get(reqwest::header::SERVER).unwrap(), &"test");
@@ -92,7 +92,7 @@ fn test_response_json() {
};
let url = format!("http://{}/json", server.addr());
let mut res = reqwest::blocking::get(&url).unwrap();
let res = reqwest::blocking::get(&url).unwrap();
assert_eq!(res.url().as_str(), &url);
assert_eq!(res.status(), reqwest::StatusCode::OK);
assert_eq!(res.headers().get(reqwest::header::SERVER).unwrap(), &"test");
@@ -126,7 +126,7 @@ fn test_response_copy_to() {
};
let url = format!("http://{}/1", server.addr());
let mut res = reqwest::blocking::get(&url).unwrap();
let res = reqwest::blocking::get(&url).unwrap();
assert_eq!(res.url().as_str(), &url);
assert_eq!(res.status(), reqwest::StatusCode::OK);
assert_eq!(res.headers().get(reqwest::header::SERVER).unwrap(), &"test");
@@ -158,7 +158,7 @@ fn test_get() {
};
let url = format!("http://{}/1", server.addr());
let mut res = reqwest::blocking::get(&url).unwrap();
let res = reqwest::blocking::get(&url).unwrap();
assert_eq!(res.url().as_str(), &url);
assert_eq!(res.status(), reqwest::StatusCode::OK);
@@ -194,7 +194,7 @@ fn test_post() {
};
let url = format!("http://{}/2", server.addr());
let mut res = reqwest::blocking::Client::new()
let res = reqwest::blocking::Client::new()
.post(&url)
.body("Hello")
.send()

View File

@@ -4,8 +4,6 @@ mod support;
use std::io::Write;
use std::time::Duration;
use futures::TryStreamExt;
use reqwest::multipart::{Form, Part};
use reqwest::{Body, Client};
@@ -44,7 +42,7 @@ async fn response_text() {
let client = Client::new();
let mut res = client
let res = client
.get(&format!("http://{}/text", server.addr()))
.send()
.await
@@ -76,7 +74,7 @@ async fn response_json() {
let client = Client::new();
let mut res = client
let res = client
.get(&format!("http://{}/json", server.addr()))
.send()
.await
@@ -89,9 +87,12 @@ async fn response_json() {
async fn multipart() {
let _ = env_logger::try_init();
let stream = futures::stream::once(futures::future::ready::<Result<_, hyper::Error>>(Ok(
hyper::Chunk::from("part1 part2".to_owned()),
)));
let stream = reqwest::Body::wrap_stream(futures::stream::once(futures::future::ready(Ok::<
_,
reqwest::Error,
>(
"part1 part2".to_owned(),
))));
let part = Part::stream(stream);
let form = Form::new().text("foo", "bar").part("part_stream", part);
@@ -221,7 +222,7 @@ async fn response_timeout() {
let url = format!("http://{}/slow", server.addr());
let res = client.get(&url).send().await.expect("Failed to get");
let body: Result<_, _> = res.into_body().try_concat().await;
let body = res.text().await;
let err = body.unwrap_err();
@@ -269,7 +270,7 @@ async fn gzip_case(response_size: usize, chunk_size: usize) {
let client = Client::new();
let mut res = client
let res = client
.get(&format!("http://{}/gzip", server.addr()))
.send()
.await
@@ -323,3 +324,66 @@ async fn body_stream() {
assert_eq!(res.url().as_str(), &url);
assert_eq!(res.status(), reqwest::StatusCode::OK);
}
#[tokio::test]
async fn body_pipe_response() {
let _ = env_logger::try_init();
let server = server! {
request: b"\
GET /get HTTP/1.1\r\n\
user-agent: $USERAGENT\r\n\
accept: */*\r\n\
accept-encoding: gzip\r\n\
host: $HOST\r\n\
\r\n\
",
response: b"\
HTTP/1.1 200 OK\r\n\
Server: pipe\r\n\
Content-Length: 7\r\n\
\r\n\
pipe me\
";
request: b"\
POST /pipe HTTP/1.1\r\n\
user-agent: $USERAGENT\r\n\
accept: */*\r\n\
accept-encoding: gzip\r\n\
host: $HOST\r\n\
transfer-encoding: chunked\r\n\
\r\n\
7\r\n\
pipe me\r\n\
0\r\n\r\n\
",
response: b"\
HTTP/1.1 200 OK\r\n\
Server: pipe\r\n\
Content-Length: 0\r\n\
\r\n\
"
};
let client = Client::new();
let res1 = client
.get(&format!("http://{}/get", server.addr()))
.send()
.await
.expect("get1");
assert_eq!(res1.status(), reqwest::StatusCode::OK);
assert_eq!(res1.content_length(), Some(7));
// and now ensure we can "pipe" the response to another request
let res2 = client
.post(&format!("http://{}/pipe", server.addr()))
.body(res1)
.send()
.await
.expect("res2");
assert_eq!(res2.status(), reqwest::StatusCode::OK);
}

View File

@@ -142,7 +142,7 @@ fn test_read_timeout() {
};
let url = format!("http://{}/read-timeout", server.addr());
let mut res = reqwest::blocking::Client::builder()
let res = reqwest::blocking::Client::builder()
.timeout(Duration::from_millis(500))
.build()
.unwrap()