upgrade hyper to v0.11

This commit is contained in:
Sean McArthur
2017-06-20 21:27:59 -07:00
parent 8633060eaf
commit 665b4fe718
26 changed files with 2647 additions and 1027 deletions

View File

@@ -1,24 +1,13 @@
use std::fmt;
use std::net::TcpStream;
use std::sync::Arc;
use std::time::Duration;
use hyper::client::IntoUrl;
use hyper::header::{Location, Referer, UserAgent, Accept, Encoding,
AcceptEncoding, Range, qitem};
use hyper::method::Method;
use hyper::status::StatusCode;
use hyper::Url;
use futures::{Future, Stream};
use futures::sync::{mpsc, oneshot};
use hyper_native_tls::{NativeTlsClient, TlsStream, native_tls};
use body;
use redirect::{self, RedirectPolicy, check_redirect, remove_sensitive_headers};
use request::{self, Request, RequestBuilder};
use response::Response;
static DEFAULT_USER_AGENT: &'static str =
concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
use response::{self, Response};
use {async_impl, Certificate, Method, IntoUrl, RedirectPolicy, wait};
/// A `Client` to make Requests with.
///
@@ -43,37 +32,7 @@ static DEFAULT_USER_AGENT: &'static str =
/// ```
#[derive(Clone)]
pub struct Client {
inner: Arc<ClientRef>,
}
/// Represent an X509 certificate.
pub struct Certificate(native_tls::Certificate);
impl Certificate {
/// Create a `Certificate` from a binary DER encoded certificate
///
/// # Examples
///
/// ```
/// # use std::fs::File;
/// # use std::io::Read;
/// # fn cert() -> Result<(), Box<std::error::Error>> {
/// let mut buf = Vec::new();
/// File::open("my_cert.der")?
/// .read_to_end(&mut buf)?;
/// let cert = reqwest::Certificate::from_der(&buf)?;
/// # drop(cert);
/// # Ok(())
/// # }
/// ```
///
/// # Errors
///
/// If the provided buffer is not valid DER, an error will be returned.
pub fn from_der(der: &[u8]) -> ::Result<Certificate> {
let inner = try_!(native_tls::Certificate::from_der(der));
Ok(Certificate(inner))
}
inner: ClientHandle,
}
/// A `ClientBuilder` can be used to create a `Client` with custom configuration:
@@ -103,16 +62,9 @@ impl Certificate {
/// # }
/// ```
pub struct ClientBuilder {
config: Option<Config>,
}
struct Config {
gzip: bool,
hostname_verification: bool,
redirect_policy: RedirectPolicy,
referer: bool,
inner: async_impl::ClientBuilder,
timeout: Option<Duration>,
tls: native_tls::TlsConnectorBuilder,
}
impl ClientBuilder {
@@ -122,16 +74,10 @@ impl ClientBuilder {
///
/// This method fails if native TLS backend cannot be created.
pub fn new() -> ::Result<ClientBuilder> {
let tls_connector_builder = try_!(native_tls::TlsConnector::builder());
Ok(ClientBuilder {
config: Some(Config {
gzip: true,
hostname_verification: true,
redirect_policy: RedirectPolicy::default(),
referer: true,
timeout: None,
tls: tls_connector_builder,
})
async_impl::ClientBuilder::new().map(|builder| ClientBuilder {
inner: builder,
gzip: true,
timeout: None,
})
}
@@ -146,26 +92,8 @@ impl ClientBuilder {
/// This method consumes the internal state of the builder.
/// Trying to use this builder again after calling `build` will panic.
pub fn build(&mut self) -> ::Result<Client> {
let config = self.take_config();
let tls_connector = try_!(config.tls.build());
let mut tls_client = NativeTlsClient::from(tls_connector);
if !config.hostname_verification {
tls_client.danger_disable_hostname_verification(true);
}
let mut hyper_client = create_hyper_client(tls_client);
hyper_client.set_read_timeout(config.timeout);
hyper_client.set_write_timeout(config.timeout);
Ok(Client {
inner: Arc::new(ClientRef {
gzip: config.gzip,
hyper: hyper_client,
redirect_policy: config.redirect_policy,
referer: config.referer,
}),
ClientHandle::new(self).map(|handle| Client {
inner: handle,
})
}
@@ -178,7 +106,7 @@ impl ClientBuilder {
///
/// This method fails if adding root certificate was unsuccessful.
pub fn add_root_certificate(&mut self, cert: Certificate) -> ::Result<&mut ClientBuilder> {
try_!(self.config_mut().tls.add_root_certificate(cert.0));
self.inner.add_root_certificate(cert)?;
Ok(self)
}
@@ -192,14 +120,14 @@ impl ClientBuilder {
/// significant vulnerability to man-in-the-middle attacks.
#[inline]
pub fn danger_disable_hostname_verification(&mut self) -> &mut ClientBuilder {
self.config_mut().hostname_verification = false;
self.inner.danger_disable_hostname_verification();
self
}
/// Enable hostname verification.
#[inline]
pub fn enable_hostname_verification(&mut self) -> &mut ClientBuilder {
self.config_mut().hostname_verification = true;
self.inner.enable_hostname_verification();
self
}
@@ -208,7 +136,8 @@ impl ClientBuilder {
/// Default is enabled.
#[inline]
pub fn gzip(&mut self, enable: bool) -> &mut ClientBuilder {
self.config_mut().gzip = enable;
self.inner.gzip(enable);
self.gzip = enable;
self
}
@@ -217,7 +146,7 @@ impl ClientBuilder {
/// Default will follow redirects up to a maximum of 10.
#[inline]
pub fn redirect(&mut self, policy: RedirectPolicy) -> &mut ClientBuilder {
self.config_mut().redirect_policy = policy;
self.inner.redirect(policy);
self
}
@@ -226,128 +155,18 @@ impl ClientBuilder {
/// Default is `true`.
#[inline]
pub fn referer(&mut self, enable: bool) -> &mut ClientBuilder {
self.config_mut().referer = enable;
self.inner.referer(enable);
self
}
/// Set a timeout for both the read and write operations of a client.
#[inline]
pub fn timeout(&mut self, timeout: Duration) -> &mut ClientBuilder {
self.config_mut().timeout = Some(timeout);
self.timeout = Some(timeout);
self
}
// private
fn config_mut(&mut self) -> &mut Config {
self.config
.as_mut()
.expect("ClientBuilder cannot be reused after building a Client")
}
fn take_config(&mut self) -> Config {
self.config
.take()
.expect("ClientBuilder cannot be reused after building a Client")
}
}
fn create_hyper_client(tls_client: NativeTlsClient) -> ::hyper::Client {
let mut pool = ::hyper::client::Pool::with_connector(
Default::default(),
::hyper::net::HttpsConnector::new(tls_client),
);
// For now, while experiementing, they're constants.
// TODO: maybe make these configurable someday?
pool.set_idle_timeout(Some(Duration::from_secs(60 * 2)));
pool.set_stale_check(|mut check| {
if stream_dead(check.stream()) {
check.stale()
} else {
check.fresh()
}
});
let mut hyper_client = ::hyper::Client::with_connector(pool);
hyper_client.set_redirect_policy(::hyper::client::RedirectPolicy::FollowNone);
hyper_client
}
fn stream_dead(stream: &::hyper::net::HttpsStream<TlsStream<::hyper::net::HttpStream>>) -> bool {
match *stream {
::hyper::net::HttpsStream::Http(ref http) => socket_is_dead(&http.0),
::hyper::net::HttpsStream::Https(ref https) => socket_is_dead(&https.lock().get_ref().0),
}
}
#[cfg(unix)]
fn socket_is_dead(socket: &TcpStream) -> bool {
use std::mem;
use std::os::unix::io::AsRawFd;
use std::ptr;
use libc::{FD_SET, select, timeval};
let ret = unsafe {
let fd = socket.as_raw_fd();
let nfds = fd + 1;
let mut timeout = timeval {
tv_sec: 0,
tv_usec: 0,
};
let mut readfs = mem::zeroed();
let mut errfs = mem::zeroed();
FD_SET(fd, &mut readfs);
FD_SET(fd, &mut errfs);
select(nfds, &mut readfs, ptr::null_mut(), &mut errfs, &mut timeout)
};
// socket was readable (eof), or an error, then it's dead
ret != 0
}
#[cfg(windows)]
fn socket_is_dead(socket: &TcpStream) -> bool {
use std::mem;
use std::os::windows::io::{AsRawSocket, RawSocket};
use std::ptr;
use libc::{c_int, timeval};
const FD_SETSIZE: usize = 64;
#[repr(C)]
struct fd_set {
fd_count: c_int,
fd_array: [RawSocket; FD_SETSIZE],
}
extern "system" {
fn select(maxfds: c_int, readfs: *mut fd_set, writefs: *mut fd_set,
errfs: *mut fd_set, timeout: *mut timeval) -> c_int;
}
let ret = unsafe {
let fd = socket.as_raw_socket();
let nfds = 0; // msdn says nfds is ignored
let mut timeout = timeval {
tv_sec: 0,
tv_usec: 0,
};
let mut readfs: fd_set = mem::zeroed();
let mut errfs: fd_set = mem::zeroed();
readfs.fd_count = 1;
readfs.fd_array[0] = fd;
errfs.fd_count = 1;
errfs.fd_array[0] = fd;
select(nfds, &mut readfs, ptr::null_mut(), &mut errfs, &mut timeout)
};
// socket was readable (eof), or an error, then it's dead
ret != 0
}
impl Client {
/// Constructs a new `Client`.
@@ -457,140 +276,110 @@ impl Client {
impl fmt::Debug for Client {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Client")
.field("gzip", &self.inner.gzip)
.field("redirect_policy", &self.inner.redirect_policy)
.field("referer", &self.inner.referer)
//.field("gzip", &self.inner.gzip)
//.field("redirect_policy", &self.inner.redirect_policy)
//.field("referer", &self.inner.referer)
.finish()
}
}
struct ClientRef {
impl fmt::Debug for ClientBuilder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("ClientBuilder")
.finish()
}
}
#[derive(Clone)]
struct ClientHandle {
gzip: bool,
hyper: ::hyper::Client,
redirect_policy: RedirectPolicy,
referer: bool,
timeout: Option<Duration>,
tx: Arc<ThreadSender>
}
impl ClientRef {
fn execute_request(&self, req: Request) -> ::Result<Response> {
let (
mut method,
mut url,
mut headers,
mut body
) = request::pieces(req);
type ThreadSender = mpsc::UnboundedSender<(async_impl::Request, oneshot::Sender<::Result<async_impl::Response>>)>;
if !headers.has::<UserAgent>() {
headers.set(UserAgent(DEFAULT_USER_AGENT.to_owned()));
}
impl ClientHandle {
fn new(builder: &mut ClientBuilder) -> ::Result<ClientHandle> {
use std::thread;
if !headers.has::<Accept>() {
headers.set(Accept::star());
}
if self.gzip &&
!headers.has::<AcceptEncoding>() &&
!headers.has::<Range>() {
headers.set(AcceptEncoding(vec![qitem(Encoding::Gzip)]));
}
let gzip = builder.gzip;
let timeout = builder.timeout;
let mut builder = async_impl::client::take_builder(&mut builder.inner);
let (tx, rx) = mpsc::unbounded();
let (spawn_tx, spawn_rx) = oneshot::channel::<::Result<()>>();
try_!(thread::Builder::new().name("reqwest-internal-sync-core".into()).spawn(move || {
use tokio_core::reactor::Core;
let mut urls = Vec::new();
let built = (|| {
let core = try_!(Core::new());
let handle = core.handle();
let client = builder.build(&handle)?;
Ok((core, handle, client))
})();
loop {
let res = {
info!("Request: {:?} {}", method, url);
let mut req = self.hyper.request(method.clone(), url.clone())
.headers(headers.clone());
if let Some(ref mut b) = body {
let body = body::as_hyper_body(b);
req = req.body(body);
let (mut core, handle, client) = match built {
Ok((a, b, c)) => {
if let Err(_) = spawn_tx.send(Ok(())) {
return;
}
(a, b, c)
},
Err(e) => {
let _ = spawn_tx.send(Err(e));
return;
}
try_!(req.send(), &url)
};
let should_redirect = match res.status {
StatusCode::MovedPermanently |
StatusCode::Found |
StatusCode::SeeOther => {
body = None;
match method {
Method::Get | Method::Head => {},
_ => {
method = Method::Get;
}
}
true
},
StatusCode::TemporaryRedirect |
StatusCode::PermanentRedirect => {
if let Some(ref body) = body {
body::can_reset(body)
} else {
true
}
},
_ => false,
};
let work = rx.for_each(|(req, tx)| {
let tx: oneshot::Sender<::Result<async_impl::Response>> = tx;
let task = client.execute(req)
.then(move |x| tx.send(x).map_err(|_| ()));
handle.spawn(task);
Ok(())
});
if should_redirect {
let loc = {
let loc = res.headers.get::<Location>().map(|loc| url.join(loc));
if let Some(loc) = loc {
loc
} else {
return Ok(::response::new(res, self.gzip));
}
};
// work is Future<(), ()>, and our closure will never return Err
let _ = core.run(work);
}));
url = match loc {
Ok(loc) => {
if self.referer {
if let Some(referer) = make_referer(&loc, &url) {
headers.set(referer);
}
}
urls.push(url);
let action = check_redirect(&self.redirect_policy, &loc, &urls);
wait::timeout(spawn_rx, timeout).expect("core thread cancelled")?;
match action {
redirect::Action::Follow => loc,
redirect::Action::Stop => {
debug!("redirect_policy disallowed redirection to '{}'", loc);
return Ok(::response::new(res, self.gzip));
},
redirect::Action::LoopDetected => {
return Err(::error::loop_detected(res.url.clone()));
},
redirect::Action::TooManyRedirects => {
return Err(::error::too_many_redirects(res.url.clone()));
}
}
},
Err(e) => {
debug!("Location header had invalid URI: {:?}", e);
Ok(ClientHandle {
gzip: gzip,
timeout: timeout,
tx: Arc::new(tx),
})
}
return Ok(::response::new(res, self.gzip));
}
};
fn execute_request(&self, req: Request) -> ::Result<Response> {
let (tx, rx) = oneshot::channel();
let (req, body) = request::async(req);
let url = req.url().clone();
self.tx.send((req, tx)).expect("core thread panicked");
remove_sensitive_headers(&mut headers, &url, &urls);
debug!("redirecting to {:?} '{}'", method, url);
} else {
return Ok(::response::new(res, self.gzip));
}
if let Some(body) = body {
try_!(body.send(), &url);
}
let res = match wait::timeout(rx, self.timeout) {
Ok(res) => res,
Err(wait::Waited::TimedOut) => return Err(::error::timedout(Some(url))),
Err(wait::Waited::Err(_canceled)) => {
// The only possible reason there would be a Cancelled error
// is if the thread running the Core panicked. We could return
// an Err here, like a BrokenPipe, but the Client is not
// recoverable. Additionally, the panic in the other thread
// is not normal, and should likely be propagated.
panic!("core thread panicked");
}
};
res.map(|res| {
response::new(res, self.gzip, self.timeout, KeepCoreThreadAlive(self.tx.clone()))
})
}
}
fn make_referer(next: &Url, previous: &Url) -> Option<Referer> {
if next.scheme() == "http" && previous.scheme() == "https" {
return None;
}
// pub(crate)
let mut referer = previous.clone();
let _ = referer.set_username("");
let _ = referer.set_password(None);
referer.set_fragment(None);
Some(Referer(referer.into_string()))
}
pub struct KeepCoreThreadAlive(Arc<ThreadSender>);