Revert "Merge pull request #232 from tafia/hyper-proxy"

This reverts commit b09b8620a6, reversing
changes made to abfcd2796e.
This commit is contained in:
Sean McArthur
2018-02-15 12:12:29 -08:00
parent 0203fad886
commit 7db860759d
8 changed files with 517 additions and 67 deletions

View File

@@ -27,7 +27,6 @@ tokio-io = "0.1"
tokio-tls = "0.1"
url = "1.2"
uuid = { version = "0.5", features = ["v4"] }
hyper-proxy = "0.4.0"
[dev-dependencies]
env_logger = "0.5"

View File

@@ -1,15 +1,12 @@
use std::{io, fmt};
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use futures::{Async, Future, Poll};
use hyper::client::{Connect, FutureResponse, HttpConnector};
use hyper::client::FutureResponse;
use hyper::header::{Headers, Location, Referer, UserAgent, Accept, Encoding,
AcceptEncoding, Range, qitem};
use hyper_proxy::ProxyConnector;
use hyper_proxy::Proxy as HyperProxy;
use hyper_tls::HttpsConnector;
use native_tls::{TlsConnector, TlsConnectorBuilder};
use tokio_core::reactor::Handle;
@@ -17,6 +14,7 @@ use tokio_core::reactor::Handle;
use super::body;
use super::request::{self, Request, RequestBuilder};
use super::response::{self, Response};
use connect::Connector;
use into_url::to_uri;
use redirect::{self, RedirectPolicy, check_redirect, remove_sensitive_headers};
use {Certificate, Identity, IntoUrl, Method, proxy, Proxy, StatusCode, Url};
@@ -124,30 +122,25 @@ impl ClientBuilder {
.take()
.expect("ClientBuilder cannot be reused after building a Client");
let mut https_connector = try_!(HttpsConnector::new(config.dns_threads, handle));
if !config.hostname_verification {
https_connector.danger_disable_hostname_verification(true);
}
let mut connector = ProxyConnector::unsecured(https_connector);
let tls = try_!(config.tls.build());
connector.set_tls(Some(tls));
connector.extend_proxies(config.proxies.iter().map(|p| p.inner.clone()));
let proxies = Arc::new(config.proxies);
let mut connector = Connector::new(config.dns_threads, tls, proxies.clone(), handle);
if !config.hostname_verification {
connector.danger_disable_hostname_verification();
}
let hyper_client = ::hyper::Client::configure()
.connector(connector)
.build(handle);
// save proxies for http request
let mut proxy = ProxyConnector::unsecured(());
proxy.extend_proxies(config.proxies.into_iter().map(|p| p.inner));
Ok(Client {
inner: Arc::new(ClientRef {
gzip: config.gzip,
hyper: hyper_client,
proxy: proxy,
headers: config.headers,
proxies: proxies,
redirect_policy: config.redirect_policy,
referer: config.referer,
}),
@@ -283,7 +276,7 @@ fn config_mut<'a>(config: &'a mut Option<Config>, err: &Option<::Error>) -> Opti
}
}
type HyperClient = ::hyper::Client<::hyper_proxy::ProxyConnector<HttpsConnector<HttpConnector>>>;
type HyperClient = ::hyper::Client<Connector>;
impl Client {
/// Constructs a new `Client`.
@@ -419,10 +412,12 @@ impl Client {
reusable
});
if let Some(headers) = self.inner.proxy.http_headers(&uri) {
req.set_proxy(true);
req.headers_mut().extend(headers.iter());
if proxy::is_proxied(&self.inner.proxies, &url) {
if uri.scheme() == Some("http") {
req.set_proxy(true);
}
}
let in_flight = self.inner.hyper.request(req);
Pending {
@@ -462,8 +457,8 @@ impl fmt::Debug for ClientBuilder {
struct ClientRef {
gzip: bool,
headers: Headers,
proxy: ProxyConnector<()>,
hyper: HyperClient,
proxies: Arc<Vec<Proxy>>,
redirect_policy: RedirectPolicy,
referer: bool,
}
@@ -561,9 +556,10 @@ impl Future for PendingRequest {
if let Some(Some(ref body)) = self.body {
req.set_body(body.clone());
}
if let Some(headers) = self.client.proxy.http_headers(&uri) {
req.set_proxy(true);
req.headers_mut().extend(headers.iter());
if proxy::is_proxied(&self.client.proxies, &self.url) {
if uri.scheme() == Some("http") {
req.set_proxy(true);
}
}
self.in_flight = self.client.hyper.request(req);
continue;

294
src/connect.rs Normal file
View File

@@ -0,0 +1,294 @@
use bytes::{Buf, BufMut, IntoBuf};
use futures::{Async, Future, Poll};
use hyper::client::{HttpConnector, Service};
use hyper::Uri;
use hyper_tls::{HttpsConnector, MaybeHttpsStream};
use native_tls::TlsConnector;
use tokio_core::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tls::{TlsConnectorExt, TlsStream};
use std::io::{self, Cursor, Read, Write};
use std::sync::Arc;
use {proxy, Proxy};
// pub(crate)
pub struct Connector {
https: HttpsConnector<HttpConnector>,
proxies: Arc<Vec<Proxy>>,
tls: TlsConnector,
}
impl Connector {
pub fn new(threads: usize, tls: TlsConnector, proxies: Arc<Vec<Proxy>>, handle: &Handle) -> Connector {
let mut http = HttpConnector::new(threads, handle);
http.enforce_http(false);
let https = HttpsConnector::from((http, tls.clone()));
Connector {
https: https,
proxies: proxies,
tls: tls,
}
}
pub fn danger_disable_hostname_verification(&mut self) {
self.https.danger_disable_hostname_verification(true);
}
}
impl Service for Connector {
type Request = Uri;
type Response = Conn;
type Error = io::Error;
type Future = Connecting;
fn call(&self, uri: Uri) -> Self::Future {
for prox in self.proxies.iter() {
if let Some(puri) = proxy::intercept(prox, &uri) {
trace!("proxy({:?}) intercepts {:?}", puri, uri);
if uri.scheme() == Some("https") {
let host = uri.host().unwrap().to_owned();
let port = uri.port().unwrap_or(443);
let tls = self.tls.clone();
return Box::new(self.https.call(puri).and_then(move |conn| {
trace!("tunneling HTTPS over proxy");
tunnel(conn, host.clone(), port)
.and_then(move |tunneled| {
tls.connect_async(&host, tunneled)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
})
.map(|io| Conn::Proxied(io))
}));
}
return Box::new(self.https.call(puri).map(|io| Conn::Normal(io)));
}
}
Box::new(self.https.call(uri).map(|io| Conn::Normal(io)))
}
}
type HttpStream = <HttpConnector as Service>::Response;
type HttpsStream = MaybeHttpsStream<HttpStream>;
pub type Connecting = Box<Future<Item=Conn, Error=io::Error>>;
pub enum Conn {
Normal(HttpsStream),
Proxied(TlsStream<MaybeHttpsStream<HttpStream>>),
}
impl Read for Conn {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match *self {
Conn::Normal(ref mut s) => s.read(buf),
Conn::Proxied(ref mut s) => s.read(buf),
}
}
}
impl Write for Conn {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match *self {
Conn::Normal(ref mut s) => s.write(buf),
Conn::Proxied(ref mut s) => s.write(buf),
}
}
#[inline]
fn flush(&mut self) -> io::Result<()> {
match *self {
Conn::Normal(ref mut s) => s.flush(),
Conn::Proxied(ref mut s) => s.flush(),
}
}
}
impl AsyncRead for Conn {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
match *self {
Conn::Normal(ref s) => s.prepare_uninitialized_buffer(buf),
Conn::Proxied(ref s) => s.prepare_uninitialized_buffer(buf),
}
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
match *self {
Conn::Normal(ref mut s) => s.read_buf(buf),
Conn::Proxied(ref mut s) => s.read_buf(buf),
}
}
}
impl AsyncWrite for Conn {
fn shutdown(&mut self) -> Poll<(), io::Error> {
match *self {
Conn::Normal(ref mut s) => s.shutdown(),
Conn::Proxied(ref mut s) => s.shutdown(),
}
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
match *self {
Conn::Normal(ref mut s) => s.write_buf(buf),
Conn::Proxied(ref mut s) => s.write_buf(buf),
}
}
}
fn tunnel<T>(conn: T, host: String, port: u16) -> Tunnel<T> {
let buf = format!("\
CONNECT {0}:{1} HTTP/1.1\r\n\
Host: {0}:{1}\r\n\
\r\n\
", host, port).into_bytes();
Tunnel {
buf: buf.into_buf(),
conn: Some(conn),
state: TunnelState::Writing,
}
}
struct Tunnel<T> {
buf: Cursor<Vec<u8>>,
conn: Option<T>,
state: TunnelState,
}
enum TunnelState {
Writing,
Reading
}
impl<T> Future for Tunnel<T>
where T: AsyncRead + AsyncWrite {
type Item = T;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
if let TunnelState::Writing = self.state {
let n = try_ready!(self.conn.as_mut().unwrap().write_buf(&mut self.buf));
if !self.buf.has_remaining_mut() {
self.state = TunnelState::Reading;
self.buf.get_mut().truncate(0);
} else if n == 0 {
return Err(tunnel_eof());
}
} else {
let n = try_ready!(self.conn.as_mut().unwrap().read_buf(&mut self.buf.get_mut()));
let read = &self.buf.get_ref()[..];
if n == 0 {
return Err(tunnel_eof());
} else if read.len() > 12 {
if read.starts_with(b"HTTP/1.1 200") || read.starts_with(b"HTTP/1.0 200") {
if read.ends_with(b"\r\n\r\n") {
return Ok(Async::Ready(self.conn.take().unwrap()));
}
// else read more
} else {
return Err(io::Error::new(io::ErrorKind::Other, "unsuccessful tunnel"));
}
}
}
}
}
}
#[inline]
fn tunnel_eof() -> io::Error {
io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected eof while tunneling"
)
}
#[cfg(test)]
mod tests {
use std::io::{Read, Write};
use std::net::TcpListener;
use std::thread;
use futures::Future;
use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;
use super::tunnel;
macro_rules! mock_tunnel {
() => ({
mock_tunnel!(b"\
HTTP/1.1 200 OK\r\n\
\r\n\
")
});
($write:expr) => ({
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let connect_expected = format!("\
CONNECT {0}:{1} HTTP/1.1\r\n\
Host: {0}:{1}\r\n\
\r\n\
", addr.ip(), addr.port()).into_bytes();
thread::spawn(move || {
let (mut sock, _) = listener.accept().unwrap();
let mut buf = [0u8; 4096];
let n = sock.read(&mut buf).unwrap();
assert_eq!(&buf[..n], &connect_expected[..]);
sock.write_all($write).unwrap();
});
addr
})
}
#[test]
fn test_tunnel() {
let addr = mock_tunnel!();
let mut core = Core::new().unwrap();
let work = TcpStream::connect(&addr, &core.handle());
let host = addr.ip().to_string();
let port = addr.port();
let work = work.and_then(|tcp| {
tunnel(tcp, host, port)
});
core.run(work).unwrap();
}
#[test]
fn test_tunnel_eof() {
let addr = mock_tunnel!(b"HTTP/1.1 200 OK");
let mut core = Core::new().unwrap();
let work = TcpStream::connect(&addr, &core.handle());
let host = addr.ip().to_string();
let port = addr.port();
let work = work.and_then(|tcp| {
tunnel(tcp, host, port)
});
core.run(work).unwrap_err();
}
#[test]
fn test_tunnel_bad_response() {
let addr = mock_tunnel!(b"foo bar baz hallo");
let mut core = Core::new().unwrap();
let work = TcpStream::connect(&addr, &core.handle());
let host = addr.ip().to_string();
let port = addr.port();
let work = work.and_then(|tcp| {
tunnel(tcp, host, port)
});
core.run(work).unwrap_err();
}
}

View File

@@ -36,3 +36,7 @@ impl<'a> PolyfillTryInto for &'a String {
pub fn to_uri(url: &Url) -> ::hyper::Uri {
url.as_str().parse().expect("a parsed Url should always be a valid Uri")
}
pub fn to_url(uri: &::hyper::Uri) -> Url {
uri.as_ref().parse().expect("reqwest Uris should only ever come from Urls")
}

View File

@@ -133,7 +133,6 @@ extern crate encoding_rs;
#[macro_use]
extern crate futures;
extern crate hyper;
extern crate hyper_proxy;
extern crate hyper_tls;
#[macro_use]
extern crate log;
@@ -175,6 +174,7 @@ pub use self::tls::{Certificate, Identity};
mod error;
mod async_impl;
mod connect;
mod body;
mod client;
mod into_url;

View File

@@ -1,9 +1,8 @@
use std::any::Any;
use std::fmt;
use std::sync::Arc;
use hyper::Uri;
use hyper::header::{Scheme};
use {IntoUrl};
use hyper_proxy::Intercept;
use hyper_proxy::Proxy as HyperProxy;
use {into_url, IntoUrl, Url};
/// Configuration of a proxy that a `Client` should pass requests to.
///
@@ -16,13 +15,7 @@ use hyper_proxy::Proxy as HyperProxy;
///
/// ```rust
/// # fn run() -> Result<(), Box<::std::error::Error>> {
/// use reqwest::header::Basic;
///
/// let mut proxy = reqwest::Proxy::http("https://secure.example")?;
/// proxy.set_authorization(Basic {
/// username: "John Doe".into(),
/// password: Some("Agent1234".into()),
/// });
/// let proxy = reqwest::Proxy::http("https://secure.example")?;
/// # Ok(())
/// # }
/// ```
@@ -37,7 +30,7 @@ use hyper_proxy::Proxy as HyperProxy;
/// would prevent a `Proxy` later in the list from ever working, so take care.
#[derive(Clone, Debug)]
pub struct Proxy {
pub(crate) inner: HyperProxy,
intercept: Intercept,
}
impl Proxy {
@@ -56,7 +49,8 @@ impl Proxy {
/// # fn main() {}
/// ```
pub fn http<U: IntoUrl>(url: U) -> ::Result<Proxy> {
Proxy::new(Intercept::Http, url)
let uri = ::into_url::to_uri(&try_!(url.into_url()));
Ok(Proxy::new(Intercept::Http(uri)))
}
/// Proxy all HTTPS traffic to the passed URL.
@@ -74,7 +68,8 @@ impl Proxy {
/// # fn main() {}
/// ```
pub fn https<U: IntoUrl>(url: U) -> ::Result<Proxy> {
Proxy::new(Intercept::Https, url)
let uri = ::into_url::to_uri(&try_!(url.into_url()));
Ok(Proxy::new(Intercept::Https(uri)))
}
/// Proxy **all** traffic to the passed URL.
@@ -92,7 +87,8 @@ impl Proxy {
/// # fn main() {}
/// ```
pub fn all<U: IntoUrl>(url: U) -> ::Result<Proxy> {
Proxy::new(Intercept::All, url)
let uri = ::into_url::to_uri(&try_!(url.into_url()));
Ok(Proxy::new(Intercept::All(uri)))
}
/// Provide a custom function to determine what traffix to proxy to where.
@@ -104,20 +100,20 @@ impl Proxy {
/// # fn run() -> Result<(), Box<::std::error::Error>> {
/// let target = reqwest::Url::parse("https://my.prox")?;
/// let client = reqwest::Client::builder()
/// .proxy(reqwest::Proxy::custom(|url| url.host() == Some("hyper.rs"),
/// "http://proxy.custom")?)
/// .proxy(reqwest::Proxy::custom(move |url| {
/// if url.host_str() == Some("hyper.rs") {
/// Some(target.clone())
/// } else {
/// None
/// }
/// }))
/// .build()?;
/// # Ok(())
/// # }
/// # fn main() {}
pub fn custom<F, U: IntoUrl>(fun: F, url: U) -> ::Result<Proxy>
where F: Fn(&Uri) -> bool + 'static + Send + Sync {
Proxy::new(fun, url)
}
/// Set proxy authorization
pub fn set_authorization<S: Scheme + Any>(&mut self, scheme: S) {
self.inner.set_authorization(scheme);
pub fn custom<F>(fun: F) -> Proxy
where F: Fn(&Url) -> Option<Url> + Send + Sync + 'static {
Proxy::new(Intercept::Custom(Custom(Arc::new(fun))))
}
/*
@@ -126,8 +122,174 @@ impl Proxy {
}
*/
fn new<U: IntoUrl, I: Into<Intercept>>(intercept: I, url: U) -> ::Result<Proxy> {
let uri = ::into_url::to_uri(&try_!(url.into_url()));
Ok(Proxy { inner: HyperProxy::new(intercept, uri) })
fn new(intercept: Intercept) -> Proxy {
Proxy {
intercept: intercept,
}
}
fn proxies(&self, url: &Url) -> bool {
match self.intercept {
Intercept::All(..) => true,
Intercept::Http(..) => url.scheme() == "http",
Intercept::Https(..) => url.scheme() == "https",
Intercept::Custom(ref fun) => (fun.0)(url).is_some(),
}
}
fn intercept(&self, uri: &Uri) -> Option<Uri> {
match self.intercept {
Intercept::All(ref u) => Some(u.clone()),
Intercept::Http(ref u) => {
if uri.scheme() == Some("http") {
Some(u.clone())
} else {
None
}
},
Intercept::Https(ref u) => {
if uri.scheme() == Some("https") {
Some(u.clone())
} else {
None
}
},
Intercept::Custom(ref fun) => {
(fun.0)(&into_url::to_url(uri))
.map(|u| into_url::to_uri(&u))
},
}
}
}
#[derive(Clone, Debug)]
enum Intercept {
All(Uri),
Http(Uri),
Https(Uri),
Custom(Custom),
}
#[derive(Clone)]
struct Custom(Arc<Fn(&Url) -> Option<Url> + Send + Sync + 'static>);
impl fmt::Debug for Custom {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("_")
}
}
// pub(crate)
pub fn intercept(proxy: &Proxy, uri: &Uri) -> Option<Uri> {
proxy.intercept(uri)
}
pub fn is_proxied(proxies: &[Proxy], uri: &Url) -> bool {
proxies.iter().any(|p| p.proxies(uri))
}
#[cfg(test)]
mod tests {
use super::*;
fn uri(s: &str) -> Uri {
s.parse().unwrap()
}
fn url(s: &str) -> Url {
s.parse().unwrap()
}
#[test]
fn test_http() {
let target = "http://example.domain/";
let p = Proxy::http(target).unwrap();
let http = "http://hyper.rs";
let other = "https://hyper.rs";
assert!(p.proxies(&url(http)));
assert_eq!(p.intercept(&uri(http)).unwrap(), target);
assert!(!p.proxies(&url(other)));
assert!(p.intercept(&uri(other)).is_none());
}
#[test]
fn test_https() {
let target = "http://example.domain/";
let p = Proxy::https(target).unwrap();
let http = "http://hyper.rs";
let other = "https://hyper.rs";
assert!(!p.proxies(&url(http)));
assert!(p.intercept(&uri(http)).is_none());
assert!(p.proxies(&url(other)));
assert_eq!(p.intercept(&uri(other)).unwrap(), target);
}
#[test]
fn test_all() {
let target = "http://example.domain/";
let p = Proxy::all(target).unwrap();
let http = "http://hyper.rs";
let https = "https://hyper.rs";
let other = "x-youve-never-heard-of-me-mr-proxy://hyper.rs";
assert!(p.proxies(&url(http)));
assert!(p.proxies(&url(https)));
assert!(p.proxies(&url(other)));
assert_eq!(p.intercept(&uri(http)).unwrap(), target);
assert_eq!(p.intercept(&uri(https)).unwrap(), target);
assert_eq!(p.intercept(&uri(other)).unwrap(), target);
}
#[test]
fn test_custom() {
let target1 = "http://example.domain/";
let target2 = "https://example.domain/";
let p = Proxy::custom(move |url| {
if url.host_str() == Some("hyper.rs") {
target1.parse().ok()
} else if url.scheme() == "http" {
target2.parse().ok()
} else {
None
}
});
let http = "http://seanmonstar.com";
let https = "https://hyper.rs";
let other = "x-youve-never-heard-of-me-mr-proxy://seanmonstar.com";
assert!(p.proxies(&url(http)));
assert!(p.proxies(&url(https)));
assert!(!p.proxies(&url(other)));
assert_eq!(p.intercept(&uri(http)).unwrap(), target2);
assert_eq!(p.intercept(&uri(https)).unwrap(), target1);
assert!(p.intercept(&uri(other)).is_none());
}
#[test]
fn test_is_proxied() {
let proxies = vec![
Proxy::http("http://example.domain").unwrap(),
Proxy::https("http://other.domain").unwrap(),
];
let http = "http://hyper.rs".parse().unwrap();
let https = "https://hyper.rs".parse().unwrap();
let other = "x-other://hyper.rs".parse().unwrap();
assert!(is_proxied(&proxies, &http));
assert!(is_proxied(&proxies, &https));
assert!(!is_proxied(&proxies, &other));
}
}

View File

@@ -1,7 +1,5 @@
extern crate reqwest;
use reqwest::header::Bearer;
#[macro_use]
mod support;
@@ -14,7 +12,6 @@ fn test_http_proxy() {
User-Agent: $USERAGENT\r\n\
Accept: */*\r\n\
Accept-Encoding: gzip\r\n\
Authorization: Bearer MY_SECRET_TOKEN\r\n\
\r\n\
",
response: b"\
@@ -22,16 +19,14 @@ fn test_http_proxy() {
Server: proxied\r\n\
Content-Length: 0\r\n\
\r\n\
";
"
};
let proxy_uri = format!("http://{}", server.addr());
let mut proxy = reqwest::Proxy::http(&proxy_uri).unwrap();
proxy.set_authorization(Bearer { token: "MY_SECRET_TOKEN".to_string() });
let proxy = format!("http://{}", server.addr());
let url = "http://hyper.rs/prox";
let res = reqwest::Client::builder()
.proxy(proxy)
.proxy(reqwest::Proxy::http(&proxy).unwrap())
.build()
.unwrap()
.get(url)

View File

@@ -52,11 +52,11 @@ pub fn spawn(txns: Vec<Txn>) -> Server {
let mut n = 0;
while n < expected.len() {
match socket.read(&mut buf[n..]) {
Ok(0) => break,
Err(e) => panic!(e),
Ok(0) | Err(_) => break,
Ok(nread) => n += nread,
}
}
match (::std::str::from_utf8(&expected), ::std::str::from_utf8(&buf[..n])) {
(Ok(expected), Ok(received)) => assert_eq!(expected, received),
_ => assert_eq!(expected, &buf[..n]),