From 934f2c481ba9f04d34ab737927730bd4948408f6 Mon Sep 17 00:00:00 2001 From: Joe Wilm Date: Fri, 7 Oct 2016 17:37:42 -0700 Subject: [PATCH] fix(http): Connection checks for spurious timeouts We've been seeing a strange number of timeouts in our benchmarking. Handling spurious timeouts as in this patch seems to fix it! Note that managing the `timeout_start` needs to be done carefully. If the current time is provided in the wrong place, it's possible requests would never timeout. --- src/client/mod.rs | 9 ++++- src/http/conn.rs | 95 ++++++++++++++++++++++++++++++++++++++++++----- src/server/mod.rs | 2 +- 3 files changed, 93 insertions(+), 13 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index 1f4a7aa3..b8290497 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -493,8 +493,13 @@ where C: Connect, trace!("connected and writable {:?}", seed.0); rotor::Response::ok( ClientFsm::Socket( - http::Conn::new(seed.0, seed.1, Next::write().timeout(scope.connect_timeout), scope.notifier()) - .keep_alive(scope.keep_alive) + http::Conn::new( + seed.0, + seed.1, + Next::write().timeout(scope.connect_timeout), + scope.notifier(), + scope.now() + ).keep_alive(scope.keep_alive) ) ) } else { diff --git a/src/http/conn.rs b/src/http/conn.rs index b30738c6..cafb04ad 100644 --- a/src/http/conn.rs +++ b/src/http/conn.rs @@ -6,7 +6,7 @@ use std::marker::PhantomData; use std::mem; use std::time::Duration; -use rotor::{self, EventSet, PollOpt, Scope}; +use rotor::{self, EventSet, PollOpt, Scope, Time}; use http::{self, h1, Http1Message, Encoder, Decoder, Next, Next_, Reg, Control}; use http::channel; @@ -176,6 +176,7 @@ impl> ConnInner { let next = handler.on_incoming(head, &self.transport); trace!("handler.on_incoming() -> {:?}", next); + let now = scope.now(); match next.interest { Next_::Read => self.read(scope, State::Http1(Http1 { handler: handler, @@ -183,6 +184,7 @@ impl> ConnInner { writing: Writing::Init, keep_alive: keep_alive, timeout: next.timeout, + timeout_start: Some(now), _marker: PhantomData, })), Next_::Write => State::Http1(Http1 { @@ -199,6 +201,7 @@ impl> ConnInner { writing: Writing::Head, keep_alive: keep_alive, timeout: next.timeout, + timeout_start: Some(now), _marker: PhantomData, }), Next_::ReadWrite => self.read(scope, State::Http1(Http1 { @@ -207,6 +210,7 @@ impl> ConnInner { writing: Writing::Head, keep_alive: keep_alive, timeout: next.timeout, + timeout_start: Some(now), _marker: PhantomData, })), Next_::Wait => State::Http1(Http1 { @@ -215,6 +219,7 @@ impl> ConnInner { writing: Writing::Init, keep_alive: keep_alive, timeout: next.timeout, + timeout_start: Some(now), _marker: PhantomData, }), Next_::End | @@ -288,7 +293,7 @@ impl> ConnInner { }; let mut s = State::Http1(http1); if let Some(next) = next { - s.update(next, &**scope); + s.update(next, &**scope, Some(scope.now())); } trace!("Conn.on_readable State::Http1 completed, new state = State::{:?}", s); @@ -354,6 +359,7 @@ impl> ConnInner { handler: handler, keep_alive: keep_alive, timeout: interest.timeout, + timeout_start: Some(scope.now()), _marker: PhantomData, }) } @@ -447,7 +453,7 @@ impl> ConnInner { }; if let Some(next) = next { - state.update(next, &**scope); + state.update(next, &**scope, Some(scope.now())); } state } @@ -467,7 +473,7 @@ impl> ConnInner { State::Http1(ref mut http1) => http1.handler.on_error(err), State::Closed => Next::remove(), }; - self.state.update(next, factory); + self.state.update(next, factory, None); } fn on_readable(&mut self, scope: &mut Scope) @@ -502,7 +508,13 @@ pub enum ReadyResult { } impl> Conn { - pub fn new(key: K, transport: T, next: Next, notify: rotor::Notifier) -> Conn { + pub fn new( + key: K, + transport: T, + next: Next, + notify: rotor::Notifier, + now: Time + ) -> Conn { Conn(Box::new(ConnInner { buf: Buffer::new(), ctrl: channel::new(notify), @@ -511,6 +523,7 @@ impl> Conn { state: State::Init { interest: next.interest, timeout: next.timeout, + timeout_start: Some(now), }, transport: transport, })) @@ -615,7 +628,8 @@ impl> Conn { where F: MessageHandlerFactory { while let Ok(next) = self.0.ctrl.1.try_recv() { trace!("woke up with {:?}", next); - self.0.state.update(next, &**scope); + let timeout_start = self.0.state.timeout_start(); + self.0.state.update(next, &**scope, timeout_start); } let mut conn = Some(self); @@ -629,8 +643,11 @@ impl> Conn { pub fn timeout(mut self, scope: &mut Scope) -> Option<(Self, Option)> where F: MessageHandlerFactory { - //TODO: check if this was a spurious timeout? - self.0.on_error(::Error::Timeout, &**scope); + // Run error handler if timeout has elapsed + if self.0.state.timeout_elapsed(scope.now()) { + self.0.on_error(::Error::Timeout, &**scope); + } + let mut conn = Some(self); loop { match conn.take().unwrap().ready(EventSet::none(), scope) { @@ -667,6 +684,7 @@ enum State, T: Transport> { Init { interest: Next_, timeout: Option, + timeout_start: Option