feat(client): add Resolve, used by HttpConnector
				
					
				
			This introduces a `Resolve` trait to describe asynchronous DNS resolution. The `HttpConnector` can be configured with a resolver, allowing a user to still use all the functionality of the `HttpConnector`, while customizing the DNS resolution. To prevent a breaking change, the `HttpConnector` has its `Resolve` generic set by default to `GaiResolver`. This is same as the existing resolver, which uses `getaddrinfo` inside a thread pool. Closes #1517
This commit is contained in:
		| @@ -4,22 +4,18 @@ use std::error::Error as StdError; | ||||
| use std::io; | ||||
| use std::mem; | ||||
| use std::net::{IpAddr, SocketAddr}; | ||||
| use std::sync::Arc; | ||||
| use std::time::{Duration, Instant}; | ||||
|  | ||||
| use futures::{Async, Future, Poll}; | ||||
| use futures::future::{Executor, ExecuteError}; | ||||
| use futures::sync::oneshot; | ||||
| use futures_cpupool::{Builder as CpuPoolBuilder}; | ||||
| use futures::future::{Executor}; | ||||
| use http::uri::Scheme; | ||||
| use net2::TcpBuilder; | ||||
| use tokio_reactor::Handle; | ||||
| use tokio_tcp::{TcpStream, ConnectFuture}; | ||||
| use tokio_timer::Delay; | ||||
|  | ||||
| use super::{dns, Connect, Connected, Destination}; | ||||
|  | ||||
| use self::sealed::HttpConnectorBlockingTask; | ||||
| use super::{Connect, Connected, Destination}; | ||||
| use super::dns::{self, GaiResolver, Resolve}; | ||||
|  | ||||
| /// A connector for the `http` scheme. | ||||
| /// | ||||
| @@ -30,14 +26,14 @@ use self::sealed::HttpConnectorBlockingTask; | ||||
| /// Sets the [`HttpInfo`](HttpInfo) value on responses, which includes | ||||
| /// transport information such as the remote socket address used. | ||||
| #[derive(Clone)] | ||||
| pub struct HttpConnector { | ||||
|     executor: HttpConnectExecutor, | ||||
| pub struct HttpConnector<R = GaiResolver> { | ||||
|     enforce_http: bool, | ||||
|     handle: Option<Handle>, | ||||
|     keep_alive_timeout: Option<Duration>, | ||||
|     nodelay: bool, | ||||
|     local_address: Option<IpAddr>, | ||||
|     happy_eyeballs_timeout: Option<Duration>, | ||||
|     keep_alive_timeout: Option<Duration>, | ||||
|     local_address: Option<IpAddr>, | ||||
|     nodelay: bool, | ||||
|     resolver: R, | ||||
|     reuse_address: bool, | ||||
| } | ||||
|  | ||||
| @@ -78,36 +74,45 @@ impl HttpConnector { | ||||
|     /// Takes number of DNS worker threads. | ||||
|     #[inline] | ||||
|     pub fn new(threads: usize) -> HttpConnector { | ||||
|         HttpConnector::new_with_handle_opt(threads, None) | ||||
|         HttpConnector::new_with_resolver(GaiResolver::new(threads)) | ||||
|     } | ||||
|  | ||||
|     /// Construct a new HttpConnector with a specific Tokio handle. | ||||
|     #[doc(hidden)] | ||||
|     #[deprecated(note = "Use HttpConnector::set_reactor to set a reactor handle")] | ||||
|     pub fn new_with_handle(threads: usize, handle: Handle) -> HttpConnector { | ||||
|         HttpConnector::new_with_handle_opt(threads, Some(handle)) | ||||
|     } | ||||
|  | ||||
|     fn new_with_handle_opt(threads: usize, handle: Option<Handle>) -> HttpConnector { | ||||
|         let pool = CpuPoolBuilder::new() | ||||
|             .name_prefix("hyper-dns") | ||||
|             .pool_size(threads) | ||||
|             .create(); | ||||
|         HttpConnector::new_with_executor(pool, handle) | ||||
|         let resolver = GaiResolver::new(threads); | ||||
|         let mut http = HttpConnector::new_with_resolver(resolver); | ||||
|         http.set_reactor(Some(handle)); | ||||
|         http | ||||
|     } | ||||
|  | ||||
|     /// Construct a new HttpConnector. | ||||
|     /// | ||||
|     /// Takes an executor to run blocking tasks on. | ||||
|     /// Takes an executor to run blocking `getaddrinfo` tasks on. | ||||
|     pub fn new_with_executor<E: 'static>(executor: E, handle: Option<Handle>) -> HttpConnector | ||||
|         where E: Executor<HttpConnectorBlockingTask> + Send + Sync | ||||
|         where E: Executor<dns::sealed::GaiTask> + Send + Sync | ||||
|     { | ||||
|         let resolver = GaiResolver::new_with_executor(executor); | ||||
|         let mut http = HttpConnector::new_with_resolver(resolver); | ||||
|         http.set_reactor(handle); | ||||
|         http | ||||
|     } | ||||
| } | ||||
|  | ||||
|  | ||||
| impl<R> HttpConnector<R> { | ||||
|     /// Construct a new HttpConnector. | ||||
|     /// | ||||
|     /// Takes a `Resolve` to handle DNS lookups. | ||||
|     pub fn new_with_resolver(resolver: R) -> HttpConnector<R> { | ||||
|         HttpConnector { | ||||
|             executor: HttpConnectExecutor(Arc::new(executor)), | ||||
|             enforce_http: true, | ||||
|             handle, | ||||
|             keep_alive_timeout: None, | ||||
|             nodelay: false, | ||||
|             local_address: None, | ||||
|             handle: None, | ||||
|             happy_eyeballs_timeout: Some(Duration::from_millis(300)), | ||||
|             keep_alive_timeout: None, | ||||
|             local_address: None, | ||||
|             nodelay: false, | ||||
|             resolver, | ||||
|             reuse_address: false, | ||||
|         } | ||||
|     } | ||||
| @@ -120,6 +125,14 @@ impl HttpConnector { | ||||
|         self.enforce_http = is_enforced; | ||||
|     } | ||||
|  | ||||
|     /// Set a handle to a `Reactor` to register connections to. | ||||
|     /// | ||||
|     /// If `None`, the implicit default reactor will be used. | ||||
|     #[inline] | ||||
|     pub fn set_reactor(&mut self, handle: Option<Handle>) { | ||||
|         self.handle = handle; | ||||
|     } | ||||
|  | ||||
|     /// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration. | ||||
|     /// | ||||
|     /// If `None`, the option will not be set. | ||||
| @@ -175,17 +188,22 @@ impl HttpConnector { | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl fmt::Debug for HttpConnector { | ||||
| // R: Debug required for now to allow adding it to debug output later... | ||||
| impl<R: fmt::Debug> fmt::Debug for HttpConnector<R> { | ||||
|     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||||
|         f.debug_struct("HttpConnector") | ||||
|             .finish() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Connect for HttpConnector { | ||||
| impl<R> Connect for HttpConnector<R> | ||||
| where | ||||
|     R: Resolve + Clone + Send + Sync, | ||||
|     R::Future: Send, | ||||
| { | ||||
|     type Transport = TcpStream; | ||||
|     type Error = io::Error; | ||||
|     type Future = HttpConnecting; | ||||
|     type Future = HttpConnecting<R>; | ||||
|  | ||||
|     fn connect(&self, dst: Destination) -> Self::Future { | ||||
|         trace!( | ||||
| @@ -213,11 +231,12 @@ impl Connect for HttpConnector { | ||||
|         }; | ||||
|  | ||||
|         HttpConnecting { | ||||
|             state: State::Lazy(self.executor.clone(), host.into(), port, self.local_address), | ||||
|             state: State::Lazy(self.resolver.clone(), host.into(), self.local_address), | ||||
|             handle: self.handle.clone(), | ||||
|             happy_eyeballs_timeout: self.happy_eyeballs_timeout, | ||||
|             keep_alive_timeout: self.keep_alive_timeout, | ||||
|             nodelay: self.nodelay, | ||||
|             happy_eyeballs_timeout: self.happy_eyeballs_timeout, | ||||
|             port, | ||||
|             reuse_address: self.reuse_address, | ||||
|         } | ||||
|     } | ||||
| @@ -231,12 +250,13 @@ impl HttpInfo { | ||||
| } | ||||
|  | ||||
| #[inline] | ||||
| fn invalid_url(err: InvalidUrl, handle: &Option<Handle>) -> HttpConnecting { | ||||
| fn invalid_url<R: Resolve>(err: InvalidUrl, handle: &Option<Handle>) -> HttpConnecting<R> { | ||||
|     HttpConnecting { | ||||
|         state: State::Error(Some(io::Error::new(io::ErrorKind::InvalidInput, err))), | ||||
|         handle: handle.clone(), | ||||
|         keep_alive_timeout: None, | ||||
|         nodelay: false, | ||||
|         port: 0, | ||||
|         happy_eyeballs_timeout: None, | ||||
|         reuse_address: false, | ||||
|     } | ||||
| @@ -266,23 +286,24 @@ impl StdError for InvalidUrl { | ||||
| } | ||||
| /// A Future representing work to connect to a URL. | ||||
| #[must_use = "futures do nothing unless polled"] | ||||
| pub struct HttpConnecting { | ||||
|     state: State, | ||||
| pub struct HttpConnecting<R: Resolve = GaiResolver> { | ||||
|     state: State<R>, | ||||
|     handle: Option<Handle>, | ||||
|     happy_eyeballs_timeout: Option<Duration>, | ||||
|     keep_alive_timeout: Option<Duration>, | ||||
|     nodelay: bool, | ||||
|     happy_eyeballs_timeout: Option<Duration>, | ||||
|     port: u16, | ||||
|     reuse_address: bool, | ||||
| } | ||||
|  | ||||
| enum State { | ||||
|     Lazy(HttpConnectExecutor, String, u16, Option<IpAddr>), | ||||
|     Resolving(oneshot::SpawnHandle<dns::IpAddrs, io::Error>, Option<IpAddr>), | ||||
| enum State<R: Resolve> { | ||||
|     Lazy(R, String, Option<IpAddr>), | ||||
|     Resolving(R::Future, Option<IpAddr>), | ||||
|     Connecting(ConnectingTcp), | ||||
|     Error(Option<io::Error>), | ||||
| } | ||||
|  | ||||
| impl Future for HttpConnecting { | ||||
| impl<R: Resolve> Future for HttpConnecting<R> { | ||||
|     type Item = (TcpStream, Connected); | ||||
|     type Error = io::Error; | ||||
|  | ||||
| @@ -290,22 +311,26 @@ impl Future for HttpConnecting { | ||||
|         loop { | ||||
|             let state; | ||||
|             match self.state { | ||||
|                 State::Lazy(ref executor, ref mut host, port, local_addr) => { | ||||
|                 State::Lazy(ref resolver, ref mut host, local_addr) => { | ||||
|                     // If the host is already an IP addr (v4 or v6), | ||||
|                     // skip resolving the dns and start connecting right away. | ||||
|                     if let Some(addrs) = dns::IpAddrs::try_parse(host, port) { | ||||
|                     if let Some(addrs) = dns::IpAddrs::try_parse(host, self.port) { | ||||
|                         state = State::Connecting(ConnectingTcp::new( | ||||
|                             local_addr, addrs, self.happy_eyeballs_timeout, self.reuse_address)); | ||||
|                     } else { | ||||
|                         let host = mem::replace(host, String::new()); | ||||
|                         let work = dns::Work::new(host, port); | ||||
|                         state = State::Resolving(oneshot::spawn(work, executor), local_addr); | ||||
|                         let name = dns::Name::new(mem::replace(host, String::new())); | ||||
|                         state = State::Resolving(resolver.resolve(name), local_addr); | ||||
|                     } | ||||
|                 }, | ||||
|                 State::Resolving(ref mut future, local_addr) => { | ||||
|                     match try!(future.poll()) { | ||||
|                         Async::NotReady => return Ok(Async::NotReady), | ||||
|                         Async::Ready(addrs) => { | ||||
|                             let port = self.port; | ||||
|                             let addrs = addrs | ||||
|                                 .map(|addr| SocketAddr::new(addr, port)) | ||||
|                                 .collect(); | ||||
|                             let addrs = dns::IpAddrs::new(addrs); | ||||
|                             state = State::Connecting(ConnectingTcp::new( | ||||
|                                 local_addr, addrs, self.happy_eyeballs_timeout, self.reuse_address)); | ||||
|                         } | ||||
| @@ -335,7 +360,7 @@ impl Future for HttpConnecting { | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl fmt::Debug for HttpConnecting { | ||||
| impl<R: Resolve + fmt::Debug> fmt::Debug for HttpConnecting<R> { | ||||
|     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||||
|         f.pad("HttpConnecting") | ||||
|     } | ||||
| @@ -522,40 +547,6 @@ impl ConnectingTcp { | ||||
|     } | ||||
| } | ||||
|  | ||||
| // Make this Future unnameable outside of this crate. | ||||
| mod sealed { | ||||
|     use super::*; | ||||
|     // Blocking task to be executed on a thread pool. | ||||
|     pub struct HttpConnectorBlockingTask { | ||||
|         pub(super) work: oneshot::Execute<dns::Work> | ||||
|     } | ||||
|  | ||||
|     impl fmt::Debug for HttpConnectorBlockingTask { | ||||
|         fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||||
|             f.pad("HttpConnectorBlockingTask") | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     impl Future for HttpConnectorBlockingTask { | ||||
|         type Item = (); | ||||
|         type Error = (); | ||||
|  | ||||
|         fn poll(&mut self) -> Poll<(), ()> { | ||||
|             self.work.poll() | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Clone)] | ||||
| struct HttpConnectExecutor(Arc<Executor<HttpConnectorBlockingTask> + Send + Sync>); | ||||
|  | ||||
| impl Executor<oneshot::Execute<dns::Work>> for HttpConnectExecutor { | ||||
|     fn execute(&self, future: oneshot::Execute<dns::Work>) -> Result<(), ExecuteError<oneshot::Execute<dns::Work>>> { | ||||
|         self.0.execute(HttpConnectorBlockingTask { work: future }) | ||||
|             .map_err(|err| ExecuteError::new(err.kind(), err.into_future().work)) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[cfg(test)] | ||||
| mod tests { | ||||
|     use std::io; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user