Merge pull request #1333 from srijs/feat/client-connector-executor
feat(client): allow custom executors for HttpConnector
This commit is contained in:
		| @@ -22,7 +22,7 @@ include = [ | |||||||
| base64 = "0.6" | base64 = "0.6" | ||||||
| bytes = "0.4.4" | bytes = "0.4.4" | ||||||
| futures = "0.1.14" | futures = "0.1.14" | ||||||
| futures-cpupool = "0.1" | futures-cpupool = "0.1.6" | ||||||
| http = { version = "0.1", optional = true } | http = { version = "0.1", optional = true } | ||||||
| httparse = "1.0" | httparse = "1.0" | ||||||
| language-tags = "0.2" | language-tags = "0.2" | ||||||
|   | |||||||
| @@ -2,9 +2,13 @@ use std::error::Error as StdError; | |||||||
| use std::fmt; | use std::fmt; | ||||||
| use std::io; | use std::io; | ||||||
| use std::mem; | use std::mem; | ||||||
|  | use std::sync::Arc; | ||||||
| //use std::net::SocketAddr; | //use std::net::SocketAddr; | ||||||
|  |  | ||||||
| use futures::{Future, Poll, Async}; | use futures::{Future, Poll, Async}; | ||||||
|  | use futures::future::{Executor, ExecuteError}; | ||||||
|  | use futures::sync::oneshot; | ||||||
|  | use futures_cpupool::{Builder as CpuPoolBuilder}; | ||||||
| use tokio_io::{AsyncRead, AsyncWrite}; | use tokio_io::{AsyncRead, AsyncWrite}; | ||||||
| use tokio::reactor::Handle; | use tokio::reactor::Handle; | ||||||
| use tokio::net::{TcpStream, TcpStreamNew}; | use tokio::net::{TcpStream, TcpStreamNew}; | ||||||
| @@ -43,22 +47,35 @@ where T: Service<Request=Uri, Error=io::Error> + 'static, | |||||||
| /// A connector for the `http` scheme. | /// A connector for the `http` scheme. | ||||||
| #[derive(Clone)] | #[derive(Clone)] | ||||||
| pub struct HttpConnector { | pub struct HttpConnector { | ||||||
|     dns: dns::Dns, |     executor: HttpConnectExecutor, | ||||||
|     enforce_http: bool, |     enforce_http: bool, | ||||||
|     handle: Handle, |     handle: Handle, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl HttpConnector { | impl HttpConnector { | ||||||
|  |  | ||||||
|     /// Construct a new HttpConnector. |     /// Construct a new HttpConnector. | ||||||
|     /// |     /// | ||||||
|     /// Takes number of DNS worker threads. |     /// Takes number of DNS worker threads. | ||||||
|     #[inline] |     #[inline] | ||||||
|     pub fn new(threads: usize, handle: &Handle) -> HttpConnector { |     pub fn new(threads: usize, handle: &Handle) -> HttpConnector { | ||||||
|  |         let pool = CpuPoolBuilder::new() | ||||||
|  |             .name_prefix("hyper-dns") | ||||||
|  |             .pool_size(threads) | ||||||
|  |             .create(); | ||||||
|  |         HttpConnector::new_with_executor(pool, handle) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Construct a new HttpConnector. | ||||||
|  |     /// | ||||||
|  |     /// Takes an executor to run blocking tasks on. | ||||||
|  |     #[inline] | ||||||
|  |     pub fn new_with_executor<E: 'static>(executor: E, handle: &Handle) -> HttpConnector | ||||||
|  |         where E: Executor<HttpConnectorBlockingTask> | ||||||
|  |     { | ||||||
|         HttpConnector { |         HttpConnector { | ||||||
|             dns: dns::Dns::new(threads), |             executor: HttpConnectExecutor(Arc::new(executor)), | ||||||
|             enforce_http: true, |             enforce_http: true, | ||||||
|             handle: handle.clone(), |             handle: handle.clone() | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -109,7 +126,7 @@ impl Service for HttpConnector { | |||||||
|         }; |         }; | ||||||
|  |  | ||||||
|         HttpConnecting { |         HttpConnecting { | ||||||
|             state: State::Lazy(self.dns.clone(), host.into(), port), |             state: State::Lazy(self.executor.clone(), host.into(), port), | ||||||
|             handle: self.handle.clone(), |             handle: self.handle.clone(), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| @@ -154,8 +171,8 @@ pub struct HttpConnecting { | |||||||
| } | } | ||||||
|  |  | ||||||
| enum State { | enum State { | ||||||
|     Lazy(dns::Dns, String, u16), |     Lazy(HttpConnectExecutor, String, u16), | ||||||
|     Resolving(dns::Query), |     Resolving(oneshot::SpawnHandle<dns::IpAddrs, io::Error>), | ||||||
|     Connecting(ConnectingTcp), |     Connecting(ConnectingTcp), | ||||||
|     Error(Option<io::Error>), |     Error(Option<io::Error>), | ||||||
| } | } | ||||||
| @@ -168,12 +185,13 @@ impl Future for HttpConnecting { | |||||||
|         loop { |         loop { | ||||||
|             let state; |             let state; | ||||||
|             match self.state { |             match self.state { | ||||||
|                 State::Lazy(ref dns, ref mut host, port) => { |                 State::Lazy(ref executor, ref mut host, port) => { | ||||||
|                     let host = mem::replace(host, String::new()); |                     let host = mem::replace(host, String::new()); | ||||||
|                     state = State::Resolving(dns.resolve(host, port)); |                     let work = dns::Work::new(host, port); | ||||||
|  |                     state = State::Resolving(oneshot::spawn(work, executor)); | ||||||
|                 }, |                 }, | ||||||
|                 State::Resolving(ref mut query) => { |                 State::Resolving(ref mut future) => { | ||||||
|                     match try!(query.poll()) { |                     match try!(future.poll()) { | ||||||
|                         Async::NotReady => return Ok(Async::NotReady), |                         Async::NotReady => return Ok(Async::NotReady), | ||||||
|                         Async::Ready(addrs) => { |                         Async::Ready(addrs) => { | ||||||
|                             state = State::Connecting(ConnectingTcp { |                             state = State::Connecting(ConnectingTcp { | ||||||
| @@ -231,6 +249,36 @@ impl ConnectingTcp { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | /// Blocking task to be executed on a thread pool. | ||||||
|  | pub struct HttpConnectorBlockingTask { | ||||||
|  |     work: oneshot::Execute<dns::Work> | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl fmt::Debug for HttpConnectorBlockingTask { | ||||||
|  |     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||||||
|  |         f.pad("HttpConnectorBlockingTask") | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl Future for HttpConnectorBlockingTask { | ||||||
|  |     type Item = (); | ||||||
|  |     type Error = (); | ||||||
|  |  | ||||||
|  |     fn poll(&mut self) -> Poll<(), ()> { | ||||||
|  |         self.work.poll() | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[derive(Clone)] | ||||||
|  | struct HttpConnectExecutor(Arc<Executor<HttpConnectorBlockingTask>>); | ||||||
|  |  | ||||||
|  | impl Executor<oneshot::Execute<dns::Work>> for HttpConnectExecutor { | ||||||
|  |     fn execute(&self, future: oneshot::Execute<dns::Work>) -> Result<(), ExecuteError<oneshot::Execute<dns::Work>>> { | ||||||
|  |         self.0.execute(HttpConnectorBlockingTask { work: future }) | ||||||
|  |             .map_err(|err| ExecuteError::new(err.kind(), err.into_future().work)) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
| /* | /* | ||||||
| impl<S: SslClient> HttpsConnector<S> { | impl<S: SslClient> HttpsConnector<S> { | ||||||
|     /// Create a new connector using the provided SSL implementation. |     /// Create a new connector using the provided SSL implementation. | ||||||
|   | |||||||
| @@ -2,37 +2,27 @@ use std::io; | |||||||
| use std::net::{SocketAddr, ToSocketAddrs}; | use std::net::{SocketAddr, ToSocketAddrs}; | ||||||
| use std::vec; | use std::vec; | ||||||
|  |  | ||||||
| use ::futures::{Future, Poll}; | use ::futures::{Async, Future, Poll}; | ||||||
| use ::futures_cpupool::{CpuPool, CpuFuture, Builder}; |  | ||||||
|  |  | ||||||
| #[derive(Clone)] | pub struct Work { | ||||||
| pub struct Dns { |     host: String, | ||||||
|     pool: CpuPool, |     port: u16 | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Dns { | impl Work { | ||||||
|     pub fn new(threads: usize) -> Dns { |     pub fn new(host: String, port: u16) -> Work { | ||||||
|         Dns { |         Work { host: host, port: port } | ||||||
|             pool: Builder::new() |  | ||||||
|                 .name_prefix("hyper-dns") |  | ||||||
|                 .pool_size(threads) |  | ||||||
|                 .create() |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     pub fn resolve(&self, host: String, port: u16) -> Query { |  | ||||||
|         Query(self.pool.spawn_fn(move || work(host, port))) |  | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| pub struct Query(CpuFuture<IpAddrs, io::Error>); | impl Future for Work { | ||||||
|  |  | ||||||
| impl Future for Query { |  | ||||||
|     type Item = IpAddrs; |     type Item = IpAddrs; | ||||||
|     type Error = io::Error; |     type Error = io::Error; | ||||||
|  |  | ||||||
|     fn poll(&mut self) -> Poll<Self::Item, Self::Error> { |     fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||||||
|         self.0.poll() |         debug!("resolve host={:?}, port={:?}", self.host, self.port); | ||||||
|  |         (&*self.host, self.port).to_socket_addrs() | ||||||
|  |             .map(|i| Async::Ready(IpAddrs { iter: i })) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -47,10 +37,3 @@ impl Iterator for IpAddrs { | |||||||
|         self.iter.next() |         self.iter.next() | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| pub type Answer = io::Result<IpAddrs>; |  | ||||||
|  |  | ||||||
| fn work(hostname: String, port: u16) -> Answer { |  | ||||||
|     debug!("resolve host={:?}, port={:?}", hostname, port); |  | ||||||
|     (&*hostname, port).to_socket_addrs().map(|i| IpAddrs { iter: i }) |  | ||||||
| } |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user