fix(http1): force always-ready connections to yield after a few spins

This commit is contained in:
Sean McArthur
2019-06-14 11:33:31 -07:00
parent 50198851a2
commit 8316f96d80
3 changed files with 81 additions and 14 deletions

View File

@@ -4,8 +4,10 @@ pub(crate) mod exec;
pub(crate) mod io; pub(crate) mod io;
mod lazy; mod lazy;
mod never; mod never;
pub(crate) mod task;
pub(crate) use self::buf::StaticBuf; pub(crate) use self::buf::StaticBuf;
pub(crate) use self::exec::Exec; pub(crate) use self::exec::Exec;
pub(crate) use self::lazy::{lazy, Started as Lazy}; pub(crate) use self::lazy::{lazy, Started as Lazy};
pub use self::never::Never; pub use self::never::Never;
pub(crate) use self::task::YieldNow;

40
src/common/task.rs Normal file
View File

@@ -0,0 +1,40 @@
use futures::{Async, Poll, task::Task};
use super::Never;
/// A type to help "yield" a future, such that it is re-scheduled immediately.
///
/// Useful for spin counts, so a future doesn't hog too much time.
#[derive(Debug)]
pub(crate) struct YieldNow {
    // The most recent `Task` obtained from `task::current()`. `poll_yield`
    // reuses it on subsequent calls when it still refers to the current
    // task, avoiding repeated `task::current()` lookups on the hot path.
    cached_task: Option<Task>,
}
impl YieldNow {
    /// Creates a `YieldNow` with no cached `Task` yet.
    pub(crate) fn new() -> YieldNow {
        YieldNow { cached_task: None }
    }

    /// Returns `Ok(Async::NotReady)` always, while also notifying the
    /// current task so that it is rescheduled immediately.
    ///
    /// Since it never returns `Async::Ready` or `Err`, those types are
    /// set to `Never`.
    pub(crate) fn poll_yield(&mut self) -> Poll<Never, Never> {
        // Fast path: reuse the cached `Task` if it would notify the task
        // currently being polled.
        let cache_hit = match self.cached_task {
            Some(ref cached) if cached.will_notify_current() => {
                cached.notify();
                true
            }
            _ => false,
        };

        if !cache_hit {
            // Cache was empty or stale: look up the current task, schedule
            // it, and remember it for next time.
            let current = ::futures::task::current();
            current.notify();
            self.cached_task = Some(current);
        }

        Ok(Async::NotReady)
    }
}

View File

@@ -7,7 +7,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
use body::{Body, Payload}; use body::{Body, Payload};
use body::internal::FullDataArg; use body::internal::FullDataArg;
use common::Never; use common::{Never, YieldNow};
use proto::{BodyLength, DecodedLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, ResponseHead}; use proto::{BodyLength, DecodedLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, ResponseHead};
use super::Http1Transaction; use super::Http1Transaction;
use service::Service; use service::Service;
@@ -18,6 +18,10 @@ pub(crate) struct Dispatcher<D, Bs: Payload, I, T> {
body_tx: Option<::body::Sender>, body_tx: Option<::body::Sender>,
body_rx: Option<Bs>, body_rx: Option<Bs>,
is_closing: bool, is_closing: bool,
/// If the poll loop reaches its max spin count, it will yield by notifying
/// the task immediately. This will cache that `Task`, since it usually is
/// the same one.
yield_now: YieldNow,
} }
pub(crate) trait Dispatch { pub(crate) trait Dispatch {
@@ -58,6 +62,7 @@ where
body_tx: None, body_tx: None,
body_rx: None, body_rx: None,
is_closing: false, is_closing: false,
yield_now: YieldNow::new(),
} }
} }
@@ -98,7 +103,29 @@ where
fn poll_inner(&mut self, should_shutdown: bool) -> Poll<Dispatched, ::Error> { fn poll_inner(&mut self, should_shutdown: bool) -> Poll<Dispatched, ::Error> {
T::update_date(); T::update_date();
loop { try_ready!(self.poll_loop());
if self.is_done() {
if let Some(pending) = self.conn.pending_upgrade() {
self.conn.take_error()?;
return Ok(Async::Ready(Dispatched::Upgrade(pending)));
} else if should_shutdown {
try_ready!(self.conn.shutdown().map_err(::Error::new_shutdown));
}
self.conn.take_error()?;
Ok(Async::Ready(Dispatched::Shutdown))
} else {
Ok(Async::NotReady)
}
}
fn poll_loop(&mut self) -> Poll<(), ::Error> {
// Limit the looping on this connection, in case it is ready far too
// often, so that other futures don't starve.
//
// 16 was chosen arbitrarily, as that is number of pipelined requests
// benchmarks often use. Perhaps it should be a config option instead.
for _ in 0..16 {
self.poll_read()?; self.poll_read()?;
self.poll_write()?; self.poll_write()?;
self.poll_flush()?; self.poll_flush()?;
@@ -112,21 +139,19 @@ where
// Using this instead of task::current() and notify() inside // Using this instead of task::current() and notify() inside
// the Conn is noticeably faster in pipelined benchmarks. // the Conn is noticeably faster in pipelined benchmarks.
if !self.conn.wants_read_again() { if !self.conn.wants_read_again() {
break; //break;
return Ok(Async::Ready(()));
} }
} }
if self.is_done() { trace!("poll_loop yielding (self = {:p})", self);
if let Some(pending) = self.conn.pending_upgrade() {
self.conn.take_error()?; match self.yield_now.poll_yield() {
return Ok(Async::Ready(Dispatched::Upgrade(pending))); Ok(Async::NotReady) => Ok(Async::NotReady),
} else if should_shutdown { // maybe with `!` this can be cleaner...
try_ready!(self.conn.shutdown().map_err(::Error::new_shutdown)); // but for now, just doing this to eliminate branches
} Ok(Async::Ready(never)) |
self.conn.take_error()?; Err(never) => match never {}
Ok(Async::Ready(Dispatched::Shutdown))
} else {
Ok(Async::NotReady)
} }
} }