From 3524db9473f3ae0a62146f14ac9759399b4a88a6 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 10 Jul 2019 15:58:08 -0700 Subject: [PATCH] refactor(client): use a tokio-threadpool for the GaiResolver --- src/client/connect/dns.rs | 156 +++++++++++++++---------------------- src/client/connect/http.rs | 23 ------ 2 files changed, 64 insertions(+), 115 deletions(-) diff --git a/src/client/connect/dns.rs b/src/client/connect/dns.rs index 92a85f8f..cdc08358 100644 --- a/src/client/connect/dns.rs +++ b/src/client/connect/dns.rs @@ -16,12 +16,12 @@ use std::net::{ use std::str::FromStr; use std::sync::Arc; +use futures_util::{FutureExt, StreamExt}; use tokio_executor::TypedExecutor; -use tokio_sync::oneshot; +use tokio_sync::{mpsc, oneshot}; use tokio_threadpool; -use crate::common::{Future, Pin, Poll, Unpin, task}; -use self::sealed::GaiTask; +use crate::common::{Future, Never, Pin, Poll, Unpin, task}; /// Resolve a hostname to a set of IP addresses. pub trait Resolve: Unpin { @@ -42,9 +42,15 @@ pub struct Name { /// A resolver using blocking `getaddrinfo` calls in a threadpool. #[derive(Clone)] pub struct GaiResolver { - executor: GaiExecutor, + tx: tokio_threadpool::Sender, + /// A handle to keep the threadpool alive until all `GaiResolver` clones + /// have been dropped. + _threadpool_keep_alive: ThreadPoolKeepAlive, } +#[derive(Clone)] +struct ThreadPoolKeepAlive(mpsc::Sender); + /// An iterator of IP addresses returned from `getaddrinfo`. pub struct GaiAddrs { inner: IpAddrs, @@ -52,8 +58,8 @@ pub struct GaiAddrs { /// A future to resole a name returned by `GaiResolver`. pub struct GaiFuture { - //rx: oneshot::SpawnHandle, - blocking: GaiBlocking, + rx: oneshot::Receiver>, + _threadpool_keep_alive: ThreadPoolKeepAlive, } impl Name { @@ -96,15 +102,11 @@ pub struct InvalidNameError(()); impl fmt::Display for InvalidNameError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - self.description().fmt(f) + f.write_str("Not a valid domain name") } } -impl Error for InvalidNameError { - fn description(&self) -> &str { - "Not a valid domain name" - } -} +impl Error for InvalidNameError {} impl GaiResolver { @@ -112,36 +114,39 @@ impl GaiResolver { /// /// Takes number of DNS worker threads. pub fn new(threads: usize) -> Self { - GaiResolver { - executor: GaiExecutor, - } - /* - use tokio_threadpool::Builder; - - let pool = Builder::new() + let pool = tokio_threadpool::Builder::new() .name_prefix("hyper-dns-gai-resolver") // not for CPU tasks, so only spawn workers // in blocking mode .pool_size(1) .max_blocking(threads) .build(); - GaiResolver::new_with_executor(pool) - */ - } - /* - /// Construct a new `GaiResolver` with a shared thread pool executor. - /// - /// Takes an executor to run blocking `getaddrinfo` tasks on. - /*pub */fn new_with_executor(executor: E) -> Self - where - E: TypedExecutor + Send + Sync, - { + let tx = pool.sender().clone(); + + // The pool will start to shutdown once `pool` is dropped, + // so to keep it alive, we spawn a future onto the pool itself + // that will only resolve once all `GaiResolver` requests + // are finished. + let (shutdown_tx, shutdown_rx) = mpsc::channel(1); + + let on_shutdown = shutdown_rx + .into_future() + .map(move |(next, _rx)| { + match next { + Some(never) => match never {}, + None => (), + } + + drop(pool) + }); + tx.spawn(on_shutdown); + GaiResolver { - executor: GaiExecutor(Arc::new(executor)), + tx, + _threadpool_keep_alive: ThreadPoolKeepAlive(shutdown_tx), } } - */ } impl Resolve for GaiResolver { @@ -149,11 +154,14 @@ impl Resolve for GaiResolver { type Future = GaiFuture; fn resolve(&self, name: Name) -> Self::Future { - let blocking = GaiBlocking::new(name.host); - //let rx = oneshot::spawn(blocking, &self.executor); + let (tx, rx) = oneshot::channel(); + self.tx.spawn(GaiBlocking { + host: name.host, + tx: Some(tx), + }); GaiFuture { - //rx, - blocking, + rx, + _threadpool_keep_alive: self._threadpool_keep_alive.clone(), } } } @@ -168,13 +176,11 @@ impl Future for GaiFuture { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - /* - let addrs = try_ready!(self.rx.poll()); - Ok(Async::Ready(GaiAddrs { - inner: addrs, - })) - */ - Poll::Ready(self.blocking.block().map(|addrs| GaiAddrs { inner: addrs })) + Pin::new(&mut self.rx).poll(cx).map(|res| match res { + Ok(Ok(addrs)) => Ok(GaiAddrs { inner: addrs }), + Ok(Err(err)) => Err(err), + Err(_canceled) => unreachable!("GaiResolver threadpool shutdown"), + }) } } @@ -198,27 +204,13 @@ impl fmt::Debug for GaiAddrs { } } -#[derive(Clone)] -struct GaiExecutor/*(Arc + Send + Sync>)*/; - -/* -impl Executor> for GaiExecutor { - fn execute(&self, future: oneshot::Execute) -> Result<(), ExecuteError>> { - self.0.execute(GaiTask { work: future }) - .map_err(|err| ExecuteError::new(err.kind(), err.into_future().work)) - } -} -*/ pub(super) struct GaiBlocking { host: String, + tx: Option>>, } impl GaiBlocking { - pub(super) fn new(host: String) -> GaiBlocking { - GaiBlocking { host } - } - fn block(&self) -> io::Result { debug!("resolving host={:?}", self.host); (&*self.host, 0).to_socket_addrs() @@ -227,18 +219,25 @@ impl GaiBlocking { } } -/* impl Future for GaiBlocking { - type Item = IpAddrs; - type Error = io::Error; + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + if self.tx.as_mut().expect("polled after complete").poll_closed(cx).is_ready() { + trace!("resolve future canceled for {:?}", self.host); + return Poll::Ready(()); + } - fn poll(&mut self) -> Poll { debug!("resolving host={:?}", self.host); - (&*self.host, 0).to_socket_addrs() - .map(|i| Async::Ready(IpAddrs { iter: i })) + let res = (&*self.host, 0).to_socket_addrs() + .map(|i| IpAddrs { iter: i }); + + let tx = self.tx.take().expect("polled after complete"); + let _ = tx.send(res); + + Poll::Ready(()) } } -*/ pub(super) struct IpAddrs { iter: vec::IntoIter, @@ -297,33 +296,6 @@ impl Iterator for IpAddrs { } } -// Make this Future unnameable outside of this crate. -pub(super) mod sealed { - use super::*; - // Blocking task to be executed on a thread pool. - pub struct GaiTask { - //pub(super) work: oneshot::Execute - } - - impl fmt::Debug for GaiTask { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.pad("GaiTask") - } - } - - /* - impl Future for GaiTask { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), ()> { - self.work.poll() - } - } - */ -} - - /// A resolver using `getaddrinfo` calls via the `tokio_threadpool::blocking` API. /// /// Unlike the `GaiResolver` this will not spawn dedicated threads, but only works when running on the diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index b7f39511..b936005c 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -81,29 +81,6 @@ impl HttpConnector { pub fn new(threads: usize) -> HttpConnector { HttpConnector::new_with_resolver(GaiResolver::new(threads)) } - - #[doc(hidden)] - #[deprecated(note = "Use HttpConnector::set_reactor to set a reactor handle")] - pub fn new_with_handle(threads: usize, handle: Handle) -> HttpConnector { - let resolver = GaiResolver::new(threads); - let mut http = HttpConnector::new_with_resolver(resolver); - http.set_reactor(Some(handle)); - http - } - - /* - /// Construct a new HttpConnector. - /// - /// Takes an executor to run blocking `getaddrinfo` tasks on. - pub fn new_with_executor(executor: E, handle: Option) -> HttpConnector - where E: Executor + Send + Sync - { - let resolver = GaiResolver::new_with_executor(executor); - let mut http = HttpConnector::new_with_resolver(resolver); - http.set_reactor(handle); - http - } - */ } impl HttpConnector {