From 665b4fe7181c8342963fc14c13fa2ab548490c10 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 20 Jun 2017 21:27:59 -0700 Subject: [PATCH] upgrade hyper to v0.11 --- .travis.yml | 20 +- Cargo.toml | 24 +- examples/async.rs | 16 + examples/response_json.rs | 26 -- examples/simple.rs | 2 +- src/async_impl/body.rs | 117 ++++++++ src/async_impl/client.rs | 537 ++++++++++++++++++++++++++++++++++ src/async_impl/mod.rs | 11 + src/async_impl/request.rs | 410 ++++++++++++++++++++++++++ src/async_impl/response.rs | 112 +++++++ src/body.rs | 96 ++++-- src/client.rs | 417 +++++++------------------- src/error.rs | 58 +++- src/into_url.rs | 34 +++ src/lib.rs | 59 +++- src/redirect.rs | 4 +- src/request.rs | 128 ++++---- src/response.rs | 241 ++++++++------- src/tls.rs | 45 +++ src/wait.rs | 167 +++++++++++ tests/client.rs | 474 ++---------------------------- tests/gzip.rs | 166 +++++++++++ tests/redirect.rs | 318 ++++++++++++++++++++ tests/support/mod.rs | 2 + tests/{ => support}/server.rs | 69 ++++- tests/timeouts.rs | 121 ++++++++ 26 files changed, 2647 insertions(+), 1027 deletions(-) create mode 100644 examples/async.rs delete mode 100644 examples/response_json.rs create mode 100644 src/async_impl/body.rs create mode 100644 src/async_impl/client.rs create mode 100644 src/async_impl/mod.rs create mode 100644 src/async_impl/request.rs create mode 100644 src/async_impl/response.rs create mode 100644 src/into_url.rs create mode 100644 src/tls.rs create mode 100644 src/wait.rs create mode 100644 tests/gzip.rs create mode 100644 tests/redirect.rs create mode 100644 tests/support/mod.rs rename tests/{ => support}/server.rs (55%) create mode 100644 tests/timeouts.rs diff --git a/.travis.yml b/.travis.yml index aa15423..45b5740 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,11 +4,22 @@ matrix: allow_failures: - rust: nightly include: - - rust: stable - os: osx rust: stable + + - rust: stable + env: FEATURES="" - rust: beta + env: FEATURES="" - rust: nightly + env: FEATURES="" + + - rust: stable + env: FEATURES="--features unstable" + - rust: beta + env: FEATURES="--features unstable" + - rust: nightly + env: FEATURES="--features unstable" sudo: false dist: trusty @@ -20,8 +31,5 @@ cache: - target/debug/build script: - - cargo build --verbose - - cargo test --verbose - -notifications: - email: false + - cargo build $FEATURES + - cargo test $FEATURES diff --git a/Cargo.toml b/Cargo.toml index 15771a6..a69d0e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,17 +10,33 @@ license = "MIT/Apache-2.0" categories = ["web-programming::http-client"] [dependencies] -hyper = "0.10.12" -hyper-native-tls = "0.2.4" -libc = "0.2" +bytes = "0.4" +futures = "0.1.14" +hyper = "0.11" +hyper-tls = "0.1" +libflate = "0.1.5" log = "0.3" +native-tls = "0.1" serde = "1.0" serde_json = "1.0" serde_urlencoded = "0.5" +tokio-core = "0.1.6" url = "1.2" -libflate = "0.1.5" [dev-dependencies] env_logger = "0.4" serde_derive = "1.0" error-chain = "0.10" + +[features] +unstable = [] + + +[[example]] +name = "simple" +path = "examples/simple.rs" + +[[example]] +name = "async" +path = "examples/async.rs" +required-features = ["unstable"] diff --git a/examples/async.rs b/examples/async.rs new file mode 100644 index 0000000..855b606 --- /dev/null +++ b/examples/async.rs @@ -0,0 +1,16 @@ +extern crate futures; +extern crate reqwest; +extern crate tokio_core; + +use futures::Future; + +fn main() { + let mut core = tokio_core::reactor::Core::new().unwrap(); + let client = reqwest::unstable::async::Client::new(&core.handle()).unwrap(); + + let work = client.get("https://hyper.rs").unwrap().send().map(|res| { + println!("{}", res.status()); + }); + + core.run(work).unwrap(); +} diff --git a/examples/response_json.rs b/examples/response_json.rs deleted file mode 100644 index b3dbc08..0000000 --- a/examples/response_json.rs +++ /dev/null @@ -1,26 +0,0 @@ -//! `cargo run --example response_json` -extern crate reqwest; -#[macro_use] -extern crate serde_derive; -#[macro_use] -extern crate error_chain; - -error_chain! { - foreign_links { - ReqError(reqwest::Error); - } -} - -#[derive(Debug, Deserialize)] -struct Response { - origin: String, -} - -fn run() -> Result<()> { - let mut res = reqwest::get("https://httpbin.org/ip")?; - let json = res.json::()?; - println!("JSON: {:?}", json); - Ok(()) -} - -quick_main!(run); diff --git a/examples/simple.rs b/examples/simple.rs index 12e2c00..846bf3f 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -16,7 +16,7 @@ fn run() -> Result<()> { println!("GET https://www.rust-lang.org"); - let mut res = reqwest::get("https://www.rust-lang.org")?; + let mut res = reqwest::get("https://www.rust-lang.org/en-US/")?; println!("Status: {}", res.status()); println!("Headers:\n{}", res.headers()); diff --git a/src/async_impl/body.rs b/src/async_impl/body.rs new file mode 100644 index 0000000..735f915 --- /dev/null +++ b/src/async_impl/body.rs @@ -0,0 +1,117 @@ +use std::fmt; + +use futures::{Stream, Poll, Async}; +use bytes::Bytes; + +/// An asynchronous `Stream`. +pub struct Body { + inner: Inner, +} + +enum Inner { + Reusable(Bytes), + Hyper(::hyper::Body), +} + +impl Body { + fn poll_inner(&mut self) -> &mut ::hyper::Body { + match self.inner { + Inner::Hyper(ref mut body) => body, + Inner::Reusable(_) => unreachable!(), + } + } +} + +impl Stream for Body { + type Item = Chunk; + type Error = ::Error; + + #[inline] + fn poll(&mut self) -> Poll, Self::Error> { + match try_!(self.poll_inner().poll()) { + Async::Ready(opt) => Ok(Async::Ready(opt.map(|chunk| Chunk { + inner: chunk, + }))), + Async::NotReady => Ok(Async::NotReady), + } + } +} + + +/// A chunk of bytes for a `Body`. +/// +/// A `Chunk` can be treated like `&[u8]`. +#[derive(Default)] +pub struct Chunk { + inner: ::hyper::Chunk, +} + +impl ::std::ops::Deref for Chunk { + type Target = [u8]; + #[inline] + fn deref(&self) -> &Self::Target { + self.inner.as_ref() + } +} + +impl Extend for Chunk { + fn extend(&mut self, iter: T) + where T: IntoIterator { + self.inner.extend(iter) + } +} + +impl IntoIterator for Chunk { + type Item = u8; + //XXX: exposing type from hyper! + type IntoIter = <::hyper::Chunk as IntoIterator>::IntoIter; + fn into_iter(self) -> Self::IntoIter { + self.inner.into_iter() + } +} + +impl fmt::Debug for Body { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Body") + .finish() + } +} + +impl fmt::Debug for Chunk { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.inner, f) + } +} + +// pub(crate) + +#[inline] +pub fn wrap(body: ::hyper::Body) -> Body { + Body { + inner: Inner::Hyper(body), + } +} + +#[inline] +pub fn take(body: &mut Body) -> Body { + use std::mem; + let inner = mem::replace(&mut body.inner, Inner::Hyper(::hyper::Body::empty())); + Body { + inner: inner, + } +} + +#[inline] +pub fn reusable(chunk: Bytes) -> Body { + Body { + inner: Inner::Reusable(chunk), + } +} + +#[inline] +pub fn into_hyper(body: Body) -> (Option, ::hyper::Body) { + match body.inner { + Inner::Reusable(chunk) => (Some(chunk.clone()), chunk.into()), + Inner::Hyper(b) => (None, b), + } +} diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs new file mode 100644 index 0000000..05e4315 --- /dev/null +++ b/src/async_impl/client.rs @@ -0,0 +1,537 @@ +use std::fmt; +use std::sync::Arc; +use std::time::Duration; + +use bytes::Bytes; +use futures::{Async, Future, Poll}; +use hyper::client::FutureResponse; +use hyper::header::{Headers, Location, Referer, UserAgent, Accept, Encoding, + AcceptEncoding, Range, qitem}; +use native_tls::{TlsConnector, TlsConnectorBuilder}; +use tokio_core::reactor::Handle; + + +use super::body; +use super::request::{self, Request, RequestBuilder}; +use super::response::{self, Response}; +use redirect::{self, RedirectPolicy, check_redirect, remove_sensitive_headers}; +use {Certificate, IntoUrl, Method, StatusCode, Url}; + +static DEFAULT_USER_AGENT: &'static str = + concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); + +/// A `Client` to make Requests with. +/// +/// The Client has various configuration values to tweak, but the defaults +/// are set to what is usually the most commonly desired value. +/// +/// The `Client` holds a connection pool internally, so it is advised that +/// you create one and reuse it. +/// +/// # Examples +/// +/// ```rust +/// # use reqwest::{Error, Client}; +/// # +/// # fn run() -> Result<(), Error> { +/// let client = Client::new()?; +/// let resp = client.get("http://httpbin.org/")?.send()?; +/// # drop(resp); +/// # Ok(()) +/// # } +/// +/// ``` +#[derive(Clone)] +pub struct Client { + inner: Arc, +} + +/// A `ClientBuilder` can be used to create a `Client` with custom configuration: +/// +/// - with hostname verification disabled +/// - with one or multiple custom certificates +/// +/// # Examples +/// +/// ``` +/// # use std::fs::File; +/// # use std::io::Read; +/// # fn build_client() -> Result<(), Box> { +/// // read a local binary DER encoded certificate +/// let mut buf = Vec::new(); +/// File::open("my-cert.der")?.read_to_end(&mut buf)?; +/// +/// // create a certificate +/// let cert = reqwest::Certificate::from_der(&buf)?; +/// +/// // get a client builder +/// let client = reqwest::ClientBuilder::new()? +/// .add_root_certificate(cert)? +/// .build()?; +/// # drop(client); +/// # Ok(()) +/// # } +/// ``` +pub struct ClientBuilder { + config: Option, +} + +struct Config { + gzip: bool, + hostname_verification: bool, + redirect_policy: RedirectPolicy, + referer: bool, + timeout: Option, + tls: TlsConnectorBuilder, +} + +impl ClientBuilder { + /// Constructs a new `ClientBuilder` + /// + /// # Errors + /// + /// This method fails if native TLS backend cannot be created. + pub fn new() -> ::Result { + let tls_connector_builder = try_!(TlsConnector::builder()); + Ok(ClientBuilder { + config: Some(Config { + gzip: true, + hostname_verification: true, + redirect_policy: RedirectPolicy::default(), + referer: true, + timeout: None, + tls: tls_connector_builder, + }) + }) + } + + /// Returns a `Client` that uses this `ClientBuilder` configuration. + /// + /// # Errors + /// + /// This method fails if native TLS backend cannot be initialized. + /// + /// # Panics + /// + /// This method consumes the internal state of the builder. + /// Trying to use this builder again after calling `build` will panic. + pub fn build(&mut self, handle: &Handle) -> ::Result { + let config = self.take_config(); + + let tls = try_!(config.tls.build()); + + /* + let mut tls_client = NativeTlsClient::from(tls_connector); + if !config.hostname_verification { + tls_client.danger_disable_hostname_verification(true); + } + */ + + let hyper_client = create_hyper_client(tls, handle); + //let mut hyper_client = create_hyper_client(tls_client); + + //hyper_client.set_read_timeout(config.timeout); + //hyper_client.set_write_timeout(config.timeout); + + Ok(Client { + inner: Arc::new(ClientRef { + gzip: config.gzip, + hyper: hyper_client, + redirect_policy: config.redirect_policy, + referer: config.referer, + }), + }) + } + + /// Add a custom root certificate. + /// + /// This can be used to connect to a server that has a self-signed + /// certificate for example. + /// + /// # Errors + /// + /// This method fails if adding root certificate was unsuccessful. + pub fn add_root_certificate(&mut self, cert: Certificate) -> ::Result<&mut ClientBuilder> { + let cert = ::tls::cert(cert); + try_!(self.config_mut().tls.add_root_certificate(cert)); + Ok(self) + } + + /// Disable hostname verification. + /// + /// # Warning + /// + /// You should think very carefully before you use this method. If + /// hostname verification is not used, any valid certificate for any + /// site will be trusted for use from any other. This introduces a + /// significant vulnerability to man-in-the-middle attacks. + #[inline] + pub fn danger_disable_hostname_verification(&mut self) -> &mut ClientBuilder { + self.config_mut().hostname_verification = false; + self + } + + /// Enable hostname verification. + #[inline] + pub fn enable_hostname_verification(&mut self) -> &mut ClientBuilder { + self.config_mut().hostname_verification = true; + self + } + + /// Enable auto gzip decompression by checking the ContentEncoding response header. + /// + /// Default is enabled. + #[inline] + pub fn gzip(&mut self, enable: bool) -> &mut ClientBuilder { + self.config_mut().gzip = enable; + self + } + + /// Set a `RedirectPolicy` for this client. + /// + /// Default will follow redirects up to a maximum of 10. + #[inline] + pub fn redirect(&mut self, policy: RedirectPolicy) -> &mut ClientBuilder { + self.config_mut().redirect_policy = policy; + self + } + + /// Enable or disable automatic setting of the `Referer` header. + /// + /// Default is `true`. + #[inline] + pub fn referer(&mut self, enable: bool) -> &mut ClientBuilder { + self.config_mut().referer = enable; + self + } + + /// Set a timeout for both the read and write operations of a client. + #[inline] + pub fn timeout(&mut self, timeout: Duration) -> &mut ClientBuilder { + self.config_mut().timeout = Some(timeout); + self + } + + // private + fn config_mut(&mut self) -> &mut Config { + self.config + .as_mut() + .expect("ClientBuilder cannot be reused after building a Client") + } + + fn take_config(&mut self) -> Config { + self.config + .take() + .expect("ClientBuilder cannot be reused after building a Client") + } +} + +type HyperClient = ::hyper::Client<::hyper_tls::HttpsConnector<::hyper::client::HttpConnector>>; + +fn create_hyper_client(tls: TlsConnector, handle: &Handle) -> HyperClient { + let mut http = ::hyper::client::HttpConnector::new(4, handle); + http.enforce_http(false); + let https = ::hyper_tls::HttpsConnector::from((http, tls)); + ::hyper::Client::configure() + .connector(https) + .build(handle) +} + +impl Client { + /// Constructs a new `Client`. + /// + /// # Errors + /// + /// This method fails if native TLS backend cannot be created or initialized. + #[inline] + pub fn new(handle: &Handle) -> ::Result { + ClientBuilder::new()?.build(handle) + } + + /// Creates a `ClientBuilder` to configure a `Client`. + /// + /// # Errors + /// + /// This method fails if native TLS backend cannot be created. + #[inline] + pub fn builder() -> ::Result { + ClientBuilder::new() + } + + /// Convenience method to make a `GET` request to a URL. + /// + /// # Errors + /// + /// This method fails whenever supplied `Url` cannot be parsed. + pub fn get(&self, url: U) -> ::Result { + self.request(Method::Get, url) + } + + /// Convenience method to make a `POST` request to a URL. + /// + /// # Errors + /// + /// This method fails whenever supplied `Url` cannot be parsed. + pub fn post(&self, url: U) -> ::Result { + self.request(Method::Post, url) + } + + /// Convenience method to make a `PUT` request to a URL. + /// + /// # Errors + /// + /// This method fails whenever supplied `Url` cannot be parsed. + pub fn put(&self, url: U) -> ::Result { + self.request(Method::Put, url) + } + + /// Convenience method to make a `PATCH` request to a URL. + /// + /// # Errors + /// + /// This method fails whenever supplied `Url` cannot be parsed. + pub fn patch(&self, url: U) -> ::Result { + self.request(Method::Patch, url) + } + + /// Convenience method to make a `DELETE` request to a URL. + /// + /// # Errors + /// + /// This method fails whenever supplied `Url` cannot be parsed. + pub fn delete(&self, url: U) -> ::Result { + self.request(Method::Delete, url) + } + + /// Convenience method to make a `HEAD` request to a URL. + /// + /// # Errors + /// + /// This method fails whenever supplied `Url` cannot be parsed. + pub fn head(&self, url: U) -> ::Result { + self.request(Method::Head, url) + } + + /// Start building a `Request` with the `Method` and `Url`. + /// + /// Returns a `RequestBuilder`, which will allow setting headers and + /// request body before sending. + /// + /// # Errors + /// + /// This method fails whenever supplied `Url` cannot be parsed. + pub fn request(&self, method: Method, url: U) -> ::Result { + let url = try_!(url.into_url()); + Ok(request::builder(self.clone(), Request::new(method, url))) + } + + /// Executes a `Request`. + /// + /// A `Request` can be built manually with `Request::new()` or obtained + /// from a RequestBuilder with `RequestBuilder::build()`. + /// + /// You should prefer to use the `RequestBuilder` and + /// `RequestBuilder::send()`. + /// + /// # Errors + /// + /// This method fails if there was an error while sending request, + /// redirect loop was detected or redirect limit was exhausted. + pub fn execute(&self, request: Request) -> Pending { + self.execute_request(request) + } + + + fn execute_request(&self, req: Request) -> Pending { + let ( + method, + url, + mut headers, + body + ) = request::pieces(req); + + if !headers.has::() { + headers.set(UserAgent::new(DEFAULT_USER_AGENT)); + } + + if !headers.has::() { + headers.set(Accept::star()); + } + if self.inner.gzip && + !headers.has::() && + !headers.has::() { + headers.set(AcceptEncoding(vec![qitem(Encoding::Gzip)])); + } + + let mut req = ::hyper::Request::new(method.clone(), url_to_uri(&url)); + *req.headers_mut() = headers.clone(); + let body = body.and_then(|body| { + let (resuable, body) = body::into_hyper(body); + req.set_body(body); + resuable + }); + + let in_flight = self.inner.hyper.request(req); + + Pending { + method: method, + url: url, + headers: headers, + body: body, + + urls: Vec::new(), + + client: self.inner.clone(), + + in_flight: in_flight, + } + } +} + +impl fmt::Debug for Client { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Client") + .field("gzip", &self.inner.gzip) + .field("redirect_policy", &self.inner.redirect_policy) + .field("referer", &self.inner.referer) + .finish() + } +} + +impl fmt::Debug for ClientBuilder { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("ClientBuilder") + .finish() + } +} + +struct ClientRef { + gzip: bool, + hyper: HyperClient, + redirect_policy: RedirectPolicy, + referer: bool, +} + +pub struct Pending { + method: Method, + url: Url, + headers: Headers, + body: Option, + + urls: Vec, + + client: Arc, + + in_flight: FutureResponse, +} + +impl Future for Pending { + type Item = Response; + type Error = ::Error; + + fn poll(&mut self) -> Poll { + loop { + let res = match try_!(self.in_flight.poll(), &self.url) { + Async::Ready(res) => res, + Async::NotReady => return Ok(Async::NotReady), + }; + let should_redirect = match res.status() { + StatusCode::MovedPermanently | + StatusCode::Found | + StatusCode::SeeOther => { + self.body = None; + match self.method { + Method::Get | Method::Head => {}, + _ => { + self.method = Method::Get; + } + } + true + }, + StatusCode::TemporaryRedirect | + StatusCode::PermanentRedirect => { + self.body.is_some() + }, + _ => false, + }; + if should_redirect { + let loc = res.headers() + .get::() + .map(|loc| self.url.join(loc)); + if let Some(Ok(loc)) = loc { + if self.client.referer { + if let Some(referer) = make_referer(&loc, &self.url) { + self.headers.set(referer); + } + } + self.urls.push(self.url.clone()); + let action = check_redirect(&self.client.redirect_policy, &loc, &self.urls); + + match action { + redirect::Action::Follow => { + self.url = loc; + + remove_sensitive_headers(&mut self.headers, &self.url, &self.urls); + debug!("redirecting to {:?} '{}'", self.method, self.url); + let mut req = ::hyper::Request::new( + self.method.clone(), + url_to_uri(&self.url) + ); + *req.headers_mut() = self.headers.clone(); + if let Some(ref body) = self.body { + req.set_body(body.clone()); + } + self.in_flight = self.client.hyper.request(req); + continue; + }, + redirect::Action::Stop => { + debug!("redirect_policy disallowed redirection to '{}'", loc); + }, + redirect::Action::LoopDetected => { + return Err(::error::loop_detected(self.url.clone())); + }, + redirect::Action::TooManyRedirects => { + return Err(::error::too_many_redirects(self.url.clone())); + } + } + } else if let Some(Err(e)) = loc { + debug!("Location header had invalid URI: {:?}", e); + } + } + let res = response::new(res, self.url.clone(), self.client.gzip); + return Ok(Async::Ready(res)); + } + } +} + +impl fmt::Debug for Pending { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Pending") + .field("method", &self.method) + .field("url", &self.url) + .finish() + } +} + +fn make_referer(next: &Url, previous: &Url) -> Option { + if next.scheme() == "http" && previous.scheme() == "https" { + return None; + } + + let mut referer = previous.clone(); + let _ = referer.set_username(""); + let _ = referer.set_password(None); + referer.set_fragment(None); + Some(Referer::new(referer.into_string())) +} + +fn url_to_uri(url: &Url) -> ::hyper::Uri { + url.as_str().parse().expect("a parsed Url should always be a valid Uri") +} + +// pub(crate) + +pub fn take_builder(builder: &mut ClientBuilder) -> ClientBuilder { + use std::mem; + mem::replace(builder, ClientBuilder { config: None }) +} diff --git a/src/async_impl/mod.rs b/src/async_impl/mod.rs new file mode 100644 index 0000000..0ee4a66 --- /dev/null +++ b/src/async_impl/mod.rs @@ -0,0 +1,11 @@ +#![cfg_attr(not(features = "unstable"), allow(unused))] + +pub use self::body::{Body, Chunk}; +pub use self::client::{Client, ClientBuilder}; +pub use self::request::{Request, RequestBuilder}; +pub use self::response::Response; + +pub mod body; +pub mod client; +mod request; +mod response; diff --git a/src/async_impl/request.rs b/src/async_impl/request.rs new file mode 100644 index 0000000..ce53e0d --- /dev/null +++ b/src/async_impl/request.rs @@ -0,0 +1,410 @@ +use std::fmt; + +use serde::Serialize; +use serde_json; +use serde_urlencoded; + +use super::body::{self, Body}; +use super::client::{Client, Pending}; +use header::{ContentType, Headers}; +use {Method, Url}; + +/// A request which can be executed with `Client::execute()`. +pub struct Request { + method: Method, + url: Url, + headers: Headers, + body: Option, +} + +/// A builder to construct the properties of a `Request`. +pub struct RequestBuilder { + client: Client, + request: Option, +} + +impl Request { + /// Constructs a new request. + #[inline] + pub fn new(method: Method, url: Url) -> Self { + Request { + method, + url, + headers: Headers::new(), + body: None, + } + } + + /// Get the method. + #[inline] + pub fn method(&self) -> &Method { + &self.method + } + + /// Get a mutable reference to the method. + #[inline] + pub fn method_mut(&mut self) -> &mut Method { + &mut self.method + } + + /// Get the url. + #[inline] + pub fn url(&self) -> &Url { + &self.url + } + + /// Get a mutable reference to the url. + #[inline] + pub fn url_mut(&mut self) -> &mut Url { + &mut self.url + } + + /// Get the headers. + #[inline] + pub fn headers(&self) -> &Headers { + &self.headers + } + + /// Get a mutable reference to the headers. + #[inline] + pub fn headers_mut(&mut self) -> &mut Headers { + &mut self.headers + } + + /// Get the body. + #[inline] + pub fn body(&self) -> Option<&Body> { + self.body.as_ref() + } + + /// Get a mutable reference to the body. + #[inline] + pub fn body_mut(&mut self) -> &mut Option { + &mut self.body + } +} + +impl RequestBuilder { + /// Add a `Header` to this Request. + pub fn header(&mut self, header: H) -> &mut RequestBuilder + where + H: ::header::Header, + { + self.request_mut().headers.set(header); + self + } + /// Add a set of Headers to the existing ones on this Request. + /// + /// The headers will be merged in to any already set. + pub fn headers(&mut self, headers: ::header::Headers) -> &mut RequestBuilder { + self.request_mut().headers.extend(headers.iter()); + self + } + + /// Enable HTTP basic authentication. + pub fn basic_auth(&mut self, username: U, password: Option

) -> &mut RequestBuilder + where + U: Into, + P: Into, + { + self.header(::header::Authorization(::header::Basic { + username: username.into(), + password: password.map(|p| p.into()), + })) + } + + /// Set the request body. + pub fn body>(&mut self, body: T) -> &mut RequestBuilder { + self.request_mut().body = Some(body.into()); + self + } + + /// Send a form body. + pub fn form(&mut self, form: &T) -> ::Result<&mut RequestBuilder> { + { + // check request_mut() before running serde + let mut req = self.request_mut(); + let body = try_!(serde_urlencoded::to_string(form)); + req.headers.set(ContentType::form_url_encoded()); + req.body = Some(body::reusable(body.into())); + } + Ok(self) + } + + /// Send a JSON body. + /// + /// # Errors + /// + /// Serialization can fail if `T`'s implementation of `Serialize` decides to + /// fail, or if `T` contains a map with non-string keys. + pub fn json(&mut self, json: &T) -> ::Result<&mut RequestBuilder> { + { + // check request_mut() before running serde + let mut req = self.request_mut(); + let body = try_!(serde_json::to_vec(json)); + req.headers.set(ContentType::json()); + req.body = Some(body::reusable(body.into())); + } + Ok(self) + } + + /// Build a `Request`, which can be inspected, modified and executed with + /// `Client::execute()`. + /// + /// # Panics + /// + /// This method consumes builder internal state. It panics on an attempt to + /// reuse already consumed builder. + pub fn build(&mut self) -> Request { + self.request + .take() + .expect("RequestBuilder cannot be reused after builder a Request") + } + + /// Constructs the Request and sends it the target URL, returning a Response. + /// + /// # Errors + /// + /// This method fails if there was an error while sending request, + /// redirect loop was detected or redirect limit was exhausted. + pub fn send(&mut self) -> Pending { + let request = self.build(); + self.client.execute(request) + } + + // private + + fn request_mut(&mut self) -> &mut Request { + self.request + .as_mut() + .expect("RequestBuilder cannot be reused after builder a Request") + } +} + +impl fmt::Debug for Request { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt_request_fields(&mut f.debug_struct("Request"), self) + .finish() + } +} + +impl fmt::Debug for RequestBuilder { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + if let Some(ref req) = self.request { + fmt_request_fields(&mut f.debug_struct("RequestBuilder"), req) + .finish() + } else { + f.debug_tuple("RequestBuilder") + .field(&"Consumed") + .finish() + } + } +} + +fn fmt_request_fields<'a, 'b>(f: &'a mut fmt::DebugStruct<'a, 'b>, req: &Request) -> &'a mut fmt::DebugStruct<'a, 'b> { + f.field("method", &req.method) + .field("url", &req.url) + .field("headers", &req.headers) +} + +// pub(crate) + +#[inline] +pub fn builder(client: Client, req: Request) -> RequestBuilder { + RequestBuilder { + client: client, + request: Some(req), + } +} + +#[inline] +pub fn pieces(req: Request) -> (Method, Url, Headers, Option) { + (req.method, req.url, req.headers, req.body) +} + +#[cfg(test)] +mod tests { + /* + use {body, Method}; + use super::Client; + use header::{Host, Headers, ContentType}; + use std::collections::HashMap; + use serde_urlencoded; + use serde_json; + + #[test] + fn basic_get_request() { + let client = Client::new().unwrap(); + let some_url = "https://google.com/"; + let r = client.get(some_url).unwrap().build(); + + assert_eq!(r.method, Method::Get); + assert_eq!(r.url.as_str(), some_url); + } + + #[test] + fn basic_head_request() { + let client = Client::new().unwrap(); + let some_url = "https://google.com/"; + let r = client.head(some_url).unwrap().build(); + + assert_eq!(r.method, Method::Head); + assert_eq!(r.url.as_str(), some_url); + } + + #[test] + fn basic_post_request() { + let client = Client::new().unwrap(); + let some_url = "https://google.com/"; + let r = client.post(some_url).unwrap().build(); + + assert_eq!(r.method, Method::Post); + assert_eq!(r.url.as_str(), some_url); + } + + #[test] + fn basic_put_request() { + let client = Client::new().unwrap(); + let some_url = "https://google.com/"; + let r = client.put(some_url).unwrap().build(); + + assert_eq!(r.method, Method::Put); + assert_eq!(r.url.as_str(), some_url); + } + + #[test] + fn basic_patch_request() { + let client = Client::new().unwrap(); + let some_url = "https://google.com/"; + let r = client.patch(some_url).unwrap().build(); + + assert_eq!(r.method, Method::Patch); + assert_eq!(r.url.as_str(), some_url); + } + + #[test] + fn basic_delete_request() { + let client = Client::new().unwrap(); + let some_url = "https://google.com/"; + let r = client.delete(some_url).unwrap().build(); + + assert_eq!(r.method, Method::Delete); + assert_eq!(r.url.as_str(), some_url); + } + + #[test] + fn add_header() { + let client = Client::new().unwrap(); + let some_url = "https://google.com/"; + let mut r = client.post(some_url).unwrap(); + + let header = Host { + hostname: "google.com".to_string(), + port: None, + }; + + // Add a copy of the header to the request builder + let r = r.header(header.clone()).build(); + + // then check it was actually added + assert_eq!(r.headers.get::(), Some(&header)); + } + + #[test] + fn add_headers() { + let client = Client::new().unwrap(); + let some_url = "https://google.com/"; + let mut r = client.post(some_url).unwrap(); + + let header = Host { + hostname: "google.com".to_string(), + port: None, + }; + + let mut headers = Headers::new(); + headers.set(header); + + // Add a copy of the headers to the request builder + let r = r.headers(headers.clone()).build(); + + // then make sure they were added correctly + assert_eq!(r.headers, headers); + } + + #[test] + fn add_body() { + let client = Client::new().unwrap(); + let some_url = "https://google.com/"; + let mut r = client.post(some_url).unwrap(); + + let body = "Some interesting content"; + + let r = r.body(body).build(); + + let buf = body::read_to_string(r.body.unwrap()).unwrap(); + + assert_eq!(buf, body); + } + + #[test] + fn add_form() { + let client = Client::new().unwrap(); + let some_url = "https://google.com/"; + let mut r = client.post(some_url).unwrap(); + + let mut form_data = HashMap::new(); + form_data.insert("foo", "bar"); + + let r = r.form(&form_data).unwrap().build(); + + // Make sure the content type was set + assert_eq!(r.headers.get::(), + Some(&ContentType::form_url_encoded())); + + let buf = body::read_to_string(r.body.unwrap()).unwrap(); + + let body_should_be = serde_urlencoded::to_string(&form_data).unwrap(); + assert_eq!(buf, body_should_be); + } + + #[test] + fn add_json() { + let client = Client::new().unwrap(); + let some_url = "https://google.com/"; + let mut r = client.post(some_url).unwrap(); + + let mut json_data = HashMap::new(); + json_data.insert("foo", "bar"); + + let r = r.json(&json_data).unwrap().build(); + + // Make sure the content type was set + assert_eq!(r.headers.get::(), Some(&ContentType::json())); + + let buf = body::read_to_string(r.body.unwrap()).unwrap(); + + let body_should_be = serde_json::to_string(&json_data).unwrap(); + assert_eq!(buf, body_should_be); + } + + #[test] + fn add_json_fail() { + use serde::{Serialize, Serializer}; + use serde::ser::Error; + struct MyStruct; + impl Serialize for MyStruct { + fn serialize(&self, _serializer: S) -> Result + where S: Serializer + { + Err(S::Error::custom("nope")) + } + } + + let client = Client::new().unwrap(); + let some_url = "https://google.com/"; + let mut r = client.post(some_url).unwrap(); + let json_data = MyStruct{}; + assert!(r.json(&json_data).unwrap_err().is_serialization()); + } + */ +} diff --git a/src/async_impl/response.rs b/src/async_impl/response.rs new file mode 100644 index 0000000..aaaa3a9 --- /dev/null +++ b/src/async_impl/response.rs @@ -0,0 +1,112 @@ +use std::fmt; +use std::marker::PhantomData; + +use futures::{Async, Future, Poll, Stream}; +use futures::stream::Concat2; +use header::Headers; +use hyper::StatusCode; +use serde::de::DeserializeOwned; +use serde_json; +use url::Url; + +use super::{body, Body}; + + +/// A Response to a submitted `Request`. +pub struct Response { + status: StatusCode, + headers: Headers, + url: Url, + body: Body, +} + +impl Response { + /// Get the final `Url` of this `Response`. + #[inline] + pub fn url(&self) -> &Url { + &self.url + } + + /// Get the `StatusCode` of this `Response`. + #[inline] + pub fn status(&self) -> StatusCode { + self.status + } + + /// Get the `Headers` of this `Response`. + #[inline] + pub fn headers(&self) -> &Headers { + &self.headers + } + + /// Get a mutable reference to the `Headers` of this `Response`. + #[inline] + pub fn headers_mut(&mut self) -> &mut Headers { + &mut self.headers + } + + /// Get a mutable reference to the `Body` of this `Response`. + #[inline] + pub fn body_mut(&mut self) -> &mut Body { + &mut self.body + } + + /// Try to deserialize the response body as JSON using `serde`. + #[inline] + pub fn json(&mut self) -> Json { + Json { + concat: body::take(self.body_mut()).concat2(), + _marker: PhantomData, + } + } +} + + +impl fmt::Debug for Response { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Response") + .field("url", self.url()) + .field("status", &self.status()) + .field("headers", self.headers()) + .finish() + } +} + +pub struct Json { + concat: Concat2, + _marker: PhantomData, +} + +impl Future for Json { + type Item = T; + type Error = ::Error; + fn poll(&mut self) -> Poll { + let bytes = try_ready!(self.concat.poll()); + let t = try_!(serde_json::from_slice(&bytes)); + Ok(Async::Ready(t)) + } +} + +impl fmt::Debug for Json { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Json") + .finish() + } +} + +// pub(crate) + +pub fn new(mut res: ::hyper::client::Response, url: Url, _gzip: bool) -> Response { + use std::mem; + + let status = res.status(); + let headers = mem::replace(res.headers_mut(), Headers::new()); + let body = res.body(); + info!("Response: '{}' for {}", status, url); + Response { + status: status, + headers: headers, + url: url, + body: super::body::wrap(body), + } +} diff --git a/src/body.rs b/src/body.rs index b1c59b5..eb25de0 100644 --- a/src/body.rs +++ b/src/body.rs @@ -1,7 +1,12 @@ -use std::io::Read; +use std::io::{self, Read}; use std::fs::File; use std::fmt; +use bytes::Bytes; +use hyper::{self, Chunk}; + +use {async_impl, wait}; + /// Body type for a request. #[derive(Debug)] pub struct Body { @@ -67,12 +72,6 @@ impl Body { reader: Kind::Reader(Box::new(reader), Some(len)), } } - - /* - pub fn chunked(reader: ()) -> Body { - unimplemented!() - } - */ } // useful for tests, but not publicly exposed @@ -88,14 +87,14 @@ pub fn read_to_string(mut body: Body) -> ::std::io::Result { enum Kind { Reader(Box, Option), - Bytes(Vec), + Bytes(Bytes), } impl From> for Body { #[inline] fn from(v: Vec) -> Body { Body { - reader: Kind::Bytes(v), + reader: Kind::Bytes(v.into()), } } } @@ -108,16 +107,18 @@ impl From for Body { } -impl<'a> From<&'a [u8]> for Body { +impl From<&'static [u8]> for Body { #[inline] - fn from(s: &'a [u8]) -> Body { - s.to_vec().into() + fn from(s: &'static [u8]) -> Body { + Body { + reader: Kind::Bytes(Bytes::from_static(s)), + } } } -impl<'a> From<&'a str> for Body { +impl From<&'static str> for Body { #[inline] - fn from(s: &'a str) -> Body { + fn from(s: &'static str) -> Body { s.as_bytes().into() } } @@ -142,28 +143,65 @@ impl fmt::Debug for Kind { } -// Wraps a `std::io::Write`. -//pub struct Pipe(Kind); +// pub(crate) +pub struct Sender { + body: (Box, Option), + tx: wait::WaitSink<::futures::sync::mpsc::Sender>>, +} -pub fn as_hyper_body(body: &mut Body) -> ::hyper::client::Body { - match body.reader { - Kind::Bytes(ref bytes) => { - let len = bytes.len(); - ::hyper::client::Body::BufBody(bytes, len) - } - Kind::Reader(ref mut reader, len_opt) => { - match len_opt { - Some(len) => ::hyper::client::Body::SizedBody(reader, len), - None => ::hyper::client::Body::ChunkedBody(reader), +impl Sender { + pub fn send(self) -> ::Result<()> { + use std::cmp; + use bytes::{BufMut, BytesMut}; + + let cap = cmp::min(self.body.1.unwrap_or(8192), 8192); + let mut buf = BytesMut::with_capacity(cap as usize); + let mut body = self.body.0; + let mut tx = self.tx; + loop { + println!("reading"); + match body.read(unsafe { buf.bytes_mut() }) { + Ok(0) => return Ok(()), + Ok(n) => { + unsafe { buf.advance_mut(n); } + println!("sending {}", n); + if let Err(e) = tx.send(Ok(buf.take().freeze().into())) { + if let wait::Waited::Err(_) = e { + let epipe = io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe"); + return Err(::error::from(epipe)); + } else { + return Err(::error::timedout(None)); + } + } + if buf.remaining_mut() == 0 { + buf.reserve(8192); + } + } + Err(e) => { + let ret = io::Error::new(e.kind(), e.to_string()); + let _ = tx.send(Err(e.into())); + return Err(::error::from(ret)); + } } } } } -pub fn can_reset(body: &Body) -> bool { +#[inline] +pub fn async(body: Body) -> (Option, async_impl::Body, Option) { match body.reader { - Kind::Bytes(_) => true, - Kind::Reader(..) => false, + Kind::Reader(read, len) => { + let (tx, rx) = hyper::Body::pair(); + let tx = Sender { + body: (read, len), + tx: wait::sink(tx, None), + }; + (Some(tx), async_impl::body::wrap(rx), len) + }, + Kind::Bytes(chunk) => { + let len = chunk.len() as u64; + (None, async_impl::body::reusable(chunk), Some(len)) + } } } diff --git a/src/client.rs b/src/client.rs index e6f4d5e..9b9fd12 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,24 +1,13 @@ use std::fmt; -use std::net::TcpStream; use std::sync::Arc; use std::time::Duration; -use hyper::client::IntoUrl; -use hyper::header::{Location, Referer, UserAgent, Accept, Encoding, - AcceptEncoding, Range, qitem}; -use hyper::method::Method; -use hyper::status::StatusCode; -use hyper::Url; +use futures::{Future, Stream}; +use futures::sync::{mpsc, oneshot}; -use hyper_native_tls::{NativeTlsClient, TlsStream, native_tls}; - -use body; -use redirect::{self, RedirectPolicy, check_redirect, remove_sensitive_headers}; use request::{self, Request, RequestBuilder}; -use response::Response; - -static DEFAULT_USER_AGENT: &'static str = - concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); +use response::{self, Response}; +use {async_impl, Certificate, Method, IntoUrl, RedirectPolicy, wait}; /// A `Client` to make Requests with. /// @@ -43,37 +32,7 @@ static DEFAULT_USER_AGENT: &'static str = /// ``` #[derive(Clone)] pub struct Client { - inner: Arc, -} - -/// Represent an X509 certificate. -pub struct Certificate(native_tls::Certificate); - -impl Certificate { - /// Create a `Certificate` from a binary DER encoded certificate - /// - /// # Examples - /// - /// ``` - /// # use std::fs::File; - /// # use std::io::Read; - /// # fn cert() -> Result<(), Box> { - /// let mut buf = Vec::new(); - /// File::open("my_cert.der")? - /// .read_to_end(&mut buf)?; - /// let cert = reqwest::Certificate::from_der(&buf)?; - /// # drop(cert); - /// # Ok(()) - /// # } - /// ``` - /// - /// # Errors - /// - /// If the provided buffer is not valid DER, an error will be returned. - pub fn from_der(der: &[u8]) -> ::Result { - let inner = try_!(native_tls::Certificate::from_der(der)); - Ok(Certificate(inner)) - } + inner: ClientHandle, } /// A `ClientBuilder` can be used to create a `Client` with custom configuration: @@ -103,16 +62,9 @@ impl Certificate { /// # } /// ``` pub struct ClientBuilder { - config: Option, -} - -struct Config { gzip: bool, - hostname_verification: bool, - redirect_policy: RedirectPolicy, - referer: bool, + inner: async_impl::ClientBuilder, timeout: Option, - tls: native_tls::TlsConnectorBuilder, } impl ClientBuilder { @@ -122,16 +74,10 @@ impl ClientBuilder { /// /// This method fails if native TLS backend cannot be created. pub fn new() -> ::Result { - let tls_connector_builder = try_!(native_tls::TlsConnector::builder()); - Ok(ClientBuilder { - config: Some(Config { - gzip: true, - hostname_verification: true, - redirect_policy: RedirectPolicy::default(), - referer: true, - timeout: None, - tls: tls_connector_builder, - }) + async_impl::ClientBuilder::new().map(|builder| ClientBuilder { + inner: builder, + gzip: true, + timeout: None, }) } @@ -146,26 +92,8 @@ impl ClientBuilder { /// This method consumes the internal state of the builder. /// Trying to use this builder again after calling `build` will panic. pub fn build(&mut self) -> ::Result { - let config = self.take_config(); - - let tls_connector = try_!(config.tls.build()); - let mut tls_client = NativeTlsClient::from(tls_connector); - if !config.hostname_verification { - tls_client.danger_disable_hostname_verification(true); - } - - let mut hyper_client = create_hyper_client(tls_client); - - hyper_client.set_read_timeout(config.timeout); - hyper_client.set_write_timeout(config.timeout); - - Ok(Client { - inner: Arc::new(ClientRef { - gzip: config.gzip, - hyper: hyper_client, - redirect_policy: config.redirect_policy, - referer: config.referer, - }), + ClientHandle::new(self).map(|handle| Client { + inner: handle, }) } @@ -178,7 +106,7 @@ impl ClientBuilder { /// /// This method fails if adding root certificate was unsuccessful. pub fn add_root_certificate(&mut self, cert: Certificate) -> ::Result<&mut ClientBuilder> { - try_!(self.config_mut().tls.add_root_certificate(cert.0)); + self.inner.add_root_certificate(cert)?; Ok(self) } @@ -192,14 +120,14 @@ impl ClientBuilder { /// significant vulnerability to man-in-the-middle attacks. #[inline] pub fn danger_disable_hostname_verification(&mut self) -> &mut ClientBuilder { - self.config_mut().hostname_verification = false; + self.inner.danger_disable_hostname_verification(); self } /// Enable hostname verification. #[inline] pub fn enable_hostname_verification(&mut self) -> &mut ClientBuilder { - self.config_mut().hostname_verification = true; + self.inner.enable_hostname_verification(); self } @@ -208,7 +136,8 @@ impl ClientBuilder { /// Default is enabled. #[inline] pub fn gzip(&mut self, enable: bool) -> &mut ClientBuilder { - self.config_mut().gzip = enable; + self.inner.gzip(enable); + self.gzip = enable; self } @@ -217,7 +146,7 @@ impl ClientBuilder { /// Default will follow redirects up to a maximum of 10. #[inline] pub fn redirect(&mut self, policy: RedirectPolicy) -> &mut ClientBuilder { - self.config_mut().redirect_policy = policy; + self.inner.redirect(policy); self } @@ -226,128 +155,18 @@ impl ClientBuilder { /// Default is `true`. #[inline] pub fn referer(&mut self, enable: bool) -> &mut ClientBuilder { - self.config_mut().referer = enable; + self.inner.referer(enable); self } /// Set a timeout for both the read and write operations of a client. #[inline] pub fn timeout(&mut self, timeout: Duration) -> &mut ClientBuilder { - self.config_mut().timeout = Some(timeout); + self.timeout = Some(timeout); self } - - // private - fn config_mut(&mut self) -> &mut Config { - self.config - .as_mut() - .expect("ClientBuilder cannot be reused after building a Client") - } - - fn take_config(&mut self) -> Config { - self.config - .take() - .expect("ClientBuilder cannot be reused after building a Client") - } } -fn create_hyper_client(tls_client: NativeTlsClient) -> ::hyper::Client { - let mut pool = ::hyper::client::Pool::with_connector( - Default::default(), - ::hyper::net::HttpsConnector::new(tls_client), - ); - // For now, while experiementing, they're constants. - // TODO: maybe make these configurable someday? - pool.set_idle_timeout(Some(Duration::from_secs(60 * 2))); - pool.set_stale_check(|mut check| { - if stream_dead(check.stream()) { - check.stale() - } else { - check.fresh() - } - }); - - let mut hyper_client = ::hyper::Client::with_connector(pool); - - hyper_client.set_redirect_policy(::hyper::client::RedirectPolicy::FollowNone); - hyper_client -} - -fn stream_dead(stream: &::hyper::net::HttpsStream>) -> bool { - match *stream { - ::hyper::net::HttpsStream::Http(ref http) => socket_is_dead(&http.0), - ::hyper::net::HttpsStream::Https(ref https) => socket_is_dead(&https.lock().get_ref().0), - } -} - -#[cfg(unix)] -fn socket_is_dead(socket: &TcpStream) -> bool { - use std::mem; - use std::os::unix::io::AsRawFd; - use std::ptr; - use libc::{FD_SET, select, timeval}; - - let ret = unsafe { - let fd = socket.as_raw_fd(); - let nfds = fd + 1; - - let mut timeout = timeval { - tv_sec: 0, - tv_usec: 0, - }; - - let mut readfs = mem::zeroed(); - let mut errfs = mem::zeroed(); - FD_SET(fd, &mut readfs); - FD_SET(fd, &mut errfs); - select(nfds, &mut readfs, ptr::null_mut(), &mut errfs, &mut timeout) - }; - - // socket was readable (eof), or an error, then it's dead - ret != 0 -} - -#[cfg(windows)] -fn socket_is_dead(socket: &TcpStream) -> bool { - use std::mem; - use std::os::windows::io::{AsRawSocket, RawSocket}; - use std::ptr; - use libc::{c_int, timeval}; - - const FD_SETSIZE: usize = 64; - - #[repr(C)] - struct fd_set { - fd_count: c_int, - fd_array: [RawSocket; FD_SETSIZE], - } - - extern "system" { - fn select(maxfds: c_int, readfs: *mut fd_set, writefs: *mut fd_set, - errfs: *mut fd_set, timeout: *mut timeval) -> c_int; - } - - let ret = unsafe { - let fd = socket.as_raw_socket(); - let nfds = 0; // msdn says nfds is ignored - let mut timeout = timeval { - tv_sec: 0, - tv_usec: 0, - }; - - let mut readfs: fd_set = mem::zeroed(); - let mut errfs: fd_set = mem::zeroed(); - readfs.fd_count = 1; - readfs.fd_array[0] = fd; - errfs.fd_count = 1; - errfs.fd_array[0] = fd; - - select(nfds, &mut readfs, ptr::null_mut(), &mut errfs, &mut timeout) - }; - - // socket was readable (eof), or an error, then it's dead - ret != 0 -} impl Client { /// Constructs a new `Client`. @@ -457,140 +276,110 @@ impl Client { impl fmt::Debug for Client { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Client") - .field("gzip", &self.inner.gzip) - .field("redirect_policy", &self.inner.redirect_policy) - .field("referer", &self.inner.referer) + //.field("gzip", &self.inner.gzip) + //.field("redirect_policy", &self.inner.redirect_policy) + //.field("referer", &self.inner.referer) .finish() } } -struct ClientRef { +impl fmt::Debug for ClientBuilder { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("ClientBuilder") + .finish() + } +} + +#[derive(Clone)] +struct ClientHandle { gzip: bool, - hyper: ::hyper::Client, - redirect_policy: RedirectPolicy, - referer: bool, + timeout: Option, + tx: Arc } -impl ClientRef { - fn execute_request(&self, req: Request) -> ::Result { - let ( - mut method, - mut url, - mut headers, - mut body - ) = request::pieces(req); +type ThreadSender = mpsc::UnboundedSender<(async_impl::Request, oneshot::Sender<::Result>)>; - if !headers.has::() { - headers.set(UserAgent(DEFAULT_USER_AGENT.to_owned())); - } +impl ClientHandle { + fn new(builder: &mut ClientBuilder) -> ::Result { + use std::thread; - if !headers.has::() { - headers.set(Accept::star()); - } - if self.gzip && - !headers.has::() && - !headers.has::() { - headers.set(AcceptEncoding(vec![qitem(Encoding::Gzip)])); - } + let gzip = builder.gzip; + let timeout = builder.timeout; + let mut builder = async_impl::client::take_builder(&mut builder.inner); + let (tx, rx) = mpsc::unbounded(); + let (spawn_tx, spawn_rx) = oneshot::channel::<::Result<()>>(); + try_!(thread::Builder::new().name("reqwest-internal-sync-core".into()).spawn(move || { + use tokio_core::reactor::Core; - let mut urls = Vec::new(); + let built = (|| { + let core = try_!(Core::new()); + let handle = core.handle(); + let client = builder.build(&handle)?; + Ok((core, handle, client)) + })(); - loop { - let res = { - info!("Request: {:?} {}", method, url); - let mut req = self.hyper.request(method.clone(), url.clone()) - .headers(headers.clone()); - - if let Some(ref mut b) = body { - let body = body::as_hyper_body(b); - req = req.body(body); + let (mut core, handle, client) = match built { + Ok((a, b, c)) => { + if let Err(_) = spawn_tx.send(Ok(())) { + return; + } + (a, b, c) + }, + Err(e) => { + let _ = spawn_tx.send(Err(e)); + return; } - - try_!(req.send(), &url) }; - let should_redirect = match res.status { - StatusCode::MovedPermanently | - StatusCode::Found | - StatusCode::SeeOther => { - body = None; - match method { - Method::Get | Method::Head => {}, - _ => { - method = Method::Get; - } - } - true - }, - StatusCode::TemporaryRedirect | - StatusCode::PermanentRedirect => { - if let Some(ref body) = body { - body::can_reset(body) - } else { - true - } - }, - _ => false, - }; + let work = rx.for_each(|(req, tx)| { + let tx: oneshot::Sender<::Result> = tx; + let task = client.execute(req) + .then(move |x| tx.send(x).map_err(|_| ())); + handle.spawn(task); + Ok(()) + }); - if should_redirect { - let loc = { - let loc = res.headers.get::().map(|loc| url.join(loc)); - if let Some(loc) = loc { - loc - } else { - return Ok(::response::new(res, self.gzip)); - } - }; + // work is Future<(), ()>, and our closure will never return Err + let _ = core.run(work); + })); - url = match loc { - Ok(loc) => { - if self.referer { - if let Some(referer) = make_referer(&loc, &url) { - headers.set(referer); - } - } - urls.push(url); - let action = check_redirect(&self.redirect_policy, &loc, &urls); + wait::timeout(spawn_rx, timeout).expect("core thread cancelled")?; - match action { - redirect::Action::Follow => loc, - redirect::Action::Stop => { - debug!("redirect_policy disallowed redirection to '{}'", loc); - return Ok(::response::new(res, self.gzip)); - }, - redirect::Action::LoopDetected => { - return Err(::error::loop_detected(res.url.clone())); - }, - redirect::Action::TooManyRedirects => { - return Err(::error::too_many_redirects(res.url.clone())); - } - } - }, - Err(e) => { - debug!("Location header had invalid URI: {:?}", e); + Ok(ClientHandle { + gzip: gzip, + timeout: timeout, + tx: Arc::new(tx), + }) + } - return Ok(::response::new(res, self.gzip)); - } - }; + fn execute_request(&self, req: Request) -> ::Result { + let (tx, rx) = oneshot::channel(); + let (req, body) = request::async(req); + let url = req.url().clone(); + self.tx.send((req, tx)).expect("core thread panicked"); - remove_sensitive_headers(&mut headers, &url, &urls); - debug!("redirecting to {:?} '{}'", method, url); - } else { - return Ok(::response::new(res, self.gzip)); - } + if let Some(body) = body { + try_!(body.send(), &url); } + + let res = match wait::timeout(rx, self.timeout) { + Ok(res) => res, + Err(wait::Waited::TimedOut) => return Err(::error::timedout(Some(url))), + Err(wait::Waited::Err(_canceled)) => { + // The only possible reason there would be a Cancelled error + // is if the thread running the Core panicked. We could return + // an Err here, like a BrokenPipe, but the Client is not + // recoverable. Additionally, the panic in the other thread + // is not normal, and should likely be propagated. + panic!("core thread panicked"); + } + }; + res.map(|res| { + response::new(res, self.gzip, self.timeout, KeepCoreThreadAlive(self.tx.clone())) + }) } } -fn make_referer(next: &Url, previous: &Url) -> Option { - if next.scheme() == "http" && previous.scheme() == "https" { - return None; - } +// pub(crate) - let mut referer = previous.clone(); - let _ = referer.set_username(""); - let _ = referer.set_password(None); - referer.set_fragment(None); - Some(Referer(referer.into_string())) -} +pub struct KeepCoreThreadAlive(Arc); diff --git a/src/error.rs b/src/error.rs index a18f506..22769f2 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,6 @@ use std::error::Error as StdError; use std::fmt; +use std::io; use Url; @@ -204,8 +205,8 @@ impl StdError for Error { pub enum Kind { Http(::hyper::Error), Url(::url::ParseError), - Tls(::hyper_native_tls::native_tls::Error), - Io(::std::io::Error), + Tls(::native_tls::Error), + Io(io::Error), UrlEncoded(::serde_urlencoded::ser::Error), Json(::serde_json::Error), TooManyRedirects, @@ -218,6 +219,7 @@ impl From<::hyper::Error> for Kind { fn from(err: ::hyper::Error) -> Kind { match err { ::hyper::Error::Io(err) => Kind::Io(err), + /* ::hyper::Error::Uri(err) => Kind::Url(err), ::hyper::Error::Ssl(err) => { match err.downcast() { @@ -225,11 +227,19 @@ impl From<::hyper::Error> for Kind { Err(ssl) => Kind::Http(::hyper::Error::Ssl(ssl)), } } + */ other => Kind::Http(other), } } } +impl From for Kind { + #[inline] + fn from(err: io::Error) -> Kind { + Kind::Io(err) + } +} + impl From<::url::ParseError> for Kind { #[inline] fn from(err: ::url::ParseError) -> Kind { @@ -251,14 +261,35 @@ impl From<::serde_json::Error> for Kind { } } -impl From<::hyper_native_tls::native_tls::Error> for Kind { - fn from(err: ::hyper_native_tls::native_tls::Error) -> Kind { +impl From<::native_tls::Error> for Kind { + fn from(err: ::native_tls::Error) -> Kind { Kind::Tls(err) } } +impl From<::wait::Waited> for Kind +where T: Into { + fn from(err: ::wait::Waited) -> Kind { + match err { + ::wait::Waited::TimedOut => io_timeout().into(), + ::wait::Waited::Err(e) => e.into(), + } + } +} + +#[cfg(unix)] +fn io_timeout() -> io::Error { + io::Error::new(io::ErrorKind::WouldBlock, "timed out") +} + +#[cfg(windows)] +fn io_timeout() -> io::Error { + io::Error::new(io::ErrorKind::TimedOut, "timed out") +} + // pub(crate) +#[allow(missing_debug_implementations)] pub struct InternalFrom(pub T, pub Option); #[doc(hidden)] // https://github.com/rust-lang/rust/issues/42323 @@ -291,12 +322,21 @@ where InternalFrom(err, None).into() } +#[inline] +pub fn into_io(e: Error) -> io::Error { + match e.kind { + Kind::Io(io) => io, + _ => io::Error::new(io::ErrorKind::Other, e), + } +} + + macro_rules! try_ { ($e:expr) => ( match $e { Ok(v) => v, Err(err) => { - return Err(::Error::from(::error::InternalFrom(err, None))); + return Err(::error::from(err)); } } ); @@ -326,6 +366,14 @@ pub fn too_many_redirects(url: Url) -> Error { } } +#[inline] +pub fn timedout(url: Option) -> Error { + Error { + kind: Kind::Io(io_timeout()), + url: url, + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/into_url.rs b/src/into_url.rs new file mode 100644 index 0000000..5c50395 --- /dev/null +++ b/src/into_url.rs @@ -0,0 +1,34 @@ +use url::{Url, ParseError}; + +/// A trait to try to convert some type into a `Url`. +/// +/// This trait is "sealed", such that only types within reqwest can +/// implement it. The reason is that it will eventually be deprecated +/// and removed, when `std::convert::TryFrom` is stabilized. +pub trait IntoUrl: PolyfillTryInto {} + +impl IntoUrl for T {} + +// pub(crate) + +pub trait PolyfillTryInto { + fn into_url(self) -> Result; +} + +impl PolyfillTryInto for Url { + fn into_url(self) -> Result { + Ok(self) + } +} + +impl<'a> PolyfillTryInto for &'a str { + fn into_url(self) -> Result { + Url::parse(self) + } +} + +impl<'a> PolyfillTryInto for &'a String { + fn into_url(self) -> Result { + Url::parse(self) + } +} diff --git a/src/lib.rs b/src/lib.rs index 02939d6..5d2955d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ #![deny(warnings)] #![deny(missing_docs)] +#![deny(missing_debug_implementations)] #![doc(html_root_url = "https://docs.rs/reqwest/0.6.2")] //! # reqwest @@ -16,10 +17,7 @@ //! //! The `reqwest::Client` is synchronous, making it a great fit for //! applications that only require a few HTTP requests, and wish to handle -//! them synchronously. When [hyper][] releases with asynchronous support, -//! `reqwest` will be updated to use it internally, but still provide a -//! synchronous Client, for convenience. A `reqwest::async::Client` will also -//! be added. +//! them synchronously. //! //! ## Making a GET request //! @@ -119,41 +117,76 @@ //! [get]: ./fn.get.html //! [builder]: ./client/struct.RequestBuilder.html //! [serde]: http://serde.rs -extern crate hyper; +extern crate bytes; +#[macro_use] +extern crate futures; +extern crate hyper; +extern crate hyper_tls; #[macro_use] extern crate log; -extern crate libc; extern crate libflate; -extern crate hyper_native_tls; +extern crate native_tls; extern crate serde; extern crate serde_json; extern crate serde_urlencoded; +extern crate tokio_core; extern crate url; -pub use hyper::client::IntoUrl; -pub use hyper::Error as HyperError; pub use hyper::header; pub use hyper::mime; -pub use hyper::method::Method; -pub use hyper::status::StatusCode; -pub use hyper::Url; +pub use hyper::Method; +pub use hyper::StatusCode; +pub use url::Url; pub use url::ParseError as UrlError; -pub use self::client::{Certificate, Client, ClientBuilder}; +pub use self::client::{Client, ClientBuilder}; pub use self::error::{Error, Result}; pub use self::body::Body; +pub use self::into_url::IntoUrl; pub use self::redirect::{RedirectAction, RedirectAttempt, RedirectPolicy}; pub use self::request::{Request, RequestBuilder}; pub use self::response::Response; +pub use self::tls::Certificate; + +// this module must be first because of the `try_` macro #[macro_use] mod error; + +/// A set of unstable functionality. +/// +/// This module is only available when the `unstable` feature is enabled. +/// There is no backwards compatibility guarantee for any of the types within. +#[cfg(feature = "unstable")] +pub mod unstable { + /// An 'async' implementation of the reqwest `Client`. + /// + /// Relies on the `futures` crate, which is unstable, hence this module + /// is unstable. + pub mod async { + pub use ::async_impl::{ + Body, + Chunk, + Client, + ClientBuilder, + Request, + RequestBuilder, + Response, + }; + } +} + + +mod async_impl; mod body; mod client; +mod into_url; mod redirect; mod request; mod response; +mod tls; +mod wait; /// Shortcut method to quickly make a `GET` request. diff --git a/src/redirect.rs b/src/redirect.rs index 2922802..04d4f5a 100644 --- a/src/redirect.rs +++ b/src/redirect.rs @@ -265,7 +265,9 @@ fn test_remove_sensitive_headers() { let mut headers = Headers::new(); headers.set(Accept::star()); headers.set(Authorization("let me in".to_owned())); - headers.set(Cookie(vec![String::from("foo=bar")])); + let mut cookie = Cookie::new(); + cookie.set("foo", "bar"); + headers.set(cookie); let next = Url::parse("http://initial-domain.com/path").unwrap(); let mut prev = vec![Url::parse("http://initial-domain.com/new_path").unwrap()]; diff --git a/src/request.rs b/src/request.rs index 65288e1..6def7bb 100644 --- a/src/request.rs +++ b/src/request.rs @@ -5,15 +5,14 @@ use serde::Serialize; use serde_json; use serde_urlencoded; +use body::{self, Body}; use header::Headers; -use {Body, Client, Method, Url}; +use {async_impl, Client, Method, Url}; /// A request which can be executed with `Client::execute()`. pub struct Request { - method: Method, - url: Url, - headers: Headers, body: Option, + inner: async_impl::Request, } /// A builder to construct the properties of a `Request`. @@ -27,47 +26,45 @@ impl Request { #[inline] pub fn new(method: Method, url: Url) -> Self { Request { - method, - url, - headers: Headers::new(), body: None, + inner: async_impl::Request::new(method, url), } } /// Get the method. #[inline] pub fn method(&self) -> &Method { - &self.method + self.inner.method() } /// Get a mutable reference to the method. #[inline] pub fn method_mut(&mut self) -> &mut Method { - &mut self.method + self.inner.method_mut() } /// Get the url. #[inline] pub fn url(&self) -> &Url { - &self.url + self.inner.url() } /// Get a mutable reference to the url. #[inline] pub fn url_mut(&mut self) -> &mut Url { - &mut self.url + self.inner.url_mut() } /// Get the headers. #[inline] pub fn headers(&self) -> &Headers { - &self.headers + self.inner.headers() } /// Get a mutable reference to the headers. #[inline] pub fn headers_mut(&mut self) -> &mut Headers { - &mut self.headers + self.inner.headers_mut() } /// Get the body. @@ -92,18 +89,19 @@ impl RequestBuilder { /// # fn run() -> Result<(), Box<::std::error::Error>> { /// let client = reqwest::Client::new()?; /// let res = client.get("https://www.rust-lang.org")? - /// .header(UserAgent("foo".to_string())) + /// .header(UserAgent::new("foo")) /// .send()?; /// # Ok(()) /// # } /// ``` pub fn header(&mut self, header: H) -> &mut RequestBuilder where - H: ::header::Header + ::header::HeaderFormat, + H: ::header::Header, { - self.request_mut().headers.set(header); + self.request_mut().headers_mut().set(header); self } + /// Add a set of Headers to the existing ones on this Request. /// /// The headers will be merged in to any already set. @@ -114,7 +112,7 @@ impl RequestBuilder { /// /// fn construct_headers() -> Headers { /// let mut headers = Headers::new(); - /// headers.set(UserAgent("reqwest".to_string())); + /// headers.set(UserAgent::new("reqwest")); /// headers.set(ContentType::png()); /// headers /// } @@ -130,7 +128,7 @@ impl RequestBuilder { /// # } /// ``` pub fn headers(&mut self, headers: ::header::Headers) -> &mut RequestBuilder { - self.request_mut().headers.extend(headers.iter()); + self.request_mut().headers_mut().extend(headers.iter()); self } @@ -193,7 +191,7 @@ impl RequestBuilder { /// # } /// ``` pub fn body>(&mut self, body: T) -> &mut RequestBuilder { - self.request_mut().body = Some(body.into()); + *self.request_mut().body_mut() = Some(body.into()); self } @@ -224,12 +222,13 @@ impl RequestBuilder { /// This method fails if the passed value cannot be serialized into /// url encoded format pub fn form(&mut self, form: &T) -> ::Result<&mut RequestBuilder> { + { // check request_mut() before running serde let mut req = self.request_mut(); let body = try_!(serde_urlencoded::to_string(form)); - req.headers.set(ContentType::form_url_encoded()); - req.body = Some(body.into()); + req.headers_mut().set(ContentType::form_url_encoded()); + *req.body_mut() = Some(body.into()); } Ok(self) } @@ -264,8 +263,8 @@ impl RequestBuilder { // check request_mut() before running serde let mut req = self.request_mut(); let body = try_!(serde_json::to_vec(json)); - req.headers.set(ContentType::json()); - req.body = Some(body.into()); + req.headers_mut().set(ContentType::json()); + *req.body_mut() = Some(body.into()); } Ok(self) } @@ -324,9 +323,9 @@ impl fmt::Debug for RequestBuilder { } fn fmt_request_fields<'a, 'b>(f: &'a mut fmt::DebugStruct<'a, 'b>, req: &Request) -> &'a mut fmt::DebugStruct<'a, 'b> { - f.field("method", &req.method) - .field("url", &req.url) - .field("headers", &req.headers) + f.field("method", req.method()) + .field("url", req.url()) + .field("headers", req.headers()) } // pub(crate) @@ -340,16 +339,25 @@ pub fn builder(client: Client, req: Request) -> RequestBuilder { } #[inline] -pub fn pieces(req: Request) -> (Method, Url, Headers, Option) { - (req.method, req.url, req.headers, req.body) +pub fn async(req: Request) -> (async_impl::Request, Option) { + use header::ContentLength; + + let mut req_async = req.inner; + let body = req.body.and_then(|body| { + let (tx, body, len) = body::async(body); + if let Some(len) = len { + req_async.headers_mut().set(ContentLength(len)); + } + *req_async.body_mut() = Some(body); + tx + }); + (req_async, body) } #[cfg(test)] mod tests { - use body; - use client::Client; - use hyper::method::Method; - use hyper::header::{Host, Headers, ContentType}; + use {body, Client, Method}; + use header::{Host, Headers, ContentType}; use std::collections::HashMap; use serde_urlencoded; use serde_json; @@ -360,8 +368,8 @@ mod tests { let some_url = "https://google.com/"; let r = client.get(some_url).unwrap().build(); - assert_eq!(r.method, Method::Get); - assert_eq!(r.url.as_str(), some_url); + assert_eq!(r.method(), &Method::Get); + assert_eq!(r.url().as_str(), some_url); } #[test] @@ -370,8 +378,8 @@ mod tests { let some_url = "https://google.com/"; let r = client.head(some_url).unwrap().build(); - assert_eq!(r.method, Method::Head); - assert_eq!(r.url.as_str(), some_url); + assert_eq!(r.method(), &Method::Head); + assert_eq!(r.url().as_str(), some_url); } #[test] @@ -380,8 +388,8 @@ mod tests { let some_url = "https://google.com/"; let r = client.post(some_url).unwrap().build(); - assert_eq!(r.method, Method::Post); - assert_eq!(r.url.as_str(), some_url); + assert_eq!(r.method(), &Method::Post); + assert_eq!(r.url().as_str(), some_url); } #[test] @@ -390,8 +398,8 @@ mod tests { let some_url = "https://google.com/"; let r = client.put(some_url).unwrap().build(); - assert_eq!(r.method, Method::Put); - assert_eq!(r.url.as_str(), some_url); + assert_eq!(r.method(), &Method::Put); + assert_eq!(r.url().as_str(), some_url); } #[test] @@ -400,8 +408,8 @@ mod tests { let some_url = "https://google.com/"; let r = client.patch(some_url).unwrap().build(); - assert_eq!(r.method, Method::Patch); - assert_eq!(r.url.as_str(), some_url); + assert_eq!(r.method(), &Method::Patch); + assert_eq!(r.url().as_str(), some_url); } #[test] @@ -410,8 +418,8 @@ mod tests { let some_url = "https://google.com/"; let r = client.delete(some_url).unwrap().build(); - assert_eq!(r.method, Method::Delete); - assert_eq!(r.url.as_str(), some_url); + assert_eq!(r.method(), &Method::Delete); + assert_eq!(r.url().as_str(), some_url); } #[test] @@ -420,16 +428,13 @@ mod tests { let some_url = "https://google.com/"; let mut r = client.post(some_url).unwrap(); - let header = Host { - hostname: "google.com".to_string(), - port: None, - }; + let header = Host::new("google.com", None); // Add a copy of the header to the request builder let r = r.header(header.clone()).build(); // then check it was actually added - assert_eq!(r.headers.get::(), Some(&header)); + assert_eq!(r.headers().get::(), Some(&header)); } #[test] @@ -438,10 +443,7 @@ mod tests { let some_url = "https://google.com/"; let mut r = client.post(some_url).unwrap(); - let header = Host { - hostname: "google.com".to_string(), - port: None, - }; + let header = Host::new("google.com", None); let mut headers = Headers::new(); headers.set(header); @@ -450,7 +452,7 @@ mod tests { let r = r.headers(headers.clone()).build(); // then make sure they were added correctly - assert_eq!(r.headers, headers); + assert_eq!(r.headers(), &headers); } #[test] @@ -461,9 +463,9 @@ mod tests { let body = "Some interesting content"; - let r = r.body(body).build(); + let mut r = r.body(body).build(); - let buf = body::read_to_string(r.body.unwrap()).unwrap(); + let buf = body::read_to_string(r.body_mut().take().unwrap()).unwrap(); assert_eq!(buf, body); } @@ -477,13 +479,13 @@ mod tests { let mut form_data = HashMap::new(); form_data.insert("foo", "bar"); - let r = r.form(&form_data).unwrap().build(); + let mut r = r.form(&form_data).unwrap().build(); // Make sure the content type was set - assert_eq!(r.headers.get::(), + assert_eq!(r.headers().get::(), Some(&ContentType::form_url_encoded())); - let buf = body::read_to_string(r.body.unwrap()).unwrap(); + let buf = body::read_to_string(r.body_mut().take().unwrap()).unwrap(); let body_should_be = serde_urlencoded::to_string(&form_data).unwrap(); assert_eq!(buf, body_should_be); @@ -498,12 +500,12 @@ mod tests { let mut json_data = HashMap::new(); json_data.insert("foo", "bar"); - let r = r.json(&json_data).unwrap().build(); + let mut r = r.json(&json_data).unwrap().build(); // Make sure the content type was set - assert_eq!(r.headers.get::(), Some(&ContentType::json())); + assert_eq!(r.headers().get::(), Some(&ContentType::json())); - let buf = body::read_to_string(r.body.unwrap()).unwrap(); + let buf = body::read_to_string(r.body_mut().take().unwrap()).unwrap(); let body_should_be = serde_json::to_string(&json_data).unwrap(); assert_eq!(buf, body_should_be); @@ -525,7 +527,7 @@ mod tests { let client = Client::new().unwrap(); let some_url = "https://google.com/"; let mut r = client.post(some_url).unwrap(); - let json_data = MyStruct{}; + let json_data = MyStruct; assert!(r.json(&json_data).unwrap_err().is_serialization()); } } diff --git a/src/response.rs b/src/response.rs index bfda894..05a9c63 100644 --- a/src/response.rs +++ b/src/response.rs @@ -1,45 +1,26 @@ use std::fmt; use std::io::{self, Read}; +use std::time::Duration; -use hyper::header::{Headers, ContentEncoding, ContentLength, Encoding, TransferEncoding}; -use hyper::status::StatusCode; -use hyper::Url; use libflate::gzip; use serde::de::DeserializeOwned; use serde_json; +use client::KeepCoreThreadAlive; +use header::{Headers, ContentEncoding, ContentLength, Encoding, TransferEncoding}; +use {async_impl, StatusCode, Url, wait}; + /// A Response to a submitted `Request`. pub struct Response { - inner: Decoder, -} - -pub fn new(res: ::hyper::client::Response, gzip: bool) -> Response { - info!("Response: '{}' for {}", res.status, res.url); - Response { - inner: Decoder::from_hyper_response(res, gzip), - } + body: Decoder, + inner: async_impl::Response, + _thread_handle: KeepCoreThreadAlive, } impl fmt::Debug for Response { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self.inner { - Decoder::PlainText(ref hyper_response) => { - f.debug_struct("Response") - .field("url", &hyper_response.url) - .field("status", &hyper_response.status) - .field("headers", &hyper_response.headers) - .finish() - } - Decoder::Gzip { ref head, .. } | - Decoder::Errored { ref head, .. } => { - f.debug_struct("Response") - .field("url", &head.url) - .field("status", &head.status) - .field("headers", &head.headers) - .finish() - } - } + fmt::Debug::fmt(&self.inner, f) } } @@ -55,11 +36,7 @@ impl Response { /// ``` #[inline] pub fn url(&self) -> &Url { - match self.inner { - Decoder::PlainText(ref hyper_response) => &hyper_response.url, - Decoder::Gzip { ref head, .. } | - Decoder::Errored { ref head, .. } => &head.url, - } + self.inner.url() } /// Get the `StatusCode` of this `Response`. @@ -98,11 +75,7 @@ impl Response { /// ``` #[inline] pub fn status(&self) -> StatusCode { - match self.inner { - Decoder::PlainText(ref hyper_response) => hyper_response.status, - Decoder::Gzip { ref head, .. } | - Decoder::Errored { ref head, .. } => head.status, - } + self.inner.status() } /// Get the `Headers` of this `Response`. @@ -133,11 +106,7 @@ impl Response { /// ``` #[inline] pub fn headers(&self) -> &Headers { - match self.inner { - Decoder::PlainText(ref hyper_response) => &hyper_response.headers, - Decoder::Gzip { ref head, .. } | - Decoder::Errored { ref head, .. } => &head.headers, - } + self.inner.headers() } /// Try and deserialize the response body as JSON using `serde`. @@ -151,12 +120,12 @@ impl Response { /// # use reqwest::Error; /// # /// #[derive(Deserialize)] - /// struct Response { + /// struct Ip { /// origin: String, /// } /// /// # fn run() -> Result<(), Error> { - /// let resp: Response = reqwest::get("http://httpbin.org/ip")?.json()?; + /// let json: Ip = reqwest::get("http://httpbin.org/ip")?.json()?; /// # Ok(()) /// # } /// # @@ -171,24 +140,98 @@ impl Response { /// [`serde_json::from_reader`]: https://docs.serde.rs/serde_json/fn.from_reader.html #[inline] pub fn json(&mut self) -> ::Result { + // There's 2 ways we could implement this: + // + // 1. Just using from_reader(self), making use of our blocking read adapter + // 2. Just use self.inner.json().wait() + // + // Doing 1 is pretty easy, but it means we have the `serde_json` code + // in more than one place, doing basically the same thing. + // + // Doing 2 would mean `serde_json` is only in one place, but we'd + // need to update the sync Response to lazily make a blocking read + // adapter, so that our `inner` could possibly still have the original + // body. + // + // Went for easier for now, just to get it working. serde_json::from_reader(self).map_err(::error::from) } } +impl Read for Response { + #[inline] + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.body.read(buf) + } +} + +struct ReadableBody { + state: ReadState, + stream: wait::WaitStream, +} + +enum ReadState { + Ready(async_impl::Chunk, usize), + NotReady, + Eof, +} + + +impl Read for ReadableBody { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + use std::cmp; + + loop { + let ret; + match self.state { + ReadState::Ready(ref mut chunk, ref mut pos) => { + let chunk_start = *pos; + let len = cmp::min(buf.len(), chunk.len() - chunk_start); + let chunk_end = chunk_start + len; + buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]); + *pos += len; + if *pos == chunk.len() { + ret = len; + } else { + return Ok(len); + } + }, + ReadState::NotReady => { + match self.stream.next() { + Some(Ok(chunk)) => { + self.state = ReadState::Ready(chunk, 0); + continue; + }, + Some(Err(e)) => { + let req_err = match e { + wait::Waited::TimedOut => ::error::timedout(None), + wait::Waited::Err(e) => e, + }; + return Err(::error::into_io(req_err)); + }, + None => { + self.state = ReadState::Eof; + return Ok(0); + }, + } + }, + ReadState::Eof => return Ok(0), + } + self.state = ReadState::NotReady; + return Ok(ret); + } + } +} + + enum Decoder { /// A `PlainText` decoder just returns the response content as is. - PlainText(::hyper::client::Response), + PlainText(ReadableBody), /// A `Gzip` decoder will uncompress the gziped response content before returning it. - Gzip { - decoder: gzip::Decoder, - head: Head, - }, + Gzip(gzip::Decoder), /// An error occured reading the Gzip header, so return that error /// when the user tries to read on the `Response`. - Errored { - err: Option, - head: Head, - } + Errored(Option), } impl Decoder { @@ -198,22 +241,28 @@ impl Decoder { /// how to decode the content body of the request. /// /// Uses the correct variant by inspecting the Content-Encoding header. - fn from_hyper_response(mut res: ::hyper::client::Response, check_gzip: bool) -> Self { + fn new(res: &mut async_impl::Response, check_gzip: bool, timeout: Option) -> Self { + let body = async_impl::body::take(res.body_mut()); + let body = ReadableBody { + state: ReadState::NotReady, + stream: wait::stream(body, timeout), + }; + if !check_gzip { - return Decoder::PlainText(res); + return Decoder::PlainText(body); } let content_encoding_gzip: bool; let mut is_gzip = { - content_encoding_gzip = res.headers + content_encoding_gzip = res.headers() .get::() .map_or(false, |encs| encs.contains(&Encoding::Gzip)); content_encoding_gzip || - res.headers + res.headers() .get::() .map_or(false, |encs| encs.contains(&Encoding::Gzip)) }; if is_gzip { - if let Some(content_length) = res.headers.get::() { + if let Some(content_length) = res.headers().get::() { if content_length.0 == 0 { warn!("GZipped response with content-length of 0"); is_gzip = false; @@ -221,68 +270,41 @@ impl Decoder { } } if content_encoding_gzip { - res.headers.remove::(); - res.headers.remove::(); + res.headers_mut().remove::(); + res.headers_mut().remove::(); } if is_gzip { - new_gzip(res) + new_gzip(body) } else { - Decoder::PlainText(res) + Decoder::PlainText(body) } } } -fn new_gzip(mut res: ::hyper::client::Response) -> Decoder { +fn new_gzip(mut body: ReadableBody) -> Decoder { // libflate does a read_exact([0; 2]), so its impossible to tell // if the stream was empty, or truly had an UnexpectedEof. // Therefore, we need to peek a byte to make check for EOF first. let mut peek = [0]; - match res.read(&mut peek) { - Ok(0) => return Decoder::PlainText(res), - Ok(n) => { - debug_assert_eq!(n, 1); - } - Err(e) => return Decoder::Errored { - err: Some(e), - head: Head { - headers: res.headers.clone(), - status: res.status, - url: res.url.clone(), - } - } + match body.read(&mut peek) { + Ok(0) => return Decoder::PlainText(body), + Ok(n) => debug_assert_eq!(n, 1), + Err(e) => return Decoder::Errored(Some(e)), } - let head = Head { - headers: res.headers.clone(), - status: res.status, - url: res.url.clone(), - }; - let reader = Peeked { peeked: Some(peek[0]), - inner: res, + inner: body, }; match gzip::Decoder::new(reader) { - Ok(gzip) => Decoder::Gzip { - decoder: gzip, - head: head, - }, - Err(e) => Decoder::Errored { - err: Some(e), - head: head, - } + Ok(gzip) => Decoder::Gzip(gzip), + Err(e) => Decoder::Errored(Some(e)), } } -struct Head { - headers: ::hyper::header::Headers, - url: ::hyper::Url, - status: ::hyper::status::StatusCode, -} - struct Peeked { peeked: Option, - inner: ::hyper::client::Response, + inner: ReadableBody, } impl Read for Peeked { @@ -303,9 +325,9 @@ impl Read for Peeked { impl Read for Decoder { fn read(&mut self, buf: &mut [u8]) -> io::Result { match *self { - Decoder::PlainText(ref mut hyper_response) => hyper_response.read(buf), - Decoder::Gzip { ref mut decoder, .. } => decoder.read(buf), - Decoder::Errored { ref mut err, .. } => { + Decoder::PlainText(ref mut body) => body.read(buf), + Decoder::Gzip(ref mut decoder) => decoder.read(buf), + Decoder::Errored(ref mut err) => { Err(err.take().unwrap_or_else(previously_errored)) } } @@ -317,10 +339,15 @@ fn previously_errored() -> io::Error { io::Error::new(io::ErrorKind::Other, "permanently errored") } -/// Read the body of the Response. -impl Read for Response { - #[inline] - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.inner.read(buf) + +// pub(crate) + +pub fn new(mut res: async_impl::Response, gzip: bool, timeout: Option, thread: KeepCoreThreadAlive) -> Response { + + let decoder = Decoder::new(&mut res, gzip, timeout); + Response { + body: decoder, + inner: res, + _thread_handle: thread, } } diff --git a/src/tls.rs b/src/tls.rs new file mode 100644 index 0000000..9095398 --- /dev/null +++ b/src/tls.rs @@ -0,0 +1,45 @@ +use std::fmt; +use native_tls; + +/// Represent an X509 certificate. +pub struct Certificate(native_tls::Certificate); + +impl Certificate { + /// Create a `Certificate` from a binary DER encoded certificate + /// + /// # Examples + /// + /// ``` + /// # use std::fs::File; + /// # use std::io::Read; + /// # fn cert() -> Result<(), Box> { + /// let mut buf = Vec::new(); + /// File::open("my_cert.der")? + /// .read_to_end(&mut buf)?; + /// let cert = reqwest::Certificate::from_der(&buf)?; + /// # drop(cert); + /// # Ok(()) + /// # } + /// ``` + /// + /// # Errors + /// + /// If the provided buffer is not valid DER, an error will be returned. + pub fn from_der(der: &[u8]) -> ::Result { + let inner = try_!(native_tls::Certificate::from_der(der)); + Ok(Certificate(inner)) + } +} + +impl fmt::Debug for Certificate { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Certificate") + .finish() + } +} + +// pub(crate) + +pub fn cert(cert: Certificate) -> native_tls::Certificate { + cert.0 +} diff --git a/src/wait.rs b/src/wait.rs new file mode 100644 index 0000000..0890967 --- /dev/null +++ b/src/wait.rs @@ -0,0 +1,167 @@ +use std::sync::Arc; +use std::thread; +use std::time::{Duration, Instant}; + +use futures::{Async, AsyncSink, Future, Sink, Stream}; +use futures::executor::{self, Notify}; + +// pub(crate) + + +pub fn timeout(fut: F, timeout: Option) -> Result> +where F: Future { + if let Some(dur) = timeout { + let start = Instant::now(); + let deadline = start + dur; + let mut task = executor::spawn(fut); + let notify = Arc::new(ThreadNotify { + thread: thread::current(), + }); + + loop { + let now = Instant::now(); + if now >= deadline { + return Err(Waited::TimedOut); + } + match task.poll_future_notify(¬ify, 0)? { + Async::Ready(val) => return Ok(val), + Async::NotReady => { + thread::park_timeout(deadline - now); + } + } + } + } else { + fut.wait().map_err(From::from) + } +} + +pub fn stream(stream: S, timeout: Option) -> WaitStream +where S: Stream { + WaitStream { + stream: executor::spawn(stream), + timeout: timeout, + } +} + +pub fn sink(sink: S, timeout: Option) -> WaitSink +where S: Sink { + WaitSink { + sink: executor::spawn(sink), + timeout: timeout, + } +} + +#[derive(Debug)] +pub enum Waited { + TimedOut, + Err(E), +} + +impl From for Waited { + fn from(err: E) -> Waited { + Waited::Err(err) + } +} + +pub struct WaitStream { + stream: executor::Spawn, + timeout: Option, +} + +impl Iterator for WaitStream +where S: Stream { + type Item = Result>; + + fn next(&mut self) -> Option { + if let Some(dur) = self.timeout { + let start = Instant::now(); + let deadline = start + dur; + let notify = Arc::new(ThreadNotify { + thread: thread::current(), + }); + + loop { + let now = Instant::now(); + if now >= deadline { + return Some(Err(Waited::TimedOut)); + } + match self.stream.poll_stream_notify(¬ify, 0) { + Ok(Async::Ready(Some(val))) => return Some(Ok(val)), + Ok(Async::Ready(None)) => return None, + Ok(Async::NotReady) => { + thread::park_timeout(deadline - now); + }, + Err(e) => return Some(Err(Waited::Err(e))), + } + } + } else { + let notify = Arc::new(ThreadNotify { + thread: thread::current(), + }); + + loop { + match self.stream.poll_stream_notify(¬ify, 0) { + Ok(Async::Ready(Some(val))) => return Some(Ok(val)), + Ok(Async::Ready(None)) => return None, + Ok(Async::NotReady) => { + thread::park(); + }, + Err(e) => return Some(Err(Waited::Err(e))), + } + } + } + } +} + +pub struct WaitSink { + sink: executor::Spawn, + timeout: Option, +} + +impl WaitSink +where S: Sink { + pub fn send(&mut self, mut item: S::SinkItem) -> Result<(), Waited> { + if let Some(dur) = self.timeout { + + let start = Instant::now(); + let deadline = start + dur; + let notify = Arc::new(ThreadNotify { + thread: thread::current(), + }); + + loop { + let now = Instant::now(); + if now >= deadline { + return Err(Waited::TimedOut); + } + item = match self.sink.start_send_notify(item, ¬ify, 0)? { + AsyncSink::Ready => return Ok(()), + AsyncSink::NotReady(val) => val, + }; + thread::park_timeout(deadline - now); + } + } else { + let notify = Arc::new(ThreadNotify { + thread: thread::current(), + }); + + loop { + item = match self.sink.start_send_notify(item, ¬ify, 0)? { + AsyncSink::Ready => return Ok(()), + AsyncSink::NotReady(val) => val, + }; + thread::park(); + } + } + } +} + +struct ThreadNotify { + thread: thread::Thread, +} + +impl Notify for ThreadNotify { + fn notify(&self, _id: usize) { + self.thread.unpark(); + } +} diff --git a/tests/client.rs b/tests/client.rs index 9e28dd4..a0012b1 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1,11 +1,9 @@ extern crate reqwest; -extern crate libflate; #[macro_use] -mod server; +mod support; use std::io::Read; -use std::io::prelude::*; #[test] fn test_get() { @@ -32,7 +30,7 @@ fn test_get() { assert_eq!(res.url().as_str(), &url); assert_eq!(res.status(), reqwest::StatusCode::Ok); assert_eq!(res.headers().get(), - Some(&reqwest::header::Server("test".to_string()))); + Some(&reqwest::header::Server::new("test".to_string()))); assert_eq!(res.headers().get(), Some(&reqwest::header::ContentLength(0))); @@ -42,473 +40,43 @@ fn test_get() { } #[test] -fn test_redirect_301_and_302_and_303_changes_post_to_get() { - let client = reqwest::Client::new().unwrap(); - let codes = [301, 302, 303]; - - for code in codes.iter() { - let redirect = server! { - request: format!("\ - POST /{} HTTP/1.1\r\n\ - Host: $HOST\r\n\ - User-Agent: $USERAGENT\r\n\ - Accept: */*\r\n\ - Accept-Encoding: gzip\r\n\ - Content-Length: 0\r\n\ - \r\n\ - ", code), - response: format!("\ - HTTP/1.1 {} reason\r\n\ - Server: test-redirect\r\n\ - Content-Length: 0\r\n\ - Location: /dst\r\n\ - Connection: close\r\n\ - \r\n\ - ", code), - - request: format!("\ - GET /dst HTTP/1.1\r\n\ - Host: $HOST\r\n\ - User-Agent: $USERAGENT\r\n\ - Accept: */*\r\n\ - Accept-Encoding: gzip\r\n\ - Referer: http://$HOST/{}\r\n\ - \r\n\ - ", code), - response: b"\ - HTTP/1.1 200 OK\r\n\ - Server: test-dst\r\n\ - Content-Length: 0\r\n\ - \r\n\ - " - }; - - let url = format!("http://{}/{}", redirect.addr(), code); - let dst = format!("http://{}/{}", redirect.addr(), "dst"); - let res = client.post(&url) - .unwrap() - .send() - .unwrap(); - assert_eq!(res.url().as_str(), dst); - assert_eq!(res.status(), reqwest::StatusCode::Ok); - assert_eq!(res.headers().get(), - Some(&reqwest::header::Server("test-dst".to_string()))); - } -} - -#[test] -fn test_redirect_307_and_308_tries_to_post_again() { - let client = reqwest::Client::new().unwrap(); - let codes = [307, 308]; - for code in codes.iter() { - let redirect = server! { - request: format!("\ - POST /{} HTTP/1.1\r\n\ - Host: $HOST\r\n\ - User-Agent: $USERAGENT\r\n\ - Accept: */*\r\n\ - Accept-Encoding: gzip\r\n\ - Content-Length: 5\r\n\ - \r\n\ - Hello\ - ", code), - response: format!("\ - HTTP/1.1 {} reason\r\n\ - Server: test-redirect\r\n\ - Content-Length: 0\r\n\ - Location: /dst\r\n\ - Connection: close\r\n\ - \r\n\ - ", code), - - request: format!("\ - POST /dst HTTP/1.1\r\n\ - Host: $HOST\r\n\ - User-Agent: $USERAGENT\r\n\ - Accept: */*\r\n\ - Accept-Encoding: gzip\r\n\ - Referer: http://$HOST/{}\r\n\ - Content-Length: 5\r\n\ - \r\n\ - Hello\ - ", code), - response: b"\ - HTTP/1.1 200 OK\r\n\ - Server: test-dst\r\n\ - Content-Length: 0\r\n\ - \r\n\ - " - }; - - let url = format!("http://{}/{}", redirect.addr(), code); - let dst = format!("http://{}/{}", redirect.addr(), "dst"); - let res = client.post(&url) - .unwrap() - .body("Hello") - .send() - .unwrap(); - assert_eq!(res.url().as_str(), dst); - assert_eq!(res.status(), reqwest::StatusCode::Ok); - assert_eq!(res.headers().get(), - Some(&reqwest::header::Server("test-dst".to_string()))); - } -} - -#[test] -fn test_redirect_307_does_not_try_if_reader_cannot_reset() { - let client = reqwest::Client::new().unwrap(); - let codes = [307, 308]; - for &code in codes.iter() { - let redirect = server! { - request: format!("\ - POST /{} HTTP/1.1\r\n\ - Host: $HOST\r\n\ - User-Agent: $USERAGENT\r\n\ - Accept: */*\r\n\ - Accept-Encoding: gzip\r\n\ - Transfer-Encoding: chunked\r\n\ - \r\n\ - 5\r\n\ - Hello\r\n\ - 0\r\n\r\n\ - ", code), - response: format!("\ - HTTP/1.1 {} reason\r\n\ - Server: test-redirect\r\n\ - Content-Length: 0\r\n\ - Location: /dst\r\n\ - Connection: close\r\n\ - \r\n\ - ", code) - }; - - let url = format!("http://{}/{}", redirect.addr(), code); - let res = client - .post(&url) - .unwrap() - .body(reqwest::Body::new(&b"Hello"[..])) - .send() - .unwrap(); - assert_eq!(res.url().as_str(), url); - assert_eq!(res.status(), reqwest::StatusCode::from_u16(code)); - } -} - -#[test] -fn test_redirect_removes_sensitive_headers() { - let end_server = server! { +fn test_post() { + let server = server! { request: b"\ - GET /otherhost HTTP/1.1\r\n\ + POST /2 HTTP/1.1\r\n\ Host: $HOST\r\n\ + Content-Length: 5\r\n\ User-Agent: $USERAGENT\r\n\ Accept: */*\r\n\ Accept-Encoding: gzip\r\n\ \r\n\ + Hello\ ", response: b"\ HTTP/1.1 200 OK\r\n\ - Server: test\r\n\ + Server: post\r\n\ Content-Length: 0\r\n\ \r\n\ " }; - let mid_server = server! { - request: b"\ - GET /sensitive HTTP/1.1\r\n\ - Host: $HOST\r\n\ - Cookie: foo=bar\r\n\ - User-Agent: $USERAGENT\r\n\ - Accept: */*\r\n\ - Accept-Encoding: gzip\r\n\ - \r\n\ - ", - response: format!("\ - HTTP/1.1 302 Found\r\n\ - Server: test\r\n\ - Location: http://{}/otherhost\r\n\ - Content-Length: 0\r\n\ - \r\n\ - ", end_server.addr()) - }; - - reqwest::Client::builder() + let url = format!("http://{}/2", server.addr()); + let mut res = reqwest::Client::new() .unwrap() - .referer(false) - .build() - .unwrap() - .get(&format!("http://{}/sensitive", mid_server.addr())) - .unwrap() - .header(reqwest::header::Cookie(vec![String::from("foo=bar")])) - .send() - .unwrap(); -} - -#[test] -fn test_redirect_policy_can_return_errors() { - let server = server! { - request: b"\ - GET /loop HTTP/1.1\r\n\ - Host: $HOST\r\n\ - User-Agent: $USERAGENT\r\n\ - Accept: */*\r\n\ - Accept-Encoding: gzip\r\n\ - \r\n\ - ", - response: b"\ - HTTP/1.1 302 Found\r\n\ - Server: test\r\n\ - Location: /loop\r\n\ - Content-Length: 0\r\n\ - \r\n\ - " - }; - - let err = reqwest::get(&format!("http://{}/loop", server.addr())).unwrap_err(); - assert!(err.is_redirect()); -} - -#[test] -fn test_redirect_policy_can_stop_redirects_without_an_error() { - let server = server! { - request: b"\ - GET /no-redirect HTTP/1.1\r\n\ - Host: $HOST\r\n\ - User-Agent: $USERAGENT\r\n\ - Accept: */*\r\n\ - Accept-Encoding: gzip\r\n\ - \r\n\ - ", - response: b"\ - HTTP/1.1 302 Found\r\n\ - Server: test-dont\r\n\ - Location: /dont\r\n\ - Content-Length: 0\r\n\ - \r\n\ - " - }; - - let url = format!("http://{}/no-redirect", server.addr()); - - let res = reqwest::Client::builder() - .unwrap() - .redirect(reqwest::RedirectPolicy::none()) - .build() - .unwrap() - .get(&url) + .post(&url) .unwrap() + .body("Hello") .send() .unwrap(); - assert_eq!(res.url().as_str(), url); - assert_eq!(res.status(), reqwest::StatusCode::Found); + assert_eq!(res.url().as_str(), &url); + assert_eq!(res.status(), reqwest::StatusCode::Ok); assert_eq!(res.headers().get(), - Some(&reqwest::header::Server("test-dont".to_string()))); -} - -#[test] -fn test_referer_is_not_set_if_disabled() { - let server = server! { - request: b"\ - GET /no-refer HTTP/1.1\r\n\ - Host: $HOST\r\n\ - User-Agent: $USERAGENT\r\n\ - Accept: */*\r\n\ - Accept-Encoding: gzip\r\n\ - \r\n\ - ", - response: b"\ - HTTP/1.1 302 Found\r\n\ - Server: test-no-referer\r\n\ - Content-Length: 0\r\n\ - Location: /dst\r\n\ - Connection: close\r\n\ - \r\n\ - ", - - request: b"\ - GET /dst HTTP/1.1\r\n\ - Host: $HOST\r\n\ - User-Agent: $USERAGENT\r\n\ - Accept: */*\r\n\ - Accept-Encoding: gzip\r\n\ - \r\n\ - ", - response: b"\ - HTTP/1.1 200 OK\r\n\ - Server: test-dst\r\n\ - Content-Length: 0\r\n\ - \r\n\ - " - }; - reqwest::Client::builder().unwrap() - .referer(false) - .build().unwrap() - //client - .get(&format!("http://{}/no-refer", server.addr())) - .unwrap() - .send() - .unwrap(); -} - -#[test] -fn test_accept_header_is_not_changed_if_set() { - let server = server! { - request: b"\ - GET /accept HTTP/1.1\r\n\ - Host: $HOST\r\n\ - Accept: application/json\r\n\ - User-Agent: $USERAGENT\r\n\ - Accept-Encoding: gzip\r\n\ - \r\n\ - ", - response: b"\ - HTTP/1.1 200 OK\r\n\ - Server: test-accept\r\n\ - Content-Length: 0\r\n\ - \r\n\ - " - }; - let client = reqwest::Client::new().unwrap(); - - let res = client - .get(&format!("http://{}/accept", server.addr())) - .unwrap() - .header(reqwest::header::Accept::json()) - .send() - .unwrap(); - - assert_eq!(res.status(), reqwest::StatusCode::Ok); -} - -#[test] -fn test_accept_encoding_header_is_not_changed_if_set() { - let server = server! { - request: b"\ - GET /accept-encoding HTTP/1.1\r\n\ - Host: $HOST\r\n\ - Accept-Encoding: identity\r\n\ - User-Agent: $USERAGENT\r\n\ - Accept: */*\r\n\ - \r\n\ - ", - response: b"\ - HTTP/1.1 200 OK\r\n\ - Server: test-accept-encoding\r\n\ - Content-Length: 0\r\n\ - \r\n\ - " - }; - let client = reqwest::Client::new().unwrap(); - - let res = client.get(&format!("http://{}/accept-encoding", server.addr())) - .unwrap() - .header(reqwest::header::AcceptEncoding( - vec![reqwest::header::qitem(reqwest::header::Encoding::Identity)] - )) - .send() - .unwrap(); - - assert_eq!(res.status(), reqwest::StatusCode::Ok); -} - -#[test] -fn test_gzip_response() { - let mut encoder = ::libflate::gzip::Encoder::new(Vec::new()).unwrap(); - match encoder.write(b"test request") { - Ok(n) => assert!(n > 0, "Failed to write to encoder."), - _ => panic!("Failed to gzip encode string."), - }; - - let gzipped_content = encoder.finish().into_result().unwrap(); - - let mut response = format!("\ - HTTP/1.1 200 OK\r\n\ - Server: test-accept\r\n\ - Content-Encoding: gzip\r\n\ - Content-Length: {}\r\n\ - \r\n", &gzipped_content.len()) - .into_bytes(); - response.extend(&gzipped_content); - - let server = server! { - request: b"\ - GET /gzip HTTP/1.1\r\n\ - Host: $HOST\r\n\ - User-Agent: $USERAGENT\r\n\ - Accept: */*\r\n\ - Accept-Encoding: gzip\r\n\ - \r\n\ - ", - response: response - }; - let mut res = reqwest::get(&format!("http://{}/gzip", server.addr())).unwrap(); - - let mut body = ::std::string::String::new(); - match res.read_to_string(&mut body) { - Ok(n) => assert!(n > 0, "Failed to write to buffer."), - _ => panic!("Failed to write to buffer."), - }; - - assert_eq!(body, "test request"); -} - -#[test] -fn test_gzip_empty_body() { - let server = server! { - request: b"\ - HEAD /gzip HTTP/1.1\r\n\ - Host: $HOST\r\n\ - User-Agent: $USERAGENT\r\n\ - Accept: */*\r\n\ - Accept-Encoding: gzip\r\n\ - \r\n\ - ", - response: b"\ - HTTP/1.1 200 OK\r\n\ - Server: test-accept\r\n\ - Content-Encoding: gzip\r\n\ - Content-Length: 100\r\n\ - \r\n" - }; - - let client = reqwest::Client::new().unwrap(); - let mut res = client - .head(&format!("http://{}/gzip", server.addr())) - .unwrap() - .send() - .unwrap(); - - let mut body = ::std::string::String::new(); - res.read_to_string(&mut body).unwrap(); - - assert_eq!(body, ""); -} - -#[test] -fn test_gzip_invalid_body() { - let server = server! { - request: b"\ - GET /gzip HTTP/1.1\r\n\ - Host: $HOST\r\n\ - User-Agent: $USERAGENT\r\n\ - Accept: */*\r\n\ - Accept-Encoding: gzip\r\n\ - \r\n\ - ", - response: b"\ - HTTP/1.1 200 OK\r\n\ - Server: test-accept\r\n\ - Content-Encoding: gzip\r\n\ - Content-Length: 100\r\n\ - \r\n\ - 0" - }; - - let mut res = reqwest::get(&format!("http://{}/gzip", server.addr())).unwrap(); - // this tests that the request.send() didn't error, but that the error - // is in reading the body - - let mut body = ::std::string::String::new(); - res.read_to_string(&mut body).unwrap_err(); + Some(&reqwest::header::Server::new("post"))); + assert_eq!(res.headers().get(), + Some(&reqwest::header::ContentLength(0))); + + let mut buf = [0; 1024]; + let n = res.read(&mut buf).unwrap(); + assert_eq!(n, 0) } diff --git a/tests/gzip.rs b/tests/gzip.rs new file mode 100644 index 0000000..6e3a478 --- /dev/null +++ b/tests/gzip.rs @@ -0,0 +1,166 @@ +extern crate reqwest; +extern crate libflate; + +#[macro_use] +mod support; + +use std::io::{Read, Write}; + +#[test] +fn test_gzip_response() { + let mut encoder = ::libflate::gzip::Encoder::new(Vec::new()).unwrap(); + match encoder.write(b"test request") { + Ok(n) => assert!(n > 0, "Failed to write to encoder."), + _ => panic!("Failed to gzip encode string."), + }; + + let gzipped_content = encoder.finish().into_result().unwrap(); + + let mut response = format!("\ + HTTP/1.1 200 OK\r\n\ + Server: test-accept\r\n\ + Content-Encoding: gzip\r\n\ + Content-Length: {}\r\n\ + \r\n", &gzipped_content.len()) + .into_bytes(); + response.extend(&gzipped_content); + + let server = server! { + request: b"\ + GET /gzip HTTP/1.1\r\n\ + Host: $HOST\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + Accept-Encoding: gzip\r\n\ + \r\n\ + ", + response: response + }; + let mut res = reqwest::get(&format!("http://{}/gzip", server.addr())).unwrap(); + + let mut body = String::new(); + res.read_to_string(&mut body).unwrap(); + + assert_eq!(body, "test request"); +} + +#[test] +fn test_gzip_empty_body() { + let server = server! { + request: b"\ + HEAD /gzip HTTP/1.1\r\n\ + Host: $HOST\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + Accept-Encoding: gzip\r\n\ + \r\n\ + ", + response: b"\ + HTTP/1.1 200 OK\r\n\ + Server: test-accept\r\n\ + Content-Encoding: gzip\r\n\ + Content-Length: 100\r\n\ + \r\n" + }; + + let client = reqwest::Client::new().unwrap(); + let mut res = client + .head(&format!("http://{}/gzip", server.addr())) + .unwrap() + .send() + .unwrap(); + + let mut body = ::std::string::String::new(); + res.read_to_string(&mut body).unwrap(); + + assert_eq!(body, ""); +} + +#[test] +fn test_gzip_invalid_body() { + let server = server! { + request: b"\ + GET /gzip HTTP/1.1\r\n\ + Host: $HOST\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + Accept-Encoding: gzip\r\n\ + \r\n\ + ", + response: b"\ + HTTP/1.1 200 OK\r\n\ + Server: test-accept\r\n\ + Content-Encoding: gzip\r\n\ + Content-Length: 100\r\n\ + \r\n\ + 0" + }; + + let mut res = reqwest::get(&format!("http://{}/gzip", server.addr())).unwrap(); + // this tests that the request.send() didn't error, but that the error + // is in reading the body + + let mut body = ::std::string::String::new(); + res.read_to_string(&mut body).unwrap_err(); +} + +#[test] +fn test_accept_header_is_not_changed_if_set() { + let server = server! { + request: b"\ + GET /accept HTTP/1.1\r\n\ + Host: $HOST\r\n\ + Accept: application/json\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept-Encoding: gzip\r\n\ + \r\n\ + ", + response: b"\ + HTTP/1.1 200 OK\r\n\ + Server: test-accept\r\n\ + Content-Length: 0\r\n\ + \r\n\ + " + }; + let client = reqwest::Client::new().unwrap(); + + let res = client + .get(&format!("http://{}/accept", server.addr())) + .unwrap() + .header(reqwest::header::Accept::json()) + .send() + .unwrap(); + + assert_eq!(res.status(), reqwest::StatusCode::Ok); +} + +#[test] +fn test_accept_encoding_header_is_not_changed_if_set() { + let server = server! { + request: b"\ + GET /accept-encoding HTTP/1.1\r\n\ + Host: $HOST\r\n\ + Accept-Encoding: identity\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + \r\n\ + ", + response: b"\ + HTTP/1.1 200 OK\r\n\ + Server: test-accept-encoding\r\n\ + Content-Length: 0\r\n\ + \r\n\ + " + }; + let client = reqwest::Client::new().unwrap(); + + let res = client.get(&format!("http://{}/accept-encoding", server.addr())) + .unwrap() + .header(reqwest::header::AcceptEncoding( + vec![reqwest::header::qitem(reqwest::header::Encoding::Identity)] + )) + .send() + .unwrap(); + + assert_eq!(res.status(), reqwest::StatusCode::Ok); +} diff --git a/tests/redirect.rs b/tests/redirect.rs new file mode 100644 index 0000000..1c7ed0d --- /dev/null +++ b/tests/redirect.rs @@ -0,0 +1,318 @@ +extern crate reqwest; + +#[macro_use] +mod support; + +#[test] +fn test_redirect_301_and_302_and_303_changes_post_to_get() { + let client = reqwest::Client::new().unwrap(); + let codes = [301, 302, 303]; + + for code in codes.iter() { + let redirect = server! { + request: format!("\ + POST /{} HTTP/1.1\r\n\ + Host: $HOST\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + Accept-Encoding: gzip\r\n\ + Content-Length: 0\r\n\ + \r\n\ + ", code), + response: format!("\ + HTTP/1.1 {} reason\r\n\ + Server: test-redirect\r\n\ + Content-Length: 0\r\n\ + Location: /dst\r\n\ + Connection: close\r\n\ + \r\n\ + ", code), + + request: format!("\ + GET /dst HTTP/1.1\r\n\ + Host: $HOST\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + Accept-Encoding: gzip\r\n\ + Referer: http://$HOST/{}\r\n\ + \r\n\ + ", code), + response: b"\ + HTTP/1.1 200 OK\r\n\ + Server: test-dst\r\n\ + Content-Length: 0\r\n\ + \r\n\ + " + }; + + let url = format!("http://{}/{}", redirect.addr(), code); + let dst = format!("http://{}/{}", redirect.addr(), "dst"); + let res = client.post(&url) + .unwrap() + .send() + .unwrap(); + assert_eq!(res.url().as_str(), dst); + assert_eq!(res.status(), reqwest::StatusCode::Ok); + assert_eq!(res.headers().get(), + Some(&reqwest::header::Server::new("test-dst".to_string()))); + } +} + +#[test] +fn test_redirect_307_and_308_tries_to_post_again() { + let client = reqwest::Client::new().unwrap(); + let codes = [307, 308]; + for code in codes.iter() { + let redirect = server! { + request: format!("\ + POST /{} HTTP/1.1\r\n\ + Host: $HOST\r\n\ + Content-Length: 5\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + Accept-Encoding: gzip\r\n\ + \r\n\ + Hello\ + ", code), + response: format!("\ + HTTP/1.1 {} reason\r\n\ + Server: test-redirect\r\n\ + Content-Length: 0\r\n\ + Location: /dst\r\n\ + Connection: close\r\n\ + \r\n\ + ", code), + + request: format!("\ + POST /dst HTTP/1.1\r\n\ + Host: $HOST\r\n\ + Content-Length: 5\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + Accept-Encoding: gzip\r\n\ + Referer: http://$HOST/{}\r\n\ + \r\n\ + Hello\ + ", code), + response: b"\ + HTTP/1.1 200 OK\r\n\ + Server: test-dst\r\n\ + Content-Length: 0\r\n\ + \r\n\ + " + }; + + let url = format!("http://{}/{}", redirect.addr(), code); + let dst = format!("http://{}/{}", redirect.addr(), "dst"); + let res = client.post(&url) + .unwrap() + .body("Hello") + .send() + .unwrap(); + assert_eq!(res.url().as_str(), dst); + assert_eq!(res.status(), reqwest::StatusCode::Ok); + assert_eq!(res.headers().get(), + Some(&reqwest::header::Server::new("test-dst".to_string()))); + } +} + +/* +#[test] +fn test_redirect_307_does_not_try_if_reader_cannot_reset() { + let client = reqwest::Client::new().unwrap(); + let codes = [307, 308]; + for &code in codes.iter() { + let redirect = server! { + request: format!("\ + POST /{} HTTP/1.1\r\n\ + Host: $HOST\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: * / *\r\n\ + Accept-Encoding: gzip\r\n\ + Transfer-Encoding: chunked\r\n\ + \r\n\ + 5\r\n\ + Hello\r\n\ + 0\r\n\r\n\ + ", code), + response: format!("\ + HTTP/1.1 {} reason\r\n\ + Server: test-redirect\r\n\ + Content-Length: 0\r\n\ + Location: /dst\r\n\ + Connection: close\r\n\ + \r\n\ + ", code) + }; + + let url = format!("http://{}/{}", redirect.addr(), code); + let res = client + .post(&url) + .unwrap() + .body(reqwest::Body::new(&b"Hello"[..])) + .send() + .unwrap(); + assert_eq!(res.url().as_str(), url); + assert_eq!(res.status(), reqwest::StatusCode::try_from(code).unwrap()); + } +} +*/ + +#[test] +fn test_redirect_removes_sensitive_headers() { + let end_server = server! { + request: b"\ + GET /otherhost HTTP/1.1\r\n\ + Host: $HOST\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + Accept-Encoding: gzip\r\n\ + \r\n\ + ", + response: b"\ + HTTP/1.1 200 OK\r\n\ + Server: test\r\n\ + Content-Length: 0\r\n\ + \r\n\ + " + }; + + let mid_server = server! { + request: b"\ + GET /sensitive HTTP/1.1\r\n\ + Host: $HOST\r\n\ + Cookie: foo=bar\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + Accept-Encoding: gzip\r\n\ + \r\n\ + ", + response: format!("\ + HTTP/1.1 302 Found\r\n\ + Server: test\r\n\ + Location: http://{}/otherhost\r\n\ + Content-Length: 0\r\n\ + \r\n\ + ", end_server.addr()) + }; + + let mut cookie = reqwest::header::Cookie::new(); + cookie.set("foo", "bar"); + reqwest::Client::builder() + .unwrap() + .referer(false) + .build() + .unwrap() + .get(&format!("http://{}/sensitive", mid_server.addr())) + .unwrap() + .header(cookie) + .send() + .unwrap(); +} + +#[test] +fn test_redirect_policy_can_return_errors() { + let server = server! { + request: b"\ + GET /loop HTTP/1.1\r\n\ + Host: $HOST\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + Accept-Encoding: gzip\r\n\ + \r\n\ + ", + response: b"\ + HTTP/1.1 302 Found\r\n\ + Server: test\r\n\ + Location: /loop\r\n\ + Content-Length: 0\r\n\ + \r\n\ + " + }; + + let err = reqwest::get(&format!("http://{}/loop", server.addr())).unwrap_err(); + assert!(err.is_redirect()); +} + +#[test] +fn test_redirect_policy_can_stop_redirects_without_an_error() { + let server = server! { + request: b"\ + GET /no-redirect HTTP/1.1\r\n\ + Host: $HOST\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + Accept-Encoding: gzip\r\n\ + \r\n\ + ", + response: b"\ + HTTP/1.1 302 Found\r\n\ + Server: test-dont\r\n\ + Location: /dont\r\n\ + Content-Length: 0\r\n\ + \r\n\ + " + }; + + let url = format!("http://{}/no-redirect", server.addr()); + + let res = reqwest::Client::builder() + .unwrap() + .redirect(reqwest::RedirectPolicy::none()) + .build() + .unwrap() + .get(&url) + .unwrap() + .send() + .unwrap(); + + assert_eq!(res.url().as_str(), url); + assert_eq!(res.status(), reqwest::StatusCode::Found); + assert_eq!(res.headers().get(), + Some(&reqwest::header::Server::new("test-dont".to_string()))); +} + +#[test] +fn test_referer_is_not_set_if_disabled() { + let server = server! { + request: b"\ + GET /no-refer HTTP/1.1\r\n\ + Host: $HOST\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + Accept-Encoding: gzip\r\n\ + \r\n\ + ", + response: b"\ + HTTP/1.1 302 Found\r\n\ + Server: test-no-referer\r\n\ + Content-Length: 0\r\n\ + Location: /dst\r\n\ + Connection: close\r\n\ + \r\n\ + ", + + request: b"\ + GET /dst HTTP/1.1\r\n\ + Host: $HOST\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + Accept-Encoding: gzip\r\n\ + \r\n\ + ", + response: b"\ + HTTP/1.1 200 OK\r\n\ + Server: test-dst\r\n\ + Content-Length: 0\r\n\ + \r\n\ + " + }; + reqwest::Client::builder().unwrap() + .referer(false) + .build().unwrap() + //client + .get(&format!("http://{}/no-refer", server.addr())) + .unwrap() + .send() + .unwrap(); +} diff --git a/tests/support/mod.rs b/tests/support/mod.rs new file mode 100644 index 0000000..dc77a29 --- /dev/null +++ b/tests/support/mod.rs @@ -0,0 +1,2 @@ +#[macro_use] +pub mod server; diff --git a/tests/server.rs b/tests/support/server.rs similarity index 55% rename from tests/server.rs rename to tests/support/server.rs index 3ebf81b..b56cb9f 100644 --- a/tests/server.rs +++ b/tests/support/server.rs @@ -2,6 +2,7 @@ use std::io::{Read, Write}; use std::net; +use std::time::Duration; use std::thread; pub struct Server { @@ -14,16 +15,33 @@ impl Server { } } +#[derive(Default)] +pub struct Txn { + pub request: Vec, + pub response: Vec, + + pub read_timeout: Option, + pub response_timeout: Option, + pub write_timeout: Option, +} + static DEFAULT_USER_AGENT: &'static str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); -pub fn spawn(txns: Vec<(Vec, Vec)>) -> Server { +pub fn spawn(txns: Vec) -> Server { let listener = net::TcpListener::bind("127.0.0.1:0").unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn( - move || for (mut expected, reply) in txns { + move || for txn in txns { + let mut expected = txn.request; + let reply = txn.response; let (mut socket, _addr) = listener.accept().unwrap(); replace_expected_vars(&mut expected, addr.to_string().as_ref(), DEFAULT_USER_AGENT.as_ref()); + + if let Some(dur) = txn.read_timeout { + thread::park_timeout(dur); + } + let mut buf = [0; 4096]; let n = socket.read(&mut buf).unwrap(); @@ -31,7 +49,19 @@ pub fn spawn(txns: Vec<(Vec, Vec)>) -> Server { (Ok(expected), Ok(received)) => assert_eq!(expected, received), _ => assert_eq!(expected, &buf[..n]), } - socket.write_all(&reply).unwrap(); + + if let Some(dur) = txn.response_timeout { + thread::park_timeout(dur); + } + + if let Some(dur) = txn.write_timeout { + let headers_end = ::std::str::from_utf8(&reply).unwrap().find("\r\n\r\n").unwrap() + 4; + socket.write_all(&reply[..headers_end]).unwrap(); + thread::park_timeout(dur); + socket.write_all(&reply[headers_end..]).unwrap(); + } else { + socket.write_all(&reply).unwrap(); + } } ); @@ -76,9 +106,38 @@ fn replace_expected_vars(bytes: &mut Vec, host: &[u8], ua: &[u8]) { #[macro_export] macro_rules! server { ($(request: $req:expr, response: $res:expr),*) => ({ + server!($(request: $req, response: $res;)*) + }); + ($($($f:ident: $v:expr),*);*) => ({ let txns = vec![ - $(((&$req[..]).into(), (&$res[..]).into()),)* + $(__internal__txn! { + $($f: $v,)* + }),* ]; - ::server::spawn(txns) + ::support::server::spawn(txns) }) } + +#[macro_export] +macro_rules! __internal__txn { + ($($field:ident: $val:expr,)*) => ( + ::support::server::Txn { + $( $field: __internal__prop!($field: $val), )* + .. Default::default() + } + ) +} + + +#[macro_export] +macro_rules! __internal__prop { + (request: $val:expr) => ( + From::from(&$val[..]) + ); + (response: $val:expr) => ( + From::from(&$val[..]) + ); + ($field:ident: $val:expr) => ( + From::from($val) + ) +} diff --git a/tests/timeouts.rs b/tests/timeouts.rs new file mode 100644 index 0000000..6d0df4f --- /dev/null +++ b/tests/timeouts.rs @@ -0,0 +1,121 @@ +extern crate reqwest; + +#[macro_use] +mod support; + +use std::io::Read; +use std::time::Duration; + +#[test] +fn test_write_timeout() { + let server = server! { + request: b"\ + POST /write-timeout HTTP/1.1\r\n\ + Host: $HOST\r\n\ + Content-Length: 5\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + Accept-Encoding: gzip\r\n\ + \r\n\ + Hello\ + ", + response: b"\ + HTTP/1.1 200 OK\r\n\ + Content-Length: 5\r\n\ + \r\n\ + Hello\ + ", + read_timeout: Duration::from_secs(1) + }; + + let url = format!("http://{}/write-timeout", server.addr()); + let err = reqwest::Client::builder() + .unwrap() + .timeout(Duration::from_millis(500)) + .build() + .unwrap() + .post(&url) + .unwrap() + .header(reqwest::header::ContentLength(5)) + .body(reqwest::Body::new(&b"Hello"[..])) + .send() + .unwrap_err(); + + assert_eq!(err.get_ref().unwrap().to_string(), "timed out"); + assert_eq!(err.url().map(|u| u.as_str()), Some(url.as_str())); +} + + +#[test] +fn test_response_timeout() { + let server = server! { + request: b"\ + GET /response-timeout HTTP/1.1\r\n\ + Host: $HOST\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + Accept-Encoding: gzip\r\n\ + \r\n\ + ", + response: b"\ + HTTP/1.1 200 OK\r\n\ + Content-Length: 0\r\n\ + ", + response_timeout: Duration::from_secs(1) + }; + + let url = format!("http://{}/response-timeout", server.addr()); + let err = reqwest::Client::builder() + .unwrap() + .timeout(Duration::from_millis(500)) + .build() + .unwrap() + .get(&url) + .unwrap() + .send() + .unwrap_err(); + + assert_eq!(err.get_ref().unwrap().to_string(), "timed out"); + assert_eq!(err.url().map(|u| u.as_str()), Some(url.as_str())); +} + +#[test] +fn test_read_timeout() { + let server = server! { + request: b"\ + GET /read-timeout HTTP/1.1\r\n\ + Host: $HOST\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + Accept-Encoding: gzip\r\n\ + \r\n\ + ", + response: b"\ + HTTP/1.1 200 OK\r\n\ + Content-Length: 5\r\n\ + \r\n\ + Hello\ + ", + write_timeout: Duration::from_secs(1) + }; + + let url = format!("http://{}/read-timeout", server.addr()); + let mut res = reqwest::Client::builder() + .unwrap() + .timeout(Duration::from_millis(500)) + .build() + .unwrap() + .get(&url) + .unwrap() + .send() + .unwrap(); + + assert_eq!(res.url().as_str(), &url); + assert_eq!(res.status(), reqwest::StatusCode::Ok); + assert_eq!(res.headers().get(), + Some(&reqwest::header::ContentLength(5))); + + let mut buf = [0; 1024]; + let err = res.read(&mut buf).unwrap_err(); + assert_eq!(err.to_string(), "timed out"); +}