committed by
					
						 Sean McArthur
						Sean McArthur
					
				
			
			
				
	
			
			
			
						parent
						
							7bd3619ece
						
					
				
				
					commit
					c417d6dab8
				
			| @@ -1,10 +1,10 @@ | ||||
| use bytes::{Buf, BufMut, IntoBuf}; | ||||
| use futures::{Async, Future, Poll}; | ||||
| use hyper::client::{HttpConnector, Service}; | ||||
| use hyper::Uri; | ||||
| use http::uri::Scheme; | ||||
| use hyper::client::{HttpConnector}; | ||||
| use hyper::client::connect::{Connect, Connected, Destination}; | ||||
| use hyper_tls::{HttpsConnector, MaybeHttpsStream}; | ||||
| use native_tls::TlsConnector; | ||||
| use tokio_core::reactor::Handle; | ||||
| use tokio_io::{AsyncRead, AsyncWrite}; | ||||
| use tokio_tls::{TlsConnectorExt, TlsStream}; | ||||
|  | ||||
| @@ -22,8 +22,8 @@ pub struct Connector { | ||||
| } | ||||
|  | ||||
| impl Connector { | ||||
|     pub fn new(threads: usize, tls: TlsConnector, proxies: Arc<Vec<Proxy>>, handle: &Handle) -> Connector { | ||||
|         let mut http = HttpConnector::new(threads, handle); | ||||
|     pub fn new(threads: usize, tls: TlsConnector, proxies: Arc<Vec<Proxy>>) -> Connector { | ||||
|         let mut http = HttpConnector::new(threads); | ||||
|         http.enforce_http(false); | ||||
|         let https = HttpsConnector::from((http, tls.clone())); | ||||
|  | ||||
| @@ -39,41 +39,53 @@ impl Connector { | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Service for Connector { | ||||
|     type Request = Uri; | ||||
|     type Response = Conn; | ||||
| impl Connect for Connector { | ||||
|     type Transport = Conn; | ||||
|     type Error = io::Error; | ||||
|     type Future = Connecting; | ||||
|  | ||||
|     fn call(&self, uri: Uri) -> Self::Future { | ||||
|     fn connect(&self, dst: Destination) -> Self::Future { | ||||
|         for prox in self.proxies.iter() { | ||||
|             if let Some(puri) = proxy::intercept(prox, &uri) { | ||||
|                 trace!("proxy({:?}) intercepts {:?}", puri, uri); | ||||
|                 if uri.scheme() == Some("https") { | ||||
|                     let host = uri.host().unwrap().to_owned(); | ||||
|                     let port = uri.port().unwrap_or(443); | ||||
|             if let Some(puri) = proxy::intercept(prox, &dst) { | ||||
|                 trace!("proxy({:?}) intercepts {:?}", puri, dst); | ||||
|                 let mut ndst = dst.clone(); | ||||
|                 let new_scheme = puri | ||||
|                     .scheme_part() | ||||
|                     .map(Scheme::as_str) | ||||
|                     .unwrap_or("http"); | ||||
|                 ndst.set_scheme(new_scheme) | ||||
|                     .expect("proxy target scheme should be valid"); | ||||
|  | ||||
|                 ndst.set_host(puri.host().expect("proxy target should have host")) | ||||
|                     .expect("proxy target host should be valid"); | ||||
|  | ||||
|                 ndst.set_port(puri.port()); | ||||
|  | ||||
|                 if dst.scheme() == "https" { | ||||
|                     let host = dst.host().to_owned(); | ||||
|                     let port = dst.port().unwrap_or(443); | ||||
|                     let tls = self.tls.clone(); | ||||
|                     return Box::new(self.https.call(puri).and_then(move |conn| { | ||||
|                     return Box::new(self.https.connect(ndst).and_then(move |(conn, connected)| { | ||||
|                         trace!("tunneling HTTPS over proxy"); | ||||
|                         tunnel(conn, host.clone(), port) | ||||
|                             .and_then(move |tunneled| { | ||||
|                                 tls.connect_async(&host, tunneled) | ||||
|                                     .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) | ||||
|                             }) | ||||
|                             .map(|io| Conn::Proxied(io)) | ||||
|                             .map(|io| (Conn::Proxied(io), connected.proxy(true))) | ||||
|                     })); | ||||
|                 } | ||||
|                 return Box::new(self.https.call(puri).map(|io| Conn::Normal(io))); | ||||
|                 return Box::new(self.https.connect(ndst).map(|(io, connected)| (Conn::Normal(io), connected.proxy(true)))); | ||||
|             } | ||||
|         } | ||||
|         Box::new(self.https.call(uri).map(|io| Conn::Normal(io))) | ||||
|         Box::new(self.https.connect(dst).map(|(io, connected)| (Conn::Normal(io), connected))) | ||||
|     } | ||||
| } | ||||
|  | ||||
| type HttpStream = <HttpConnector as Service>::Response; | ||||
| type HttpStream = <HttpConnector as Connect>::Transport; | ||||
| type HttpsStream = MaybeHttpsStream<HttpStream>; | ||||
|  | ||||
| pub type Connecting = Box<Future<Item=Conn, Error=io::Error>>; | ||||
| pub type Connecting = Box<Future<Item=(Conn, Connected), Error=io::Error> + Send>; | ||||
|  | ||||
| pub enum Conn { | ||||
|     Normal(HttpsStream), | ||||
| @@ -214,8 +226,8 @@ mod tests { | ||||
|     use std::net::TcpListener; | ||||
|     use std::thread; | ||||
|     use futures::Future; | ||||
|     use tokio_core::reactor::Core; | ||||
|     use tokio_core::net::TcpStream; | ||||
|     use tokio::runtime::current_thread::Runtime; | ||||
|     use tokio::net::TcpStream; | ||||
|     use super::tunnel; | ||||
|  | ||||
|  | ||||
| @@ -251,44 +263,44 @@ mod tests { | ||||
|     fn test_tunnel() { | ||||
|         let addr = mock_tunnel!(); | ||||
|  | ||||
|         let mut core = Core::new().unwrap(); | ||||
|         let work = TcpStream::connect(&addr, &core.handle()); | ||||
|         let mut rt = Runtime::new().unwrap(); | ||||
|         let work = TcpStream::connect(&addr); | ||||
|         let host = addr.ip().to_string(); | ||||
|         let port = addr.port(); | ||||
|         let work = work.and_then(|tcp| { | ||||
|             tunnel(tcp, host, port) | ||||
|         }); | ||||
|  | ||||
|         core.run(work).unwrap(); | ||||
|         rt.block_on(work).unwrap(); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_tunnel_eof() { | ||||
|         let addr = mock_tunnel!(b"HTTP/1.1 200 OK"); | ||||
|  | ||||
|         let mut core = Core::new().unwrap(); | ||||
|         let work = TcpStream::connect(&addr, &core.handle()); | ||||
|         let mut rt = Runtime::new().unwrap(); | ||||
|         let work = TcpStream::connect(&addr); | ||||
|         let host = addr.ip().to_string(); | ||||
|         let port = addr.port(); | ||||
|         let work = work.and_then(|tcp| { | ||||
|             tunnel(tcp, host, port) | ||||
|         }); | ||||
|  | ||||
|         core.run(work).unwrap_err(); | ||||
|         rt.block_on(work).unwrap_err(); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_tunnel_bad_response() { | ||||
|         let addr = mock_tunnel!(b"foo bar baz hallo"); | ||||
|  | ||||
|         let mut core = Core::new().unwrap(); | ||||
|         let work = TcpStream::connect(&addr, &core.handle()); | ||||
|         let mut rt = Runtime::new().unwrap(); | ||||
|         let work = TcpStream::connect(&addr); | ||||
|         let host = addr.ip().to_string(); | ||||
|         let port = addr.port(); | ||||
|         let work = work.and_then(|tcp| { | ||||
|             tunnel(tcp, host, port) | ||||
|         }); | ||||
|  | ||||
|         core.run(work).unwrap_err(); | ||||
|         rt.block_on(work).unwrap_err(); | ||||
|     } | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user