fix(client): handle when DNS resolves after a timeout triggers
Closes #848
This commit is contained in:
@@ -16,7 +16,7 @@ pub trait Connect {
|
||||
/// Type of Transport to create
|
||||
type Output: Transport;
|
||||
/// The key used to determine if an existing socket can be used.
|
||||
type Key: Eq + Hash + Clone;
|
||||
type Key: Eq + Hash + Clone + fmt::Debug;
|
||||
/// Returns the key based off the Url.
|
||||
fn key(&self, &Url) -> Option<Self::Key>;
|
||||
/// Connect to a remote address.
|
||||
@@ -96,10 +96,12 @@ impl Connect for HttpConnector {
|
||||
}
|
||||
|
||||
fn connected(&mut self) -> Option<(Self::Key, io::Result<HttpStream>)> {
|
||||
let (host, addr) = match self.dns.as_ref().expect("dns workers lost").resolved() {
|
||||
let (host, addrs) = match self.dns.as_ref().expect("dns workers lost").resolved() {
|
||||
Ok(res) => res,
|
||||
Err(_) => return None
|
||||
};
|
||||
//TODO: try all addrs
|
||||
let addr = addrs.and_then(|mut addrs| Ok(addrs.next().unwrap()));
|
||||
debug!("Http::resolved <- ({:?}, {:?})", host, addr);
|
||||
if let Entry::Occupied(mut entry) = self.resolving.entry(host) {
|
||||
let resolved = entry.get_mut().remove(0);
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::io;
|
||||
use std::net::{IpAddr, ToSocketAddrs};
|
||||
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
|
||||
use std::thread;
|
||||
use std::vec;
|
||||
|
||||
use ::spmc;
|
||||
|
||||
@@ -11,7 +12,19 @@ pub struct Dns {
|
||||
rx: channel::Receiver<Answer>,
|
||||
}
|
||||
|
||||
pub type Answer = (String, io::Result<IpAddr>);
|
||||
pub type Answer = (String, io::Result<IpAddrs>);
|
||||
|
||||
pub struct IpAddrs {
|
||||
iter: vec::IntoIter<SocketAddr>,
|
||||
}
|
||||
|
||||
impl Iterator for IpAddrs {
|
||||
type Item = IpAddr;
|
||||
#[inline]
|
||||
fn next(&mut self) -> Option<IpAddr> {
|
||||
self.iter.next().map(|addr| addr.ip())
|
||||
}
|
||||
}
|
||||
|
||||
impl Dns {
|
||||
pub fn new(notify: (channel::Sender<Answer>, channel::Receiver<Answer>), threads: usize) -> Dns {
|
||||
@@ -26,7 +39,7 @@ impl Dns {
|
||||
}
|
||||
|
||||
pub fn resolve<T: Into<String>>(&self, hostname: T) {
|
||||
self.tx.send(hostname.into()).expect("Workers all died horribly");
|
||||
self.tx.send(hostname.into()).expect("DNS workers all died unexpectedly");
|
||||
}
|
||||
|
||||
pub fn resolved(&self) -> Result<Answer, channel::TryRecvError> {
|
||||
@@ -41,9 +54,8 @@ fn work(rx: spmc::Receiver<String>, notify: channel::Sender<Answer>) {
|
||||
let notify = worker.notify.as_ref().expect("Worker lost notify");
|
||||
while let Ok(host) = rx.recv() {
|
||||
debug!("resolve {:?}", host);
|
||||
let res = match (&*host, 80).to_socket_addrs().map(|mut i| i.next()) {
|
||||
Ok(Some(addr)) => (host, Ok(addr.ip())),
|
||||
Ok(None) => (host, Err(io::Error::new(io::ErrorKind::Other, "no addresses found"))),
|
||||
let res = match (&*host, 80).to_socket_addrs().map(|i| IpAddrs{ iter: i }) {
|
||||
Ok(addrs) => (host, Ok(addrs)),
|
||||
Err(e) => (host, Err(e))
|
||||
};
|
||||
|
||||
|
||||
@@ -30,7 +30,6 @@ mod response;
|
||||
|
||||
/// A Client to make outgoing HTTP requests.
|
||||
pub struct Client<H> {
|
||||
//handle: Option<thread::JoinHandle<()>>,
|
||||
tx: http::channel::Sender<Notify<H>>,
|
||||
}
|
||||
|
||||
@@ -64,16 +63,6 @@ impl<H> Client<H> {
|
||||
pub fn configure() -> Config<DefaultConnector> {
|
||||
Config::default()
|
||||
}
|
||||
|
||||
/*TODO
|
||||
pub fn http() -> Config<HttpConnector> {
|
||||
|
||||
}
|
||||
|
||||
pub fn https() -> Config<HttpsConnector> {
|
||||
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
impl<H: Handler<<DefaultConnector as Connect>::Output>> Client<H> {
|
||||
@@ -243,14 +232,6 @@ impl<H> fmt::Display for ClientError<H> {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
impl Drop for Client {
|
||||
fn drop(&mut self) {
|
||||
self.handle.take().map(|handle| handle.join());
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
/// A trait to react to client events that happen for each message.
|
||||
///
|
||||
/// Each event handler returns it's desired `Next` action.
|
||||
@@ -338,15 +319,16 @@ struct Context<K, H> {
|
||||
}
|
||||
|
||||
impl<K: http::Key, H> Context<K, H> {
|
||||
fn pop_queue(&mut self, key: &K) -> Queued<H> {
|
||||
fn pop_queue(&mut self, key: &K) -> Option<Queued<H>> {
|
||||
let mut should_remove = false;
|
||||
let queued = {
|
||||
let mut vec = self.queue.get_mut(key).expect("handler not in queue for key");
|
||||
let queued = vec.remove(0);
|
||||
if vec.is_empty() {
|
||||
should_remove = true;
|
||||
}
|
||||
queued
|
||||
self.queue.get_mut(key).map(|vec| {
|
||||
let queued = vec.remove(0);
|
||||
if vec.is_empty() {
|
||||
should_remove = true;
|
||||
}
|
||||
queued
|
||||
})
|
||||
};
|
||||
if should_remove {
|
||||
self.queue.remove(key);
|
||||
@@ -379,16 +361,22 @@ impl<K: http::Key, H> Context<K, H> {
|
||||
impl<K: http::Key, H: Handler<T>, T: Transport> http::MessageHandlerFactory<K, T> for Context<K, H> {
|
||||
type Output = Message<H, T>;
|
||||
|
||||
fn create(&mut self, seed: http::Seed<K>) -> Self::Output {
|
||||
fn create(&mut self, seed: http::Seed<K>) -> Option<Self::Output> {
|
||||
let key = seed.key();
|
||||
let queued = self.pop_queue(key);
|
||||
let (url, mut handler) = (queued.url, queued.handler);
|
||||
handler.on_control(seed.control());
|
||||
Message {
|
||||
handler: handler,
|
||||
url: Some(url),
|
||||
_marker: PhantomData,
|
||||
}
|
||||
self.pop_queue(key).map(|queued| {
|
||||
let (url, mut handler) = (queued.url, queued.handler);
|
||||
handler.on_control(seed.control());
|
||||
|
||||
Message {
|
||||
handler: handler,
|
||||
url: Some(url),
|
||||
_marker: PhantomData,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn keep_alive_interest(&self) -> Next {
|
||||
Next::wait()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -402,6 +390,7 @@ where C: Connect,
|
||||
C::Output: Transport,
|
||||
H: Handler<C::Output> {
|
||||
Connector(C, http::channel::Receiver<Notify<H>>),
|
||||
Connecting((C::Key, C::Output)),
|
||||
Socket(http::Conn<C::Key, C::Output, Message<H, C::Output>>)
|
||||
}
|
||||
|
||||
@@ -415,6 +404,7 @@ where
|
||||
|
||||
impl<C, H> rotor::Machine for ClientFsm<C, H>
|
||||
where C: Connect,
|
||||
C::Key: fmt::Debug,
|
||||
C::Output: Transport,
|
||||
H: Handler<C::Output> {
|
||||
type Context = Context<C::Key, H>;
|
||||
@@ -422,24 +412,47 @@ where C: Connect,
|
||||
|
||||
fn create(seed: Self::Seed, scope: &mut Scope<Self::Context>) -> rotor::Response<Self, rotor::Void> {
|
||||
rotor_try!(scope.register(&seed.1, EventSet::writable(), PollOpt::level()));
|
||||
rotor::Response::ok(
|
||||
ClientFsm::Socket(
|
||||
http::Conn::new(seed.0, seed.1, scope.notifier())
|
||||
.keep_alive(scope.keep_alive)
|
||||
)
|
||||
)
|
||||
rotor::Response::ok(ClientFsm::Connecting(seed))
|
||||
}
|
||||
|
||||
fn ready(self, events: EventSet, scope: &mut Scope<Self::Context>) -> rotor::Response<Self, Self::Seed> {
|
||||
match self {
|
||||
ClientFsm::Connector(..) => {
|
||||
unreachable!("Connector can never be ready")
|
||||
},
|
||||
ClientFsm::Socket(conn) => {
|
||||
let res = conn.ready(events, scope);
|
||||
let now = scope.now();
|
||||
scope.conn_response(res, now)
|
||||
},
|
||||
ClientFsm::Connecting(mut seed) => {
|
||||
if events.is_error() || events.is_hup() {
|
||||
if let Some(err) = seed.1.take_socket_error().err() {
|
||||
debug!("error while connecting: {:?}", err);
|
||||
scope.pop_queue(&seed.0).map(move |mut queued| queued.handler.on_error(::Error::Io(err)));
|
||||
rotor::Response::done()
|
||||
} else {
|
||||
trace!("connecting is_error, but no socket error");
|
||||
rotor::Response::ok(ClientFsm::Connecting(seed))
|
||||
}
|
||||
} else if events.is_writable() {
|
||||
if scope.queue.contains_key(&seed.0) {
|
||||
trace!("connected and writable {:?}", seed.0);
|
||||
rotor::Response::ok(
|
||||
ClientFsm::Socket(
|
||||
http::Conn::new(seed.0, seed.1, Next::write().timeout(scope.connect_timeout), scope.notifier())
|
||||
.keep_alive(scope.keep_alive)
|
||||
)
|
||||
)
|
||||
} else {
|
||||
trace!("connected, but queued handler is gone: {:?}", seed.0); // probably took too long connecting
|
||||
rotor::Response::done()
|
||||
}
|
||||
} else {
|
||||
// spurious?
|
||||
rotor::Response::ok(ClientFsm::Connecting(seed))
|
||||
}
|
||||
}
|
||||
ClientFsm::Connector(..) => {
|
||||
unreachable!("Connector can never be ready")
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -477,6 +490,7 @@ where C: Connect,
|
||||
None => rotor::Response::ok(self)
|
||||
}
|
||||
}
|
||||
ClientFsm::Connecting(..) => unreachable!(),
|
||||
ClientFsm::Socket(conn) => {
|
||||
let res = conn.timeout(scope);
|
||||
let now = scope.now();
|
||||
@@ -494,13 +508,15 @@ where C: Connect,
|
||||
let res = conn.wakeup(scope);
|
||||
let now = scope.now();
|
||||
scope.conn_response(res, now)
|
||||
}
|
||||
},
|
||||
ClientFsm::Connecting(..) => unreachable!("connecting sockets should not be woken up")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<C, H> ClientFsm<C, H>
|
||||
where C: Connect,
|
||||
C::Key: fmt::Debug,
|
||||
C::Output: Transport,
|
||||
H: Handler<C::Output> {
|
||||
fn connect(self, scope: &mut rotor::Scope<<Self as rotor::Machine>::Context>) -> rotor::Response<Self, <Self as rotor::Machine>::Seed> {
|
||||
@@ -509,13 +525,12 @@ where C: Connect,
|
||||
if let Some((key, res)) = connector.connected() {
|
||||
match res {
|
||||
Ok(socket) => {
|
||||
trace!("connected");
|
||||
trace!("connecting {:?}", key);
|
||||
return rotor::Response::spawn(ClientFsm::Connector(connector, rx), (key, socket));
|
||||
},
|
||||
Err(e) => {
|
||||
trace!("connected error = {:?}", e);
|
||||
let mut queued = scope.pop_queue(&key);
|
||||
let _ = queued.handler.on_error(::Error::Io(e));
|
||||
trace!("connect error = {:?}", e);
|
||||
scope.pop_queue(&key).map(|mut queued| queued.handler.on_error(::Error::Io(e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user