refactor(client): use a tokio-threadpool for the GaiResolver
This commit is contained in:
		| @@ -16,12 +16,12 @@ use std::net::{ | |||||||
| use std::str::FromStr; | use std::str::FromStr; | ||||||
| use std::sync::Arc; | use std::sync::Arc; | ||||||
|  |  | ||||||
|  | use futures_util::{FutureExt, StreamExt}; | ||||||
| use tokio_executor::TypedExecutor; | use tokio_executor::TypedExecutor; | ||||||
| use tokio_sync::oneshot; | use tokio_sync::{mpsc, oneshot}; | ||||||
| use tokio_threadpool; | use tokio_threadpool; | ||||||
|  |  | ||||||
| use crate::common::{Future, Pin, Poll, Unpin, task}; | use crate::common::{Future, Never, Pin, Poll, Unpin, task}; | ||||||
| use self::sealed::GaiTask; |  | ||||||
|  |  | ||||||
| /// Resolve a hostname to a set of IP addresses. | /// Resolve a hostname to a set of IP addresses. | ||||||
| pub trait Resolve: Unpin { | pub trait Resolve: Unpin { | ||||||
| @@ -42,9 +42,15 @@ pub struct Name { | |||||||
| /// A resolver using blocking `getaddrinfo` calls in a threadpool. | /// A resolver using blocking `getaddrinfo` calls in a threadpool. | ||||||
| #[derive(Clone)] | #[derive(Clone)] | ||||||
| pub struct GaiResolver { | pub struct GaiResolver { | ||||||
|     executor: GaiExecutor, |     tx: tokio_threadpool::Sender, | ||||||
|  |     /// A handle to keep the threadpool alive until all `GaiResolver` clones | ||||||
|  |     /// have been dropped. | ||||||
|  |     _threadpool_keep_alive: ThreadPoolKeepAlive, | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[derive(Clone)] | ||||||
|  | struct ThreadPoolKeepAlive(mpsc::Sender<Never>); | ||||||
|  |  | ||||||
| /// An iterator of IP addresses returned from `getaddrinfo`. | /// An iterator of IP addresses returned from `getaddrinfo`. | ||||||
| pub struct GaiAddrs { | pub struct GaiAddrs { | ||||||
|     inner: IpAddrs, |     inner: IpAddrs, | ||||||
| @@ -52,8 +58,8 @@ pub struct GaiAddrs { | |||||||
|  |  | ||||||
| /// A future to resole a name returned by `GaiResolver`. | /// A future to resole a name returned by `GaiResolver`. | ||||||
| pub struct GaiFuture { | pub struct GaiFuture { | ||||||
|     //rx: oneshot::SpawnHandle<IpAddrs, io::Error>, |     rx: oneshot::Receiver<Result<IpAddrs, io::Error>>, | ||||||
|     blocking: GaiBlocking, |     _threadpool_keep_alive: ThreadPoolKeepAlive, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Name { | impl Name { | ||||||
| @@ -96,15 +102,11 @@ pub struct InvalidNameError(()); | |||||||
|  |  | ||||||
| impl fmt::Display for InvalidNameError { | impl fmt::Display for InvalidNameError { | ||||||
|     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||||||
|         self.description().fmt(f) |         f.write_str("Not a valid domain name") | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Error for InvalidNameError { | impl Error for InvalidNameError {} | ||||||
|     fn description(&self) -> &str { |  | ||||||
|         "Not a valid domain name" |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
|  |  | ||||||
| impl GaiResolver { | impl GaiResolver { | ||||||
| @@ -112,36 +114,39 @@ impl GaiResolver { | |||||||
|     /// |     /// | ||||||
|     /// Takes number of DNS worker threads. |     /// Takes number of DNS worker threads. | ||||||
|     pub fn new(threads: usize) -> Self { |     pub fn new(threads: usize) -> Self { | ||||||
|         GaiResolver { |         let pool = tokio_threadpool::Builder::new() | ||||||
|             executor: GaiExecutor, |  | ||||||
|         } |  | ||||||
|         /* |  | ||||||
|         use tokio_threadpool::Builder; |  | ||||||
|  |  | ||||||
|         let pool = Builder::new() |  | ||||||
|             .name_prefix("hyper-dns-gai-resolver") |             .name_prefix("hyper-dns-gai-resolver") | ||||||
|             // not for CPU tasks, so only spawn workers |             // not for CPU tasks, so only spawn workers | ||||||
|             // in blocking mode |             // in blocking mode | ||||||
|             .pool_size(1) |             .pool_size(1) | ||||||
|             .max_blocking(threads) |             .max_blocking(threads) | ||||||
|             .build(); |             .build(); | ||||||
|         GaiResolver::new_with_executor(pool) |  | ||||||
|         */ |         let tx = pool.sender().clone(); | ||||||
|  |  | ||||||
|  |         // The pool will start to shutdown once `pool` is dropped, | ||||||
|  |         // so to keep it alive, we spawn a future onto the pool itself | ||||||
|  |         // that will only resolve once all `GaiResolver` requests | ||||||
|  |         // are finished. | ||||||
|  |         let (shutdown_tx, shutdown_rx) = mpsc::channel(1); | ||||||
|  |  | ||||||
|  |         let on_shutdown = shutdown_rx | ||||||
|  |             .into_future() | ||||||
|  |             .map(move |(next, _rx)| { | ||||||
|  |                 match next { | ||||||
|  |                     Some(never) => match never {}, | ||||||
|  |                     None => (), | ||||||
|                 } |                 } | ||||||
|  |  | ||||||
|     /* |                 drop(pool) | ||||||
|     /// Construct a new `GaiResolver` with a shared thread pool executor. |             }); | ||||||
|     /// |         tx.spawn(on_shutdown); | ||||||
|     /// Takes an executor to run blocking `getaddrinfo` tasks on. |  | ||||||
|     /*pub */fn new_with_executor<E: 'static>(executor: E) -> Self |  | ||||||
|     where |  | ||||||
|         E: TypedExecutor<GaiTask> + Send + Sync, |  | ||||||
|     { |  | ||||||
|         GaiResolver { |         GaiResolver { | ||||||
|             executor: GaiExecutor(Arc::new(executor)), |             tx, | ||||||
|  |             _threadpool_keep_alive: ThreadPoolKeepAlive(shutdown_tx), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|     */ |  | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Resolve for GaiResolver { | impl Resolve for GaiResolver { | ||||||
| @@ -149,11 +154,14 @@ impl Resolve for GaiResolver { | |||||||
|     type Future = GaiFuture; |     type Future = GaiFuture; | ||||||
|  |  | ||||||
|     fn resolve(&self, name: Name) -> Self::Future { |     fn resolve(&self, name: Name) -> Self::Future { | ||||||
|         let blocking = GaiBlocking::new(name.host); |         let (tx, rx) = oneshot::channel(); | ||||||
|         //let rx = oneshot::spawn(blocking, &self.executor); |         self.tx.spawn(GaiBlocking { | ||||||
|  |             host: name.host, | ||||||
|  |             tx: Some(tx), | ||||||
|  |         }); | ||||||
|         GaiFuture { |         GaiFuture { | ||||||
|             //rx, |             rx, | ||||||
|             blocking, |             _threadpool_keep_alive: self._threadpool_keep_alive.clone(), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @@ -168,13 +176,11 @@ impl Future for GaiFuture { | |||||||
|     type Output = Result<GaiAddrs, io::Error>; |     type Output = Result<GaiAddrs, io::Error>; | ||||||
|  |  | ||||||
|     fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { |     fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { | ||||||
|         /* |         Pin::new(&mut self.rx).poll(cx).map(|res| match res { | ||||||
|         let addrs = try_ready!(self.rx.poll()); |             Ok(Ok(addrs)) => Ok(GaiAddrs { inner: addrs }), | ||||||
|         Ok(Async::Ready(GaiAddrs { |             Ok(Err(err)) => Err(err), | ||||||
|             inner: addrs, |             Err(_canceled) => unreachable!("GaiResolver threadpool shutdown"), | ||||||
|         })) |         }) | ||||||
|         */ |  | ||||||
|         Poll::Ready(self.blocking.block().map(|addrs| GaiAddrs { inner: addrs })) |  | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -198,27 +204,13 @@ impl fmt::Debug for GaiAddrs { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| #[derive(Clone)] |  | ||||||
| struct GaiExecutor/*(Arc<dyn Executor<GaiTask> + Send + Sync>)*/; |  | ||||||
|  |  | ||||||
| /* |  | ||||||
| impl Executor<oneshot::Execute<GaiBlocking>> for GaiExecutor { |  | ||||||
|     fn execute(&self, future: oneshot::Execute<GaiBlocking>) -> Result<(), ExecuteError<oneshot::Execute<GaiBlocking>>> { |  | ||||||
|         self.0.execute(GaiTask { work: future }) |  | ||||||
|             .map_err(|err| ExecuteError::new(err.kind(), err.into_future().work)) |  | ||||||
|     } |  | ||||||
| } |  | ||||||
| */ |  | ||||||
|  |  | ||||||
| pub(super) struct GaiBlocking { | pub(super) struct GaiBlocking { | ||||||
|     host: String, |     host: String, | ||||||
|  |     tx: Option<oneshot::Sender<io::Result<IpAddrs>>>, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl GaiBlocking { | impl GaiBlocking { | ||||||
|     pub(super) fn new(host: String) -> GaiBlocking { |  | ||||||
|         GaiBlocking { host } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     fn block(&self) -> io::Result<IpAddrs> { |     fn block(&self) -> io::Result<IpAddrs> { | ||||||
|         debug!("resolving host={:?}", self.host); |         debug!("resolving host={:?}", self.host); | ||||||
|         (&*self.host, 0).to_socket_addrs() |         (&*self.host, 0).to_socket_addrs() | ||||||
| @@ -227,18 +219,25 @@ impl GaiBlocking { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| /* |  | ||||||
| impl Future for GaiBlocking { | impl Future for GaiBlocking { | ||||||
|     type Item = IpAddrs; |     type Output = (); | ||||||
|     type Error = io::Error; |  | ||||||
|  |     fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { | ||||||
|  |         if self.tx.as_mut().expect("polled after complete").poll_closed(cx).is_ready() { | ||||||
|  |             trace!("resolve future canceled for {:?}", self.host); | ||||||
|  |             return Poll::Ready(()); | ||||||
|  |         } | ||||||
|  |  | ||||||
|     fn poll(&mut self) -> Poll<Self::Item, Self::Error> { |  | ||||||
|         debug!("resolving host={:?}", self.host); |         debug!("resolving host={:?}", self.host); | ||||||
|         (&*self.host, 0).to_socket_addrs() |         let res = (&*self.host, 0).to_socket_addrs() | ||||||
|             .map(|i| Async::Ready(IpAddrs { iter: i })) |             .map(|i| IpAddrs { iter: i }); | ||||||
|  |  | ||||||
|  |         let tx = self.tx.take().expect("polled after complete"); | ||||||
|  |         let _ = tx.send(res); | ||||||
|  |  | ||||||
|  |         Poll::Ready(()) | ||||||
|     } |     } | ||||||
| } | } | ||||||
| */ |  | ||||||
|  |  | ||||||
| pub(super) struct IpAddrs { | pub(super) struct IpAddrs { | ||||||
|     iter: vec::IntoIter<SocketAddr>, |     iter: vec::IntoIter<SocketAddr>, | ||||||
| @@ -297,33 +296,6 @@ impl Iterator for IpAddrs { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| // Make this Future unnameable outside of this crate. |  | ||||||
| pub(super) mod sealed { |  | ||||||
|     use super::*; |  | ||||||
|     // Blocking task to be executed on a thread pool. |  | ||||||
|     pub struct GaiTask { |  | ||||||
|         //pub(super) work: oneshot::Execute<GaiBlocking> |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     impl fmt::Debug for GaiTask { |  | ||||||
|         fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |  | ||||||
|             f.pad("GaiTask") |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     /* |  | ||||||
|     impl Future for GaiTask { |  | ||||||
|         type Item = (); |  | ||||||
|         type Error = (); |  | ||||||
|  |  | ||||||
|         fn poll(&mut self) -> Poll<(), ()> { |  | ||||||
|             self.work.poll() |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|     */ |  | ||||||
| } |  | ||||||
|  |  | ||||||
|  |  | ||||||
| /// A resolver using `getaddrinfo` calls via the `tokio_threadpool::blocking` API. | /// A resolver using `getaddrinfo` calls via the `tokio_threadpool::blocking` API. | ||||||
| /// | /// | ||||||
| /// Unlike the `GaiResolver` this will not spawn dedicated threads, but only works when running on the | /// Unlike the `GaiResolver` this will not spawn dedicated threads, but only works when running on the | ||||||
|   | |||||||
| @@ -81,29 +81,6 @@ impl HttpConnector { | |||||||
|     pub fn new(threads: usize) -> HttpConnector { |     pub fn new(threads: usize) -> HttpConnector { | ||||||
|         HttpConnector::new_with_resolver(GaiResolver::new(threads)) |         HttpConnector::new_with_resolver(GaiResolver::new(threads)) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[doc(hidden)] |  | ||||||
|     #[deprecated(note = "Use HttpConnector::set_reactor to set a reactor handle")] |  | ||||||
|     pub fn new_with_handle(threads: usize, handle: Handle) -> HttpConnector { |  | ||||||
|         let resolver = GaiResolver::new(threads); |  | ||||||
|         let mut http = HttpConnector::new_with_resolver(resolver); |  | ||||||
|         http.set_reactor(Some(handle)); |  | ||||||
|         http |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     /* |  | ||||||
|     /// Construct a new HttpConnector. |  | ||||||
|     /// |  | ||||||
|     /// Takes an executor to run blocking `getaddrinfo` tasks on. |  | ||||||
|     pub fn new_with_executor<E: 'static>(executor: E, handle: Option<Handle>) -> HttpConnector |  | ||||||
|         where E: Executor<dns::sealed::GaiTask> + Send + Sync |  | ||||||
|     { |  | ||||||
|         let resolver = GaiResolver::new_with_executor(executor); |  | ||||||
|         let mut http = HttpConnector::new_with_resolver(resolver); |  | ||||||
|         http.set_reactor(handle); |  | ||||||
|         http |  | ||||||
|     } |  | ||||||
|     */ |  | ||||||
| } | } | ||||||
|  |  | ||||||
| impl HttpConnector<TokioThreadpoolGaiResolver> { | impl HttpConnector<TokioThreadpoolGaiResolver> { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user