perf(server): reduce task system wake-ups in new dispatcher

Sean McArthur
2018-05-05 12:19:08 -07:00
parent a3be110a55
commit fdcd2a4b17
2 changed files with 31 additions and 39 deletions
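
In short: the connection used to park the current futures task in `read_task` and later call `notify()` on it, which asks the executor to wake the whole dispatch future; the change stores a plain `notify_read` flag instead, and the dispatcher's `poll_inner` loops while `wants_read_again()` reports that the flag was set. Below is a minimal, self-contained sketch of that handshake; `FakeConn`, its `buffered` field, and the messages are invented for illustration and are not hyper's actual types.

// Illustrative sketch only: `FakeConn` is an invented stand-in, not hyper's Conn.
struct FakeConn {
    // Replaces the old `read_task: Option<Task>`: instead of storing a task
    // handle to notify later, just remember that another read may succeed.
    notify_read: bool,
    buffered: Vec<&'static str>,
}

impl FakeConn {
    // Called where the old code would have done `task.notify()`.
    fn maybe_notify(&mut self) {
        self.notify_read = true;
    }

    // Consumes the flag, like the `wants_read_again` method added in the
    // first file below.
    fn wants_read_again(&mut self) -> bool {
        let ret = self.notify_read;
        self.notify_read = false;
        ret
    }

    fn poll_read(&mut self) -> Option<&'static str> {
        let msg = self.buffered.pop();
        if !self.buffered.is_empty() {
            // More buffered data: ask the caller to loop in place rather
            // than scheduling a wake-up through the task system.
            self.maybe_notify();
        }
        msg
    }
}

fn main() {
    let mut conn = FakeConn {
        notify_read: false,
        buffered: vec!["third", "second", "first"],
    };

    // Dispatcher side: keep polling while the connection says another read
    // may succeed, mirroring the `loop` added to `poll_inner` in the second
    // file below.
    loop {
        if let Some(msg) = conn.poll_read() {
            println!("dispatched: {}", msg);
        }
        if !conn.wants_read_again() {
            break;
        }
    }
}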

View File

@@ -4,7 +4,6 @@ use std::marker::PhantomData;
 use bytes::{Buf, Bytes};
 use futures::{Async, AsyncSink, Poll, StartSend};
-use futures::task::Task;
 use http::{Method, Version};
 use tokio_io::{AsyncRead, AsyncWrite};
@@ -40,7 +39,7 @@ where I: AsyncRead + AsyncWrite,
                 keep_alive: KA::Busy,
                 method: None,
                 title_case_headers: false,
-                read_task: None,
+                notify_read: false,
                 reading: Reading::Init,
                 writing: Writing::Init,
                 // We assume a modern world where the remote speaks HTTP/1.1.
@@ -238,9 +237,7 @@ where I: AsyncRead + AsyncWrite,
trace!("read_keep_alive; is_mid_message={}", self.is_mid_message());
if self.is_mid_message() {
self.maybe_park_read();
} else {
if !self.is_mid_message() {
self.require_empty_read().map_err(::Error::new_io)?;
}
Ok(())
@@ -253,20 +250,11 @@ where I: AsyncRead + AsyncWrite,
         }
     }

-    fn maybe_park_read(&mut self) {
-        if !self.io.is_read_blocked() {
-            // the Io object is ready to read, which means it will never alert
-            // us that it is ready until we drain it. However, we're currently
-            // finished reading, so we need to park the task to be able to
-            // wake back up later when more reading should happen.
-            let park = self.state.read_task.as_ref()
-                .map(|t| !t.will_notify_current())
-                .unwrap_or(true);
-            if park {
-                trace!("parking current task");
-                self.state.read_task = Some(::futures::task::current());
-            }
-        }
-    }
+    pub fn wants_read_again(&mut self) -> bool {
+        let ret = self.state.notify_read;
+        self.state.notify_read = false;
+        trace!("wants_read_again? {}", ret);
+        ret
+    }

     // This will check to make sure the io object read is empty.
@@ -340,17 +328,14 @@ where I: AsyncRead + AsyncWrite,
         // exhausted the underlying Io. We would have done this when we
         // determined we couldn't keep reading until we knew how writing
         // would finish.
-        //
-        // When writing finishes, we need to wake the task up in case there
-        // is more reading that can be done, to start a new message.
-        let wants_read = match self.state.reading {
+        match self.state.reading {
             Reading::Body(..) |
-            Reading::KeepAlive => return,
-            Reading::Init => true,
-            Reading::Closed => false,
+            Reading::KeepAlive |
+            Reading::Closed => return,
+            Reading::Init => (),
         };

         match self.state.writing {
@@ -361,7 +346,7 @@ where I: AsyncRead + AsyncWrite,
         }

         if !self.io.is_read_blocked() {
-            if wants_read && self.io.read_buf().is_empty() {
+            if self.io.read_buf().is_empty() {
                 match self.io.read_from_io() {
                     Ok(Async::Ready(_)) => (),
                     Ok(Async::NotReady) => {
@@ -374,12 +359,7 @@ where I: AsyncRead + AsyncWrite,
                 }
             }
         }

-        if let Some(ref task) = self.state.read_task {
-            trace!("maybe_notify; notifying task");
-            task.notify();
-        } else {
-            trace!("maybe_notify; no task to notify");
-        }
+        self.state.notify_read = true;
     }
 }
@@ -615,7 +595,7 @@ struct State {
     keep_alive: KA,
     method: Option<Method>,
     title_case_headers: bool,
-    read_task: Option<Task>,
+    notify_read: bool,
     reading: Reading,
     writing: Writing,
     version: Version,
@@ -645,7 +625,6 @@ impl fmt::Debug for State {
             .field("error", &self.error)
             //.field("method", &self.method)
             //.field("title_case_headers", &self.title_case_headers)
-            .field("read_task", &self.read_task)
             .finish()
     }
 }
@@ -713,7 +692,6 @@ impl State {
     fn close_read(&mut self) {
         trace!("State::close_read()");
         self.reading = Reading::Closed;
-        self.read_task = None;
         self.keep_alive.disable();
}

View File

@@ -88,10 +88,24 @@ where
     }

     fn poll_inner(&mut self, should_shutdown: bool) -> Poll<(), ::Error> {
-        self.poll_read()?;
-        self.poll_write()?;
-        self.poll_flush()?;
+        loop {
+            self.poll_read()?;
+            self.poll_write()?;
+            self.poll_flush()?;
+
+            // This could happen if reading paused before blocking on IO,
+            // such as getting to the end of a framed message, but then
+            // writing/flushing set the state back to Init. In that case,
+            // if the read buffer still had bytes, we'd want to try poll_read
+            // again, or else we wouldn't ever be woken up again.
+            //
+            // Using this instead of task::current() and notify() inside
+            // the Conn is noticeably faster in pipelined benchmarks.
+            if !self.conn.wants_read_again() {
+                break;
+            }
+        }

         if self.is_done() {
             if should_shutdown {
                 try_ready!(self.conn.shutdown().map_err(::Error::new_shutdown));
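
As the comment above notes, looping in place replaces `task::current()` and `notify()` inside the Conn. For contrast, here is a rough, runnable illustration (futures 0.1, matching the imports above) of what the removed parking pattern costs: notifying a parked task forces the executor to schedule and re-poll the future from the top, i.e. one extra wake-up per message. The `Parked` future and its fields are invented for this example and are not hyper's code.

extern crate futures; // futures 0.1

use futures::task::{self, Task};
use futures::{Async, Future, Poll};

// Invented example type: parks the current task and immediately notifies it,
// roughly what the removed maybe_park_read / maybe_notify pair did (hyper
// notified later, once writing had finished).
struct Parked {
    read_task: Option<Task>,
    polls: u32,
}

impl Future for Parked {
    type Item = u32;
    type Error = ();

    fn poll(&mut self) -> Poll<u32, ()> {
        self.polls += 1;
        if self.polls == 1 {
            // Park, then notify: the executor must now schedule and re-poll
            // this future from the top before any more progress is made.
            self.read_task = Some(task::current());
            if let Some(ref task) = self.read_task {
                task.notify();
            }
            Ok(Async::NotReady)
        } else {
            Ok(Async::Ready(self.polls))
        }
    }
}

fn main() {
    // Two polls to finish instead of one: the extra wake-up the flag avoids.
    let polls = Parked { read_task: None, polls: 0 }.wait().unwrap();
    assert_eq!(polls, 2);
}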