Merge pull request #925 from jwilm/fix-spurious-timeouts
fix(http): Connection checks for spurious timeouts
This commit is contained in:
		| @@ -493,8 +493,13 @@ where C: Connect, | ||||
|                         trace!("connected and writable {:?}", seed.0); | ||||
|                         rotor::Response::ok( | ||||
|                             ClientFsm::Socket( | ||||
|                                 http::Conn::new(seed.0, seed.1, Next::write().timeout(scope.connect_timeout), scope.notifier()) | ||||
|                                     .keep_alive(scope.keep_alive) | ||||
|                                 http::Conn::new( | ||||
|                                     seed.0, | ||||
|                                     seed.1, | ||||
|                                     Next::write().timeout(scope.connect_timeout), | ||||
|                                     scope.notifier(), | ||||
|                                     scope.now() | ||||
|                                 ).keep_alive(scope.keep_alive) | ||||
|                             ) | ||||
|                         ) | ||||
|                     } else { | ||||
|   | ||||
| @@ -6,7 +6,7 @@ use std::marker::PhantomData; | ||||
| use std::mem; | ||||
| use std::time::Duration; | ||||
|  | ||||
| use rotor::{self, EventSet, PollOpt, Scope}; | ||||
| use rotor::{self, EventSet, PollOpt, Scope, Time}; | ||||
|  | ||||
| use http::{self, h1, Http1Message, Encoder, Decoder, Next, Next_, Reg, Control}; | ||||
| use http::channel; | ||||
| @@ -176,6 +176,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|                         let next = handler.on_incoming(head, &self.transport); | ||||
|                         trace!("handler.on_incoming() -> {:?}", next); | ||||
|  | ||||
|                         let now = scope.now(); | ||||
|                         match next.interest { | ||||
|                             Next_::Read => self.read(scope, State::Http1(Http1 { | ||||
|                                 handler: handler, | ||||
| @@ -183,6 +184,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|                                 writing: Writing::Init, | ||||
|                                 keep_alive: keep_alive, | ||||
|                                 timeout: next.timeout, | ||||
|                                 timeout_start: Some(now), | ||||
|                                 _marker: PhantomData, | ||||
|                             })), | ||||
|                             Next_::Write => State::Http1(Http1 { | ||||
| @@ -199,6 +201,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|                                 writing: Writing::Head, | ||||
|                                 keep_alive: keep_alive, | ||||
|                                 timeout: next.timeout, | ||||
|                                 timeout_start: Some(now), | ||||
|                                 _marker: PhantomData, | ||||
|                             }), | ||||
|                             Next_::ReadWrite => self.read(scope, State::Http1(Http1 { | ||||
| @@ -207,6 +210,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|                                 writing: Writing::Head, | ||||
|                                 keep_alive: keep_alive, | ||||
|                                 timeout: next.timeout, | ||||
|                                 timeout_start: Some(now), | ||||
|                                 _marker: PhantomData, | ||||
|                             })), | ||||
|                             Next_::Wait => State::Http1(Http1 { | ||||
| @@ -215,6 +219,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|                                 writing: Writing::Init, | ||||
|                                 keep_alive: keep_alive, | ||||
|                                 timeout: next.timeout, | ||||
|                                 timeout_start: Some(now), | ||||
|                                 _marker: PhantomData, | ||||
|                             }), | ||||
|                             Next_::End | | ||||
| @@ -288,7 +293,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|                 }; | ||||
|                 let mut s = State::Http1(http1); | ||||
|                 if let Some(next) = next { | ||||
|                     s.update(next, &**scope); | ||||
|                     s.update(next, &**scope, Some(scope.now())); | ||||
|                 } | ||||
|                 trace!("Conn.on_readable State::Http1 completed, new state = State::{:?}", s); | ||||
|  | ||||
| @@ -354,6 +359,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|                         handler: handler, | ||||
|                         keep_alive: keep_alive, | ||||
|                         timeout: interest.timeout, | ||||
|                         timeout_start: Some(scope.now()), | ||||
|                         _marker: PhantomData, | ||||
|                     }) | ||||
|                 } | ||||
| @@ -447,7 +453,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|         }; | ||||
|  | ||||
|         if let Some(next) = next { | ||||
|             state.update(next, &**scope); | ||||
|             state.update(next, &**scope, Some(scope.now())); | ||||
|         } | ||||
|         state | ||||
|     } | ||||
| @@ -467,7 +473,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> { | ||||
|             State::Http1(ref mut http1) => http1.handler.on_error(err), | ||||
|             State::Closed => Next::remove(), | ||||
|         }; | ||||
|         self.state.update(next, factory); | ||||
|         self.state.update(next, factory, None); | ||||
|     } | ||||
|  | ||||
|     fn on_readable<F>(&mut self, scope: &mut Scope<F>) | ||||
| @@ -502,7 +508,13 @@ pub enum ReadyResult<C> { | ||||
| } | ||||
|  | ||||
| impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> { | ||||
|     pub fn new(key: K, transport: T, next: Next, notify: rotor::Notifier) -> Conn<K, T, H> { | ||||
|     pub fn new( | ||||
|         key: K, | ||||
|         transport: T, | ||||
|         next: Next, | ||||
|         notify: rotor::Notifier, | ||||
|         now: Time | ||||
|     ) -> Conn<K, T, H> { | ||||
|         Conn(Box::new(ConnInner { | ||||
|             buf: Buffer::new(), | ||||
|             ctrl: channel::new(notify), | ||||
| @@ -511,6 +523,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> { | ||||
|             state: State::Init { | ||||
|                 interest: next.interest, | ||||
|                 timeout: next.timeout, | ||||
|                 timeout_start: Some(now), | ||||
|             }, | ||||
|             transport: transport, | ||||
|         })) | ||||
| @@ -615,7 +628,8 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> { | ||||
|     where F: MessageHandlerFactory<K, T, Output=H> { | ||||
|         while let Ok(next) = self.0.ctrl.1.try_recv() { | ||||
|             trace!("woke up with {:?}", next); | ||||
|             self.0.state.update(next, &**scope); | ||||
|             let timeout_start = self.0.state.timeout_start(); | ||||
|             self.0.state.update(next, &**scope, timeout_start); | ||||
|         } | ||||
|  | ||||
|         let mut conn = Some(self); | ||||
| @@ -629,8 +643,11 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> { | ||||
|  | ||||
|     pub fn timeout<F>(mut self, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)> | ||||
|     where F: MessageHandlerFactory<K, T, Output=H> { | ||||
|         //TODO: check if this was a spurious timeout? | ||||
|         self.0.on_error(::Error::Timeout, &**scope); | ||||
|         // Run error handler if timeout has elapsed | ||||
|         if self.0.state.timeout_elapsed(scope.now()) { | ||||
|             self.0.on_error(::Error::Timeout, &**scope); | ||||
|         } | ||||
|  | ||||
|         let mut conn = Some(self); | ||||
|         loop { | ||||
|             match conn.take().unwrap().ready(EventSet::none(), scope) { | ||||
| @@ -667,6 +684,7 @@ enum State<H: MessageHandler<T>, T: Transport> { | ||||
|     Init { | ||||
|         interest: Next_, | ||||
|         timeout: Option<Duration>, | ||||
|         timeout_start: Option<Time>, | ||||
|     }, | ||||
|     /// Http1 will only ever use a connection to send and receive a single | ||||
|     /// message at a time. Once a H1 status has been determined, we will either | ||||
| @@ -681,6 +699,27 @@ enum State<H: MessageHandler<T>, T: Transport> { | ||||
|     Closed, | ||||
| } | ||||
|  | ||||
| /// Given two rotor::Time and a duration, see if the duration has elapsed. | ||||
| /// | ||||
| /// The rotor::Time type only implements Add<Duration>, doesn't provide an API for comparing | ||||
| /// itself with other rotor::Time, and it doesn't implement arithmetic operations with itself. | ||||
| /// | ||||
| /// `Time` is just a newtype around (u64). Since there's no other way to compare them, we'll just | ||||
| /// use this knowledge to actually do a comparison. | ||||
| fn timeout_elapsed(timeout: Duration, start: Time, now: Time) -> bool { | ||||
|     // type annotation for sanity | ||||
|     let timeout_at: rotor::Time = start + timeout; | ||||
|  | ||||
|     let timeout_at: u64 = unsafe { mem::transmute(timeout_at) }; | ||||
|     let now: u64 = unsafe { mem::transmute(now) }; | ||||
|  | ||||
|     if now >= timeout_at { | ||||
|         true | ||||
|     } else { | ||||
|         false | ||||
|     } | ||||
| } | ||||
|  | ||||
|  | ||||
| impl<H: MessageHandler<T>, T: Transport> State<H, T> { | ||||
|     fn timeout(&self) -> Option<Duration> { | ||||
| @@ -690,14 +729,37 @@ impl<H: MessageHandler<T>, T: Transport> State<H, T> { | ||||
|             State::Closed => None, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn timeout_start(&self) -> Option<Time> { | ||||
|         match *self { | ||||
|             State::Init { timeout_start, .. } => timeout_start, | ||||
|             State::Http1(ref http1) => http1.timeout_start, | ||||
|             State::Closed => None, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn timeout_elapsed(&self, now: Time) -> bool { | ||||
|         match *self { | ||||
|             State::Init { timeout, timeout_start, .. } => { | ||||
|                 if let (Some(timeout), Some(start)) = (timeout, timeout_start) { | ||||
|                     timeout_elapsed(timeout, start, now) | ||||
|                 } else { | ||||
|                     false | ||||
|                 } | ||||
|             }, | ||||
|             State::Http1(ref http1) => http1.timeout_elapsed(now), | ||||
|             State::Closed => false, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<H: MessageHandler<T>, T: Transport> fmt::Debug for State<H, T> { | ||||
|     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||||
|         match *self { | ||||
|             State::Init { interest, timeout } => f.debug_struct("Init") | ||||
|             State::Init { interest, timeout, timeout_start } => f.debug_struct("Init") | ||||
|                 .field("interest", &interest) | ||||
|                 .field("timeout", &timeout) | ||||
|                 .field("timeout_start", &timeout_start) | ||||
|                 .finish(), | ||||
|             State::Http1(ref h1) => f.debug_tuple("Http1") | ||||
|                 .field(h1) | ||||
| @@ -708,7 +770,7 @@ impl<H: MessageHandler<T>, T: Transport> fmt::Debug for State<H, T> { | ||||
| } | ||||
|  | ||||
| impl<H: MessageHandler<T>, T: Transport> State<H, T> { | ||||
|     fn update<F, K>(&mut self, next: Next, factory: &F) | ||||
|     fn update<F, K>(&mut self, next: Next, factory: &F, timeout_start: Option<Time>) | ||||
|             where F: MessageHandlerFactory<K, T>, | ||||
|                   K: Key | ||||
|         { | ||||
| @@ -723,6 +785,7 @@ impl<H: MessageHandler<T>, T: Transport> State<H, T> { | ||||
|                                  State::Init { | ||||
|                                      interest: e, | ||||
|                                      timeout: timeout, | ||||
|                                      timeout_start: timeout_start, | ||||
|                                  }); | ||||
|                 } | ||||
|                 (State::Http1(mut http1), next_) => { | ||||
| @@ -782,6 +845,7 @@ impl<H: MessageHandler<T>, T: Transport> State<H, T> { | ||||
|                                                  State::Init { | ||||
|                                                      interest: next.interest, | ||||
|                                                      timeout: next.timeout, | ||||
|                                                      timeout_start: timeout_start, | ||||
|                                                  }); | ||||
|                                     return; | ||||
|                                 } | ||||
| @@ -913,9 +977,20 @@ struct Http1<H, T> { | ||||
|     writing: Writing, | ||||
|     keep_alive: bool, | ||||
|     timeout: Option<Duration>, | ||||
|     timeout_start: Option<Time>, | ||||
|     _marker: PhantomData<T>, | ||||
| } | ||||
|  | ||||
| impl<H, T> Http1<H, T> { | ||||
|     fn timeout_elapsed(&self, now: Time) -> bool { | ||||
|         if let (Some(timeout), Some(start)) = (self.timeout, self.timeout_start) { | ||||
|             timeout_elapsed(timeout, start, now) | ||||
|         } else { | ||||
|             false | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<H, T> fmt::Debug for Http1<H, T> { | ||||
|     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||||
|         f.debug_struct("Http1") | ||||
|   | ||||
| @@ -226,7 +226,7 @@ where A: Accept, | ||||
|         rotor_try!(scope.register(&seed, EventSet::readable(), PollOpt::level())); | ||||
|         rotor::Response::ok( | ||||
|             ServerFsm::Conn( | ||||
|                 http::Conn::new((), seed, Next::read(), scope.notifier()) | ||||
|                 http::Conn::new((), seed, Next::read(), scope.notifier(), scope.now()) | ||||
|                     .keep_alive(scope.keep_alive) | ||||
|             ) | ||||
|         ) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user