make body return borrowed decoder
This commit is contained in:
@@ -1,17 +1,40 @@
|
|||||||
#![deny(warnings)]
|
#![allow(warnings)] // remove when error_chain is fixed
|
||||||
|
|
||||||
extern crate futures;
|
extern crate futures;
|
||||||
extern crate reqwest;
|
extern crate reqwest;
|
||||||
extern crate tokio_core;
|
extern crate tokio_core;
|
||||||
|
#[macro_use]
|
||||||
|
extern crate error_chain;
|
||||||
|
|
||||||
use futures::Future;
|
use std::mem;
|
||||||
|
use std::io::{self, Cursor};
|
||||||
|
use futures::{Future, Stream};
|
||||||
|
use reqwest::unstable::async::{Client, Decoder};
|
||||||
|
|
||||||
|
error_chain! {
|
||||||
|
foreign_links {
|
||||||
|
ReqError(reqwest::Error);
|
||||||
|
IoError(io::Error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let mut core = tokio_core::reactor::Core::new().unwrap();
|
let mut core = tokio_core::reactor::Core::new().unwrap();
|
||||||
let client = reqwest::unstable::async::Client::new(&core.handle()).unwrap();
|
let client = Client::new(&core.handle()).unwrap();
|
||||||
|
|
||||||
let work = client.get("https://hyper.rs").unwrap().send().map(|res| {
|
let work = client.get("https://hyper.rs").unwrap()
|
||||||
println!("{}", res.status());
|
.send()
|
||||||
});
|
.map_err(|e| Error::from(e))
|
||||||
|
.and_then(|mut res| {
|
||||||
|
println!("{}", res.status());
|
||||||
|
|
||||||
|
let body = mem::replace(res.body_mut(), Decoder::empty());
|
||||||
|
body.concat2().map_err(Into::into)
|
||||||
|
})
|
||||||
|
.and_then(|body| {
|
||||||
|
let mut body = Cursor::new(body);
|
||||||
|
io::copy(&mut body, &mut io::stdout()).map_err(Into::into)
|
||||||
|
});
|
||||||
|
|
||||||
core.run(work).unwrap();
|
core.run(work).unwrap();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -66,6 +66,13 @@ pub struct Chunk {
|
|||||||
inner: ::hyper::Chunk,
|
inner: ::hyper::Chunk,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl AsRef<[u8]> for Chunk {
|
||||||
|
#[inline]
|
||||||
|
fn as_ref(&self) -> &[u8] {
|
||||||
|
&*self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl ::std::ops::Deref for Chunk {
|
impl ::std::ops::Deref for Chunk {
|
||||||
type Target = [u8];
|
type Target = [u8];
|
||||||
#[inline]
|
#[inline]
|
||||||
|
|||||||
@@ -82,6 +82,19 @@ impl fmt::Debug for Decoder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Decoder {
|
impl Decoder {
|
||||||
|
/// An empty decoder.
|
||||||
|
///
|
||||||
|
/// This decoder will produce a single 0 byte chunk.
|
||||||
|
#[inline]
|
||||||
|
pub fn empty() -> Decoder {
|
||||||
|
Decoder {
|
||||||
|
inner: Inner::PlainText(body::empty())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A plain text decoder.
|
||||||
|
///
|
||||||
|
/// This decoder will emit the underlying chunks as-is.
|
||||||
#[inline]
|
#[inline]
|
||||||
fn plain_text(body: Body) -> Decoder {
|
fn plain_text(body: Body) -> Decoder {
|
||||||
Decoder {
|
Decoder {
|
||||||
@@ -89,6 +102,9 @@ impl Decoder {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A gzip decoder.
|
||||||
|
///
|
||||||
|
/// This decoder will buffer and decompress chunks that are gzipped.
|
||||||
#[inline]
|
#[inline]
|
||||||
fn gzip(mut body: Body) -> Decoder {
|
fn gzip(mut body: Body) -> Decoder {
|
||||||
Decoder {
|
Decoder {
|
||||||
@@ -385,14 +401,6 @@ impl<S> ReadableChunks<S>
|
|||||||
|
|
||||||
// pub(crate)
|
// pub(crate)
|
||||||
|
|
||||||
#[inline]
|
|
||||||
pub fn take(decoder: &mut Decoder) -> Decoder {
|
|
||||||
let inner = mem::replace(&mut decoder.inner, Inner::PlainText(body::empty()));
|
|
||||||
Decoder {
|
|
||||||
inner: inner,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Constructs a Decoder from a hyper request.
|
/// Constructs a Decoder from a hyper request.
|
||||||
///
|
///
|
||||||
/// A decoder is just a wrapper around the hyper request that knows
|
/// A decoder is just a wrapper around the hyper request that knows
|
||||||
|
|||||||
@@ -63,17 +63,17 @@ impl Response {
|
|||||||
///
|
///
|
||||||
/// This function will replace the body on the response with an empty one.
|
/// This function will replace the body on the response with an empty one.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn body(&mut self) -> Decoder {
|
pub fn body(&self) -> &Decoder {
|
||||||
decoder::take(&mut self.body)
|
&self.body
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Try to deserialize the response body as JSON using `serde`.
|
/// Try to deserialize the response body as JSON using `serde`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn json<T: DeserializeOwned>(&mut self) -> Json<T> {
|
pub fn json<T: DeserializeOwned>(&mut self) -> Json<T> {
|
||||||
let body = self.body().concat2();
|
let body = mem::replace(&mut self.body, Decoder::empty());
|
||||||
|
|
||||||
Json {
|
Json {
|
||||||
concat: body,
|
concat: body.concat2(),
|
||||||
_marker: PhantomData,
|
_marker: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -172,6 +172,7 @@ pub mod unstable {
|
|||||||
pub use ::async_impl::{
|
pub use ::async_impl::{
|
||||||
Body,
|
Body,
|
||||||
Chunk,
|
Chunk,
|
||||||
|
Decoder,
|
||||||
Client,
|
Client,
|
||||||
ClientBuilder,
|
ClientBuilder,
|
||||||
Request,
|
Request,
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
use std::mem;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io::{self, Read};
|
use std::io::{self, Read};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@@ -182,7 +183,7 @@ impl Response {
|
|||||||
/// ```
|
/// ```
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn error_for_status(self) -> ::Result<Self> {
|
pub fn error_for_status(self) -> ::Result<Self> {
|
||||||
let Response { inner, body, _thread_handle } = self;
|
let Response { body, inner, _thread_handle } = self;
|
||||||
inner.error_for_status().map(move |inner| {
|
inner.error_for_status().map(move |inner| {
|
||||||
Response {
|
Response {
|
||||||
inner: inner,
|
inner: inner,
|
||||||
@@ -227,8 +228,9 @@ impl Stream for WaitBody {
|
|||||||
// pub(crate)
|
// pub(crate)
|
||||||
|
|
||||||
pub fn new(mut res: async_impl::Response, timeout: Option<Duration>, thread: KeepCoreThreadAlive) -> Response {
|
pub fn new(mut res: async_impl::Response, timeout: Option<Duration>, thread: KeepCoreThreadAlive) -> Response {
|
||||||
|
let body = mem::replace(res.body_mut(), async_impl::Decoder::empty());
|
||||||
let body = async_impl::ReadableChunks::new(WaitBody {
|
let body = async_impl::ReadableChunks::new(WaitBody {
|
||||||
inner: wait::stream(res.body(), timeout)
|
inner: wait::stream(body, timeout)
|
||||||
});
|
});
|
||||||
|
|
||||||
Response {
|
Response {
|
||||||
|
|||||||
@@ -9,6 +9,8 @@ extern crate libflate;
|
|||||||
#[macro_use]
|
#[macro_use]
|
||||||
mod support;
|
mod support;
|
||||||
|
|
||||||
|
use std::mem;
|
||||||
|
use reqwest::unstable::async::{Client, Decoder};
|
||||||
use futures::{Future, Stream};
|
use futures::{Future, Stream};
|
||||||
use tokio_core::reactor::Core;
|
use tokio_core::reactor::Core;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
@@ -59,12 +61,15 @@ fn test_gzip(response_size: usize, chunk_size: usize) {
|
|||||||
|
|
||||||
let mut core = Core::new().unwrap();
|
let mut core = Core::new().unwrap();
|
||||||
|
|
||||||
let client = reqwest::unstable::async::Client::new(&core.handle()).unwrap();
|
let client = Client::new(&core.handle()).unwrap();
|
||||||
|
|
||||||
let res_future = client.get(&format!("http://{}/gzip", server.addr()))
|
let res_future = client.get(&format!("http://{}/gzip", server.addr()))
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.send()
|
.send()
|
||||||
.and_then(|mut res| res.body().concat2())
|
.and_then(|mut res| {
|
||||||
|
let body = mem::replace(res.body_mut(), Decoder::empty());
|
||||||
|
body.concat2()
|
||||||
|
})
|
||||||
.and_then(|buf| {
|
.and_then(|buf| {
|
||||||
let body = ::std::str::from_utf8(&buf).unwrap();
|
let body = ::std::str::from_utf8(&buf).unwrap();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user