diff --git a/Cargo.toml b/Cargo.toml index feb3dc3..74ec12d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ tokio-io = "0.1" tokio-tls = "0.1" url = "1.2" uuid = { version = "0.5", features = ["v4"] } +hyper-proxy = "0.2.0" [dev-dependencies] env_logger = "0.4" diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index 9d1d57c..4f2e880 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -1,12 +1,13 @@ -use std::fmt; +use std::{io, fmt}; use std::sync::Arc; use std::time::Duration; use bytes::Bytes; use futures::{Async, Future, Poll}; -use hyper::client::FutureResponse; +use hyper::client::{Connect, FutureResponse, HttpConnector}; use hyper::header::{Headers, Location, Referer, UserAgent, Accept, Encoding, AcceptEncoding, Range, qitem}; +use hyper_tls::HttpsConnector; use native_tls::{TlsConnector, TlsConnectorBuilder}; use tokio_core::reactor::Handle; @@ -14,7 +15,6 @@ use tokio_core::reactor::Handle; use super::body; use super::request::{self, Request, RequestBuilder}; use super::response::{self, Response}; -use connect::Connector; use into_url::to_uri; use redirect::{self, RedirectPolicy, check_redirect, remove_sensitive_headers}; use {Certificate, Identity, IntoUrl, Method, proxy, Proxy, StatusCode, Url}; @@ -65,7 +65,8 @@ struct Config { gzip: bool, headers: Headers, hostname_verification: bool, - proxies: Vec, + // TODO: investigate Vec as before + proxy: Proxy, redirect_policy: RedirectPolicy, referer: bool, timeout: Option, @@ -87,7 +88,7 @@ impl ClientBuilder { gzip: true, headers: headers, hostname_verification: true, - proxies: Vec::new(), + proxy: Proxy::empty(), redirect_policy: RedirectPolicy::default(), referer: true, timeout: None, @@ -122,14 +123,14 @@ impl ClientBuilder { .take() .expect("ClientBuilder cannot be reused after building a Client"); - let tls = try_!(config.tls.build()); - let proxies = Arc::new(config.proxies); - - let mut connector = Connector::new(config.dns_threads, tls, proxies.clone(), handle); + let mut https_connector = try_!(HttpsConnector::new(config.dns_threads, handle)); if !config.hostname_verification { - connector.danger_disable_hostname_verification(); + https_connector.danger_disable_hostname_verification(true); } + let mut connector = config.proxy.inner.with_connector(https_connector); + let tls = try_!(config.tls.build()); + connector.set_tls(Some(tls)); let hyper_client = ::hyper::Client::configure() .connector(connector) @@ -140,7 +141,6 @@ impl ClientBuilder { gzip: config.gzip, hyper: hyper_client, headers: config.headers, - proxies: proxies, redirect_policy: config.redirect_policy, referer: config.referer, }), @@ -222,7 +222,7 @@ impl ClientBuilder { #[inline] pub fn proxy(&mut self, proxy: Proxy) -> &mut ClientBuilder { if let Some(config) = config_mut(&mut self.config, &self.err) { - config.proxies.push(proxy); + config.proxy = proxy; } self } @@ -276,7 +276,7 @@ fn config_mut<'a>(config: &'a mut Option, err: &Option<::Error>) -> Opti } } -type HyperClient = ::hyper::Client; +type HyperClient = ::hyper::Client<::hyper_proxy::Proxy>>; impl Client { /// Constructs a new `Client`. @@ -412,12 +412,6 @@ impl Client { reusable }); - if proxy::is_proxied(&self.inner.proxies, &url) { - if uri.scheme() == Some("http") { - req.set_proxy(true); - } - } - let in_flight = self.inner.hyper.request(req); Pending { @@ -458,7 +452,6 @@ struct ClientRef { gzip: bool, headers: Headers, hyper: HyperClient, - proxies: Arc>, redirect_policy: RedirectPolicy, referer: bool, } @@ -556,11 +549,6 @@ impl Future for PendingRequest { if let Some(Some(ref body)) = self.body { req.set_body(body.clone()); } - if proxy::is_proxied(&self.client.proxies, &self.url) { - if uri.scheme() == Some("http") { - req.set_proxy(true); - } - } self.in_flight = self.client.hyper.request(req); continue; }, diff --git a/src/connect.rs b/src/connect.rs deleted file mode 100644 index adec0fb..0000000 --- a/src/connect.rs +++ /dev/null @@ -1,294 +0,0 @@ -use bytes::{Buf, BufMut, IntoBuf}; -use futures::{Async, Future, Poll}; -use hyper::client::{HttpConnector, Service}; -use hyper::Uri; -use hyper_tls::{HttpsConnector, MaybeHttpsStream}; -use native_tls::TlsConnector; -use tokio_core::reactor::Handle; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_tls::{TlsConnectorExt, TlsStream}; - -use std::io::{self, Cursor, Read, Write}; -use std::sync::Arc; - -use {proxy, Proxy}; - -// pub(crate) - -pub struct Connector { - https: HttpsConnector, - proxies: Arc>, - tls: TlsConnector, -} - -impl Connector { - pub fn new(threads: usize, tls: TlsConnector, proxies: Arc>, handle: &Handle) -> Connector { - let mut http = HttpConnector::new(threads, handle); - http.enforce_http(false); - let https = HttpsConnector::from((http, tls.clone())); - - Connector { - https: https, - proxies: proxies, - tls: tls, - } - } - - pub fn danger_disable_hostname_verification(&mut self) { - self.https.danger_disable_hostname_verification(true); - } -} - -impl Service for Connector { - type Request = Uri; - type Response = Conn; - type Error = io::Error; - type Future = Connecting; - - fn call(&self, uri: Uri) -> Self::Future { - for prox in self.proxies.iter() { - if let Some(puri) = proxy::intercept(prox, &uri) { - trace!("proxy({:?}) intercepts {:?}", puri, uri); - if uri.scheme() == Some("https") { - let host = uri.host().unwrap().to_owned(); - let port = uri.port().unwrap_or(443); - let tls = self.tls.clone(); - return Box::new(self.https.call(puri).and_then(move |conn| { - trace!("tunneling HTTPS over proxy"); - tunnel(conn, host.clone(), port) - .and_then(move |tunneled| { - tls.connect_async(&host, tunneled) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) - }) - .map(|io| Conn::Proxied(io)) - })); - } - return Box::new(self.https.call(puri).map(|io| Conn::Normal(io))); - } - } - Box::new(self.https.call(uri).map(|io| Conn::Normal(io))) - } -} - -type HttpStream = ::Response; -type HttpsStream = MaybeHttpsStream; - -pub type Connecting = Box>; - -pub enum Conn { - Normal(HttpsStream), - Proxied(TlsStream>), -} - -impl Read for Conn { - #[inline] - fn read(&mut self, buf: &mut [u8]) -> io::Result { - match *self { - Conn::Normal(ref mut s) => s.read(buf), - Conn::Proxied(ref mut s) => s.read(buf), - } - } -} - -impl Write for Conn { - #[inline] - fn write(&mut self, buf: &[u8]) -> io::Result { - match *self { - Conn::Normal(ref mut s) => s.write(buf), - Conn::Proxied(ref mut s) => s.write(buf), - } - } - - #[inline] - fn flush(&mut self) -> io::Result<()> { - match *self { - Conn::Normal(ref mut s) => s.flush(), - Conn::Proxied(ref mut s) => s.flush(), - } - } -} - -impl AsyncRead for Conn { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - match *self { - Conn::Normal(ref s) => s.prepare_uninitialized_buffer(buf), - Conn::Proxied(ref s) => s.prepare_uninitialized_buffer(buf), - } - } - - fn read_buf(&mut self, buf: &mut B) -> Poll { - match *self { - Conn::Normal(ref mut s) => s.read_buf(buf), - Conn::Proxied(ref mut s) => s.read_buf(buf), - } - } -} - -impl AsyncWrite for Conn { - fn shutdown(&mut self) -> Poll<(), io::Error> { - match *self { - Conn::Normal(ref mut s) => s.shutdown(), - Conn::Proxied(ref mut s) => s.shutdown(), - } - } - - fn write_buf(&mut self, buf: &mut B) -> Poll { - match *self { - Conn::Normal(ref mut s) => s.write_buf(buf), - Conn::Proxied(ref mut s) => s.write_buf(buf), - } - } -} - -fn tunnel(conn: T, host: String, port: u16) -> Tunnel { - let buf = format!("\ - CONNECT {0}:{1} HTTP/1.1\r\n\ - Host: {0}:{1}\r\n\ - \r\n\ - ", host, port).into_bytes(); - - Tunnel { - buf: buf.into_buf(), - conn: Some(conn), - state: TunnelState::Writing, - } -} - -struct Tunnel { - buf: Cursor>, - conn: Option, - state: TunnelState, -} - -enum TunnelState { - Writing, - Reading -} - -impl Future for Tunnel -where T: AsyncRead + AsyncWrite { - type Item = T; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - loop { - if let TunnelState::Writing = self.state { - let n = try_ready!(self.conn.as_mut().unwrap().write_buf(&mut self.buf)); - if !self.buf.has_remaining_mut() { - self.state = TunnelState::Reading; - self.buf.get_mut().truncate(0); - } else if n == 0 { - return Err(tunnel_eof()); - } - } else { - let n = try_ready!(self.conn.as_mut().unwrap().read_buf(&mut self.buf.get_mut())); - let read = &self.buf.get_ref()[..]; - if n == 0 { - return Err(tunnel_eof()); - } else if read.len() > 12 { - if read.starts_with(b"HTTP/1.1 200") || read.starts_with(b"HTTP/1.0 200") { - if read.ends_with(b"\r\n\r\n") { - return Ok(Async::Ready(self.conn.take().unwrap())); - } - // else read more - } else { - return Err(io::Error::new(io::ErrorKind::Other, "unsuccessful tunnel")); - } - } - } - } - } -} - -#[inline] -fn tunnel_eof() -> io::Error { - io::Error::new( - io::ErrorKind::UnexpectedEof, - "unexpected eof while tunneling" - ) -} - -#[cfg(test)] -mod tests { - use std::io::{Read, Write}; - use std::net::TcpListener; - use std::thread; - use futures::Future; - use tokio_core::reactor::Core; - use tokio_core::net::TcpStream; - use super::tunnel; - - - macro_rules! mock_tunnel { - () => ({ - mock_tunnel!(b"\ - HTTP/1.1 200 OK\r\n\ - \r\n\ - ") - }); - ($write:expr) => ({ - let listener = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = listener.local_addr().unwrap(); - let connect_expected = format!("\ - CONNECT {0}:{1} HTTP/1.1\r\n\ - Host: {0}:{1}\r\n\ - \r\n\ - ", addr.ip(), addr.port()).into_bytes(); - - thread::spawn(move || { - let (mut sock, _) = listener.accept().unwrap(); - let mut buf = [0u8; 4096]; - let n = sock.read(&mut buf).unwrap(); - assert_eq!(&buf[..n], &connect_expected[..]); - - sock.write_all($write).unwrap(); - }); - addr - }) - } - - #[test] - fn test_tunnel() { - let addr = mock_tunnel!(); - - let mut core = Core::new().unwrap(); - let work = TcpStream::connect(&addr, &core.handle()); - let host = addr.ip().to_string(); - let port = addr.port(); - let work = work.and_then(|tcp| { - tunnel(tcp, host, port) - }); - - core.run(work).unwrap(); - } - - #[test] - fn test_tunnel_eof() { - let addr = mock_tunnel!(b"HTTP/1.1 200 OK"); - - let mut core = Core::new().unwrap(); - let work = TcpStream::connect(&addr, &core.handle()); - let host = addr.ip().to_string(); - let port = addr.port(); - let work = work.and_then(|tcp| { - tunnel(tcp, host, port) - }); - - core.run(work).unwrap_err(); - } - - #[test] - fn test_tunnel_bad_response() { - let addr = mock_tunnel!(b"foo bar baz hallo"); - - let mut core = Core::new().unwrap(); - let work = TcpStream::connect(&addr, &core.handle()); - let host = addr.ip().to_string(); - let port = addr.port(); - let work = work.and_then(|tcp| { - tunnel(tcp, host, port) - }); - - core.run(work).unwrap_err(); - } -} diff --git a/src/into_url.rs b/src/into_url.rs index aee4c2f..80de1bd 100644 --- a/src/into_url.rs +++ b/src/into_url.rs @@ -36,7 +36,3 @@ impl<'a> PolyfillTryInto for &'a String { pub fn to_uri(url: &Url) -> ::hyper::Uri { url.as_str().parse().expect("a parsed Url should always be a valid Uri") } - -pub fn to_url(uri: &::hyper::Uri) -> Url { - uri.as_ref().parse().expect("reqwest Uris should only ever come from Urls") -} diff --git a/src/lib.rs b/src/lib.rs index 4181109..2966509 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -132,6 +132,7 @@ extern crate bytes; #[macro_use] extern crate futures; extern crate hyper; +extern crate hyper_proxy; extern crate hyper_tls; #[macro_use] extern crate log; @@ -170,7 +171,6 @@ pub use self::tls::{Certificate, Identity}; mod error; mod async_impl; -mod connect; mod body; mod client; mod into_url; diff --git a/src/proxy.rs b/src/proxy.rs index 9739cca..78ec807 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -1,8 +1,7 @@ -use std::fmt; -use std::sync::Arc; - use hyper::Uri; -use {into_url, IntoUrl, Url}; +use {IntoUrl}; +use hyper_proxy::Intercept; +use hyper_proxy::Proxy as HyperProxy; /// Configuration of a proxy that a `Client` should pass requests to. /// @@ -15,7 +14,11 @@ use {into_url, IntoUrl, Url}; /// /// ```rust /// # fn run() -> Result<(), Box<::std::error::Error>> { -/// let proxy = reqwest::Proxy::http("https://secure.example")?; +/// let mut proxy = reqwest::Proxy::http("https://secure.example")?; +/// // proxy.set_authorization(Basic { +/// // username: "John Doe".into(), +/// // password: Some("Agent1234".into()), +/// // }); /// # Ok(()) /// # } /// ``` @@ -28,9 +31,9 @@ use {into_url, IntoUrl, Url}; /// check each `Proxy` in the order it was added. This could mean that a /// `Proxy` added first with eager intercept rules, such as `Proxy::all`, /// would prevent a `Proxy` later in the list from ever working, so take care. -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct Proxy { - intercept: Intercept, + pub(crate) inner: HyperProxy<()>, } impl Proxy { @@ -49,8 +52,7 @@ impl Proxy { /// # fn main() {} /// ``` pub fn http(url: U) -> ::Result { - let uri = ::into_url::to_uri(&try_!(url.into_url())); - Ok(Proxy::new(Intercept::Http(uri))) + Proxy::new(Intercept::Http, url) } /// Proxy all HTTPS traffic to the passed URL. @@ -68,8 +70,7 @@ impl Proxy { /// # fn main() {} /// ``` pub fn https(url: U) -> ::Result { - let uri = ::into_url::to_uri(&try_!(url.into_url())); - Ok(Proxy::new(Intercept::Https(uri))) + Proxy::new(Intercept::Https, url) } /// Proxy **all** traffic to the passed URL. @@ -87,8 +88,7 @@ impl Proxy { /// # fn main() {} /// ``` pub fn all(url: U) -> ::Result { - let uri = ::into_url::to_uri(&try_!(url.into_url())); - Ok(Proxy::new(Intercept::All(uri))) + Proxy::new(Intercept::All, url) } /// Provide a custom function to determine what traffix to proxy to where. @@ -111,9 +111,9 @@ impl Proxy { /// # Ok(()) /// # } /// # fn main() {} - pub fn custom(fun: F) -> Proxy - where F: Fn(&Url) -> Option + Send + Sync + 'static { - Proxy::new(Intercept::Custom(Custom(Arc::new(fun)))) + pub fn custom(fun: F, url: U) -> ::Result + where F: Fn(&Uri) -> bool + 'static + Send + Sync { + Proxy::new(Intercept::Custom(Box::new(fun)), url) } /* @@ -122,174 +122,13 @@ impl Proxy { } */ - fn new(intercept: Intercept) -> Proxy { - Proxy { - intercept: intercept, - } + /// Get a new empty proxy which will never intercept Uris + pub(crate) fn empty() -> Proxy { + Proxy { inner: HyperProxy::unsecured((), Intercept::None, Uri::default()) } } - fn proxies(&self, url: &Url) -> bool { - match self.intercept { - Intercept::All(..) => true, - Intercept::Http(..) => url.scheme() == "http", - Intercept::Https(..) => url.scheme() == "https", - Intercept::Custom(ref fun) => (fun.0)(url).is_some(), - } - } - - - fn intercept(&self, uri: &Uri) -> Option { - match self.intercept { - Intercept::All(ref u) => Some(u.clone()), - Intercept::Http(ref u) => { - if uri.scheme() == Some("http") { - Some(u.clone()) - } else { - None - } - }, - Intercept::Https(ref u) => { - if uri.scheme() == Some("https") { - Some(u.clone()) - } else { - None - } - }, - Intercept::Custom(ref fun) => { - (fun.0)(&into_url::to_url(uri)) - .map(|u| into_url::to_uri(&u)) - }, - } + fn new(intercept: Intercept, url: U) -> ::Result { + let uri = ::into_url::to_uri(&try_!(url.into_url())); + Ok(Proxy { inner: try_!(HyperProxy::new((), intercept, uri)) }) } } - -#[derive(Clone, Debug)] -enum Intercept { - All(Uri), - Http(Uri), - Https(Uri), - Custom(Custom), -} - -#[derive(Clone)] -struct Custom(Arc Option + Send + Sync + 'static>); - -impl fmt::Debug for Custom { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.write_str("_") - } -} - -// pub(crate) - -pub fn intercept(proxy: &Proxy, uri: &Uri) -> Option { - proxy.intercept(uri) -} - -pub fn is_proxied(proxies: &[Proxy], uri: &Url) -> bool { - proxies.iter().any(|p| p.proxies(uri)) -} - -#[cfg(test)] -mod tests { - use super::*; - - fn uri(s: &str) -> Uri { - s.parse().unwrap() - } - - fn url(s: &str) -> Url { - s.parse().unwrap() - } - - #[test] - fn test_http() { - let target = "http://example.domain/"; - let p = Proxy::http(target).unwrap(); - - let http = "http://hyper.rs"; - let other = "https://hyper.rs"; - - assert!(p.proxies(&url(http))); - assert_eq!(p.intercept(&uri(http)).unwrap(), target); - assert!(!p.proxies(&url(other))); - assert!(p.intercept(&uri(other)).is_none()); - } - - #[test] - fn test_https() { - let target = "http://example.domain/"; - let p = Proxy::https(target).unwrap(); - - let http = "http://hyper.rs"; - let other = "https://hyper.rs"; - - assert!(!p.proxies(&url(http))); - assert!(p.intercept(&uri(http)).is_none()); - assert!(p.proxies(&url(other))); - assert_eq!(p.intercept(&uri(other)).unwrap(), target); - } - - #[test] - fn test_all() { - let target = "http://example.domain/"; - let p = Proxy::all(target).unwrap(); - - let http = "http://hyper.rs"; - let https = "https://hyper.rs"; - let other = "x-youve-never-heard-of-me-mr-proxy://hyper.rs"; - - assert!(p.proxies(&url(http))); - assert!(p.proxies(&url(https))); - assert!(p.proxies(&url(other))); - - assert_eq!(p.intercept(&uri(http)).unwrap(), target); - assert_eq!(p.intercept(&uri(https)).unwrap(), target); - assert_eq!(p.intercept(&uri(other)).unwrap(), target); - } - - - #[test] - fn test_custom() { - let target1 = "http://example.domain/"; - let target2 = "https://example.domain/"; - let p = Proxy::custom(move |url| { - if url.host_str() == Some("hyper.rs") { - target1.parse().ok() - } else if url.scheme() == "http" { - target2.parse().ok() - } else { - None - } - }); - - let http = "http://seanmonstar.com"; - let https = "https://hyper.rs"; - let other = "x-youve-never-heard-of-me-mr-proxy://seanmonstar.com"; - - assert!(p.proxies(&url(http))); - assert!(p.proxies(&url(https))); - assert!(!p.proxies(&url(other))); - - assert_eq!(p.intercept(&uri(http)).unwrap(), target2); - assert_eq!(p.intercept(&uri(https)).unwrap(), target1); - assert!(p.intercept(&uri(other)).is_none()); - } - - #[test] - fn test_is_proxied() { - let proxies = vec![ - Proxy::http("http://example.domain").unwrap(), - Proxy::https("http://other.domain").unwrap(), - ]; - - let http = "http://hyper.rs".parse().unwrap(); - let https = "https://hyper.rs".parse().unwrap(); - let other = "x-other://hyper.rs".parse().unwrap(); - - assert!(is_proxied(&proxies, &http)); - assert!(is_proxied(&proxies, &https)); - assert!(!is_proxied(&proxies, &other)); - } - -}