From fdcd2a4b17326caebf72dd682c164348451ac9a1 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Sat, 5 May 2018 12:19:08 -0700 Subject: [PATCH] perf(server): reduce task system wake up in new dispatcher --- src/proto/h1/conn.rs | 50 +++++++++++----------------------------- src/proto/h1/dispatch.rs | 20 +++++++++++++--- 2 files changed, 31 insertions(+), 39 deletions(-) diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index bb11ae41..228e18b9 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -4,7 +4,6 @@ use std::marker::PhantomData; use bytes::{Buf, Bytes}; use futures::{Async, AsyncSink, Poll, StartSend}; -use futures::task::Task; use http::{Method, Version}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -40,7 +39,7 @@ where I: AsyncRead + AsyncWrite, keep_alive: KA::Busy, method: None, title_case_headers: false, - read_task: None, + notify_read: false, reading: Reading::Init, writing: Writing::Init, // We assume a modern world where the remote speaks HTTP/1.1. @@ -238,9 +237,7 @@ where I: AsyncRead + AsyncWrite, trace!("read_keep_alive; is_mid_message={}", self.is_mid_message()); - if self.is_mid_message() { - self.maybe_park_read(); - } else { + if !self.is_mid_message() { self.require_empty_read().map_err(::Error::new_io)?; } Ok(()) @@ -253,20 +250,11 @@ where I: AsyncRead + AsyncWrite, } } - fn maybe_park_read(&mut self) { - if !self.io.is_read_blocked() { - // the Io object is ready to read, which means it will never alert - // us that it is ready until we drain it. However, we're currently - // finished reading, so we need to park the task to be able to - // wake back up later when more reading should happen. - let park = self.state.read_task.as_ref() - .map(|t| !t.will_notify_current()) - .unwrap_or(true); - if park { - trace!("parking current task"); - self.state.read_task = Some(::futures::task::current()); - } - } + pub fn wants_read_again(&mut self) -> bool { + let ret = self.state.notify_read; + self.state.notify_read = false; + trace!("wants_read_again? {}", ret); + ret } // This will check to make sure the io object read is empty. @@ -340,17 +328,14 @@ where I: AsyncRead + AsyncWrite, // exhausted the underlying Io. We would have done this when we // determined we couldn't keep reading until we knew how writing // would finish. - // - // When writing finishes, we need to wake the task up in case there - // is more reading that can be done, to start a new message. - let wants_read = match self.state.reading { + match self.state.reading { Reading::Body(..) | - Reading::KeepAlive => return, - Reading::Init => true, - Reading::Closed => false, + Reading::KeepAlive | + Reading::Closed => return, + Reading::Init => (), }; match self.state.writing { @@ -361,7 +346,7 @@ where I: AsyncRead + AsyncWrite, } if !self.io.is_read_blocked() { - if wants_read && self.io.read_buf().is_empty() { + if self.io.read_buf().is_empty() { match self.io.read_from_io() { Ok(Async::Ready(_)) => (), Ok(Async::NotReady) => { @@ -374,12 +359,7 @@ where I: AsyncRead + AsyncWrite, } } } - if let Some(ref task) = self.state.read_task { - trace!("maybe_notify; notifying task"); - task.notify(); - } else { - trace!("maybe_notify; no task to notify"); - } + self.state.notify_read = true; } } @@ -615,7 +595,7 @@ struct State { keep_alive: KA, method: Option, title_case_headers: bool, - read_task: Option, + notify_read: bool, reading: Reading, writing: Writing, version: Version, @@ -645,7 +625,6 @@ impl fmt::Debug for State { .field("error", &self.error) //.field("method", &self.method) //.field("title_case_headers", &self.title_case_headers) - .field("read_task", &self.read_task) .finish() } } @@ -713,7 +692,6 @@ impl State { fn close_read(&mut self) { trace!("State::close_read()"); self.reading = Reading::Closed; - self.read_task = None; self.keep_alive.disable(); } diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index c0e8a9f3..9bc9e1bc 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -88,9 +88,23 @@ where } fn poll_inner(&mut self, should_shutdown: bool) -> Poll<(), ::Error> { - self.poll_read()?; - self.poll_write()?; - self.poll_flush()?; + loop { + self.poll_read()?; + self.poll_write()?; + self.poll_flush()?; + + // This could happen if reading paused before blocking on IO, + // such as getting to the end of a framed message, but then + // writing/flushing set the state back to Init. In that case, + // if the read buffer still had bytes, we'd want to try poll_read + // again, or else we wouldn't ever be woken up again. + // + // Using this instead of task::current() and notify() inside + // the Conn is noticeably faster in pipelined benchmarks. + if !self.conn.wants_read_again() { + break; + } + } if self.is_done() { if should_shutdown {