From 2cb70c872a586890b102f76df43393a7bccc7a88 Mon Sep 17 00:00:00 2001 From: Ashley Mannix Date: Sat, 19 Aug 2017 16:32:00 +1000 Subject: [PATCH] make body return borrowed decoder --- examples/async.rs | 35 +++++++++++++++++++++++++++++------ src/async_impl/body.rs | 7 +++++++ src/async_impl/decoder.rs | 24 ++++++++++++++++-------- src/async_impl/response.rs | 10 +++++----- src/lib.rs | 1 + src/response.rs | 6 ++++-- tests/async.rs | 9 +++++++-- 7 files changed, 69 insertions(+), 23 deletions(-) diff --git a/examples/async.rs b/examples/async.rs index 06f0378..3cafb5f 100644 --- a/examples/async.rs +++ b/examples/async.rs @@ -1,17 +1,40 @@ -#![deny(warnings)] +#![allow(warnings)] // remove when error_chain is fixed + extern crate futures; extern crate reqwest; extern crate tokio_core; +#[macro_use] +extern crate error_chain; -use futures::Future; +use std::mem; +use std::io::{self, Cursor}; +use futures::{Future, Stream}; +use reqwest::unstable::async::{Client, Decoder}; + +error_chain! { + foreign_links { + ReqError(reqwest::Error); + IoError(io::Error); + } +} fn main() { let mut core = tokio_core::reactor::Core::new().unwrap(); - let client = reqwest::unstable::async::Client::new(&core.handle()).unwrap(); + let client = Client::new(&core.handle()).unwrap(); - let work = client.get("https://hyper.rs").unwrap().send().map(|res| { - println!("{}", res.status()); - }); + let work = client.get("https://hyper.rs").unwrap() + .send() + .map_err(|e| Error::from(e)) + .and_then(|mut res| { + println!("{}", res.status()); + + let body = mem::replace(res.body_mut(), Decoder::empty()); + body.concat2().map_err(Into::into) + }) + .and_then(|body| { + let mut body = Cursor::new(body); + io::copy(&mut body, &mut io::stdout()).map_err(Into::into) + }); core.run(work).unwrap(); } diff --git a/src/async_impl/body.rs b/src/async_impl/body.rs index 7c7bf59..de8ea97 100644 --- a/src/async_impl/body.rs +++ b/src/async_impl/body.rs @@ -66,6 +66,13 @@ pub struct Chunk { inner: ::hyper::Chunk, } +impl AsRef<[u8]> for Chunk { + #[inline] + fn as_ref(&self) -> &[u8] { + &*self + } +} + impl ::std::ops::Deref for Chunk { type Target = [u8]; #[inline] diff --git a/src/async_impl/decoder.rs b/src/async_impl/decoder.rs index 724c1ca..441c8cc 100644 --- a/src/async_impl/decoder.rs +++ b/src/async_impl/decoder.rs @@ -82,6 +82,19 @@ impl fmt::Debug for Decoder { } impl Decoder { + /// An empty decoder. + /// + /// This decoder will produce a single 0 byte chunk. + #[inline] + pub fn empty() -> Decoder { + Decoder { + inner: Inner::PlainText(body::empty()) + } + } + + /// A plain text decoder. + /// + /// This decoder will emit the underlying chunks as-is. #[inline] fn plain_text(body: Body) -> Decoder { Decoder { @@ -89,6 +102,9 @@ impl Decoder { } } + /// A gzip decoder. + /// + /// This decoder will buffer and decompress chunks that are gzipped. #[inline] fn gzip(mut body: Body) -> Decoder { Decoder { @@ -385,14 +401,6 @@ impl ReadableChunks // pub(crate) -#[inline] -pub fn take(decoder: &mut Decoder) -> Decoder { - let inner = mem::replace(&mut decoder.inner, Inner::PlainText(body::empty())); - Decoder { - inner: inner, - } -} - /// Constructs a Decoder from a hyper request. /// /// A decoder is just a wrapper around the hyper request that knows diff --git a/src/async_impl/response.rs b/src/async_impl/response.rs index c0080e0..92dc8e3 100644 --- a/src/async_impl/response.rs +++ b/src/async_impl/response.rs @@ -63,17 +63,17 @@ impl Response { /// /// This function will replace the body on the response with an empty one. #[inline] - pub fn body(&mut self) -> Decoder { - decoder::take(&mut self.body) + pub fn body(&self) -> &Decoder { + &self.body } /// Try to deserialize the response body as JSON using `serde`. #[inline] pub fn json(&mut self) -> Json { - let body = self.body().concat2(); - + let body = mem::replace(&mut self.body, Decoder::empty()); + Json { - concat: body, + concat: body.concat2(), _marker: PhantomData, } } diff --git a/src/lib.rs b/src/lib.rs index 30e4f59..f723148 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -172,6 +172,7 @@ pub mod unstable { pub use ::async_impl::{ Body, Chunk, + Decoder, Client, ClientBuilder, Request, diff --git a/src/response.rs b/src/response.rs index cdc7827..86d588c 100644 --- a/src/response.rs +++ b/src/response.rs @@ -1,3 +1,4 @@ +use std::mem; use std::fmt; use std::io::{self, Read}; use std::time::Duration; @@ -182,7 +183,7 @@ impl Response { /// ``` #[inline] pub fn error_for_status(self) -> ::Result { - let Response { inner, body, _thread_handle } = self; + let Response { body, inner, _thread_handle } = self; inner.error_for_status().map(move |inner| { Response { inner: inner, @@ -227,8 +228,9 @@ impl Stream for WaitBody { // pub(crate) pub fn new(mut res: async_impl::Response, timeout: Option, thread: KeepCoreThreadAlive) -> Response { + let body = mem::replace(res.body_mut(), async_impl::Decoder::empty()); let body = async_impl::ReadableChunks::new(WaitBody { - inner: wait::stream(res.body(), timeout) + inner: wait::stream(body, timeout) }); Response { diff --git a/tests/async.rs b/tests/async.rs index 90af803..6a556ef 100644 --- a/tests/async.rs +++ b/tests/async.rs @@ -9,6 +9,8 @@ extern crate libflate; #[macro_use] mod support; +use std::mem; +use reqwest::unstable::async::{Client, Decoder}; use futures::{Future, Stream}; use tokio_core::reactor::Core; use std::io::Write; @@ -59,12 +61,15 @@ fn test_gzip(response_size: usize, chunk_size: usize) { let mut core = Core::new().unwrap(); - let client = reqwest::unstable::async::Client::new(&core.handle()).unwrap(); + let client = Client::new(&core.handle()).unwrap(); let res_future = client.get(&format!("http://{}/gzip", server.addr())) .unwrap() .send() - .and_then(|mut res| res.body().concat2()) + .and_then(|mut res| { + let body = mem::replace(res.body_mut(), Decoder::empty()); + body.concat2() + }) .and_then(|buf| { let body = ::std::str::from_utf8(&buf).unwrap();