Re-enable trust-dns optional feature (#787)
This commit is contained in:
		
							
								
								
									
										119
									
								
								src/dns.rs
									
									
									
									
									
								
							
							
						
						
									
										119
									
								
								src/dns.rs
									
									
									
									
									
								
							| @@ -1,76 +1,91 @@ | ||||
| use std::future::Future; | ||||
| use std::net::IpAddr; | ||||
| use std::sync::{Arc, Mutex, Once}; | ||||
| use std::{io, vec}; | ||||
| use std::pin::Pin; | ||||
| use std::sync::Arc; | ||||
| use std::task::{self, Poll}; | ||||
| use std::io; | ||||
|  | ||||
| use futures_util::future; | ||||
| use hyper::client::connect::dns as hyper_dns; | ||||
| use tokio; | ||||
| use trust_dns_resolver::{system_conf, AsyncResolver, BackgroundLookupIp}; | ||||
| use hyper::service::Service; | ||||
| use tokio::sync::Mutex; | ||||
| use trust_dns_resolver::{ | ||||
|     config::{ResolverConfig, ResolverOpts}, | ||||
|     lookup_ip::LookupIpIntoIter, | ||||
|     system_conf, AsyncResolver, TokioConnection, TokioConnectionProvider, | ||||
| }; | ||||
|  | ||||
| // If instead the type were just `AsyncResolver`, it breaks the default recursion limit | ||||
| // for the compiler to determine if `reqwest::Client` is `Send`. This is because `AsyncResolver` | ||||
| // has **a lot** of internal generic types that pushes us over the limit. | ||||
| // | ||||
| // "Erasing" the internal resolver type saves us from this limit. | ||||
| type ErasedResolver = Box<dyn Fn(hyper_dns::Name) -> BackgroundLookupIp + Send + Sync>; | ||||
| type Background = Box<dyn Future<Item = (), Error = ()> + Send>; | ||||
| use crate::error::BoxError; | ||||
|  | ||||
| type SharedResolver = Arc<AsyncResolver<TokioConnection, TokioConnectionProvider>>; | ||||
|  | ||||
| lazy_static! { | ||||
|     static ref SYSTEM_CONF: io::Result<(ResolverConfig, ResolverOpts)> = system_conf::read_system_conf(); | ||||
| } | ||||
|  | ||||
| #[derive(Clone)] | ||||
| pub(crate) struct TrustDnsResolver { | ||||
|     inner: Arc<Inner>, | ||||
|     state: Arc<Mutex<State>>, | ||||
| } | ||||
|  | ||||
| struct Inner { | ||||
|     background: Mutex<Option<Background>>, | ||||
|     once: Once, | ||||
|     resolver: ErasedResolver, | ||||
| enum State { | ||||
|     Init, | ||||
|     Ready(SharedResolver), | ||||
| } | ||||
|  | ||||
| impl TrustDnsResolver { | ||||
|     pub(crate) fn new() -> io::Result<Self> { | ||||
|         let (conf, opts) = system_conf::read_system_conf()?; | ||||
|         let (resolver, bg) = AsyncResolver::new(conf, opts); | ||||
|  | ||||
|         let resolver: ErasedResolver = Box::new(move |name| resolver.lookup_ip(name.as_str())); | ||||
|         let background = Mutex::new(Some(Box::new(bg) as Background)); | ||||
|         let once = Once::new(); | ||||
|         SYSTEM_CONF.as_ref().map_err(|e| { | ||||
|             io::Error::new(e.kind(), format!("error reading DNS system conf: {}", e)) | ||||
|         })?; | ||||
|  | ||||
|         // At this stage, we might not have been called in the context of a | ||||
|         // Tokio Runtime, so we must delay the actual construction of the | ||||
|         // resolver. | ||||
|         Ok(TrustDnsResolver { | ||||
|             inner: Arc::new(Inner { | ||||
|                 background, | ||||
|                 once, | ||||
|                 resolver, | ||||
|             }), | ||||
|             state: Arc::new(Mutex::new(State::Init)), | ||||
|         }) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl hyper_dns::Resolve for TrustDnsResolver { | ||||
|     type Addrs = vec::IntoIter<IpAddr>; | ||||
|     type Future = Box<dyn Future<Output = Result<Self::Addrs, io::Error>> + Send>; | ||||
| impl Service<hyper_dns::Name> for TrustDnsResolver { | ||||
|     type Response = LookupIpIntoIter; | ||||
|     type Error = BoxError; | ||||
|     type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>; | ||||
|  | ||||
|     fn resolve(&self, name: hyper_dns::Name) -> Self::Future { | ||||
|         let inner = self.inner.clone(); | ||||
|         Box::new(future::lazy(move || { | ||||
|             inner.once.call_once(|| { | ||||
|                 // The `bg` (background) future needs to be spawned onto an executor, | ||||
|                 // but a `reqwest::Client` may be constructed before an executor is ready. | ||||
|                 // So, the `bg` future cannot be spawned *until* the executor starts to | ||||
|                 // `poll` this future. | ||||
|                 let bg = inner | ||||
|                     .background | ||||
|                     .lock() | ||||
|                     .expect("resolver background lock") | ||||
|                     .take() | ||||
|                     .expect("background only taken once"); | ||||
|     fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { | ||||
|         Poll::Ready(Ok(())) | ||||
|     } | ||||
|  | ||||
|                 tokio::spawn(bg); | ||||
|             }); | ||||
|     fn call(&mut self, name: hyper_dns::Name) -> Self::Future { | ||||
|         let resolver = self.clone(); | ||||
|         Box::pin(async move { | ||||
|             let mut lock = resolver.state.lock().await; | ||||
|  | ||||
|             (inner.resolver)(name) | ||||
|                 .map(|lookup| lookup.iter().collect::<Vec<_>>().into_iter()) | ||||
|                 .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string())) | ||||
|         })) | ||||
|             let resolver = match &*lock { | ||||
|                 State::Init => { | ||||
|                     let resolver = new_resolver(tokio::runtime::Handle::current()).await?; | ||||
|                     *lock = State::Ready(resolver.clone()); | ||||
|                     resolver | ||||
|                 }, | ||||
|                 State::Ready(resolver) => resolver.clone(), | ||||
|             }; | ||||
|  | ||||
|             // Don't keep lock once the resolver is constructed, otherwise | ||||
|             // only one lookup could be done at a time. | ||||
|             drop(lock); | ||||
|  | ||||
|             let lookup = resolver.lookup_ip(name.as_str()).await?; | ||||
|             Ok(lookup.into_iter()) | ||||
|         }) | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Takes a `Handle` argument as an indicator that it must be called from | ||||
| /// within the context of a Tokio runtime. | ||||
| async fn new_resolver(handle: tokio::runtime::Handle) -> Result<SharedResolver, BoxError> { | ||||
|     let (config, opts) = SYSTEM_CONF | ||||
|         .as_ref() | ||||
|         .expect("can't construct TrustDnsResolver if SYSTEM_CONF is error") | ||||
|         .clone(); | ||||
|     let resolver = AsyncResolver::new(config, opts, handle).await?; | ||||
|     Ok(Arc::new(resolver)) | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user