Merge pull request #1074 from hyperium/park-less

fix(http): fix busy looping when in keep-alive
This commit is contained in:
Sean McArthur
2017-02-22 17:54:48 -08:00
committed by GitHub

View File

@@ -4,6 +4,7 @@ use std::marker::PhantomData;
use std::time::Instant;
use futures::{Poll, Async, AsyncSink, Stream, Sink, StartSend};
use futures::task::Task;
use tokio::io::Io;
use tokio_proto::streaming::pipeline::{Frame, Transport};
@@ -39,6 +40,7 @@ where I: Io,
state: State {
reading: Reading::Init,
writing: Writing::Init,
read_task: None,
keep_alive: keep_alive,
},
_marker: PhantomData,
@@ -49,13 +51,6 @@ where I: Io,
self.io.parse::<T>()
}
fn is_read_ready(&mut self) -> bool {
match self.state.reading {
Reading::Init |
Reading::Body(..) => self.io.poll_read().is_ready(),
Reading::KeepAlive | Reading::Closed => true,
}
}
fn is_read_closed(&self) -> bool {
self.state.is_read_closed()
@@ -91,7 +86,7 @@ where I: Io,
self.state.close_read();
self.io.consume_leading_lines();
let was_mid_parse = !self.io.read_buf().is_empty();
let must_respond_with_error = !self.state.was_idle();
let must_respond_with_error = !self.state.is_idle();
return if was_mid_parse {
debug!("parse error ({}) with bytes: {:?}", e, self.io.read_buf());
Ok(Async::Ready(Some(Frame::Error { error: e })))
@@ -159,6 +154,43 @@ where I: Io,
ret
}
fn maybe_park_read(&mut self) {
if self.io.poll_read().is_ready() {
// the Io object is ready to read, which means it will never alert
// us that it is ready until we drain it. However, we're currently
// finished reading, so we need to park the task to be able to
// wake back up later when more reading should happen.
self.state.read_task = Some(::futures::task::park());
}
}
fn maybe_unpark(&mut self) {
// its possible that we returned NotReady from poll() without having
// exhausted the underlying Io. We would have done this when we
// determined we couldn't keep reading until we knew how writing
// would finish.
//
// When writing finishes, we need to wake the task up in case there
// is more reading that can be done, to start a new message.
match self.state.reading {
Reading::Body(..) => return,
Reading::Init |
Reading::KeepAlive | Reading::Closed => (),
}
match self.state.writing {
Writing::Body(..) |
Writing::Ending(..) => return,
Writing::Init |
Writing::KeepAlive |
Writing::Closed => (),
}
if let Some(task) = self.state.read_task.take() {
task.unpark();
}
}
fn can_write_head(&self) -> bool {
match self.state.writing {
Writing::Init => true,
@@ -290,9 +322,7 @@ where I: Io,
try_nb!(self.io.flush());
self.state.try_keep_alive();
trace!("flushed {:?}", self.state);
if self.is_read_ready() {
::futures::task::park().unpark();
}
self.maybe_unpark();
Ok(ret)
}
@@ -309,6 +339,7 @@ where I: Io,
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
trace!("Conn::poll()");
self.state.read_task.take();
if self.is_read_closed() {
trace!("Conn::poll when closed");
@@ -326,6 +357,7 @@ where I: Io,
})
} else {
trace!("poll when on keep-alive");
self.maybe_park_read();
Ok(Async::NotReady)
}
}
@@ -418,6 +450,7 @@ impl<I, B: AsRef<[u8]>, T, K: fmt::Debug> fmt::Debug for Conn<I, B, T, K> {
struct State<B, K> {
reading: Reading,
writing: Writing<B>,
read_task: Option<Task>,
keep_alive: K,
}
@@ -443,6 +476,7 @@ impl<B: AsRef<[u8]>, K: fmt::Debug> fmt::Debug for State<B, K> {
.field("reading", &self.reading)
.field("writing", &self.writing)
.field("keep_alive", &self.keep_alive)
.field("read_task", &self.read_task)
.finish()
}
}
@@ -543,7 +577,7 @@ impl<B, K: KeepAlive> State<B, K> {
}
}
fn was_idle(&self) -> bool {
fn is_idle(&self) -> bool {
if let KA::Idle(..) = self.keep_alive.status() {
true
} else {
@@ -605,14 +639,14 @@ impl<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a> fmt::Debug for DebugFrame<'a,
#[cfg(test)]
mod tests {
use futures::{Async, Stream, Sink};
use futures::{Async, Future, Stream, Sink};
use tokio_proto::streaming::pipeline::Frame;
use http::{self, MessageHead, ServerTransaction};
use http::h1::Encoder;
use mock::AsyncIo;
use super::{Conn, Writing};
use super::{Conn, Reading, Writing};
use ::uri::Uri;
use std::str::FromStr;
@@ -637,17 +671,20 @@ mod tests {
#[test]
fn test_conn_parse_partial() {
let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
let io = AsyncIo::new_buf(good_message, 10);
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
assert!(conn.poll().unwrap().is_not_ready());
conn.io.io_mut().block_in(50);
let async = conn.poll().unwrap();
assert!(async.is_ready());
match async {
Async::Ready(Some(Frame::Message { .. })) => (),
f => panic!("frame is not Message: {:?}", f),
}
let _: Result<(), ()> = ::futures::lazy(|| {
let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
let io = AsyncIo::new_buf(good_message, 10);
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
assert!(conn.poll().unwrap().is_not_ready());
conn.io.io_mut().block_in(50);
let async = conn.poll().unwrap();
assert!(async.is_ready());
match async {
Async::Ready(Some(Frame::Message { .. })) => (),
f => panic!("frame is not Message: {:?}", f),
}
Ok(())
}).wait();
}
#[test]
@@ -664,9 +701,6 @@ mod tests {
#[test]
fn test_conn_body_write_length() {
extern crate pretty_env_logger;
use ::futures::Future;
let _ = pretty_env_logger::init();
let _: Result<(), ()> = ::futures::lazy(|| {
let io = AsyncIo::new_buf(vec![], 0);
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
@@ -703,7 +737,6 @@ mod tests {
#[test]
fn test_conn_body_write_chunked() {
use ::futures::Future;
let _: Result<(), ()> = ::futures::lazy(|| {
let io = AsyncIo::new_buf(vec![], 4096);
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
@@ -714,6 +747,65 @@ mod tests {
Ok(())
}).wait();
}
#[test]
fn test_conn_parking() {
use std::sync::Arc;
use futures::task::Unpark;
struct Car {
permit: bool,
}
impl Unpark for Car {
fn unpark(&self) {
assert!(self.permit, "unparked without permit");
}
}
fn car(permit: bool) -> Arc<Unpark> {
Arc::new(Car {
permit: permit,
})
}
// test that once writing is done, unparks
let f = ::futures::lazy(|| {
let io = AsyncIo::new_buf(vec![], 4096);
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
conn.state.reading = Reading::KeepAlive;
assert!(conn.poll().unwrap().is_not_ready());
conn.state.writing = Writing::KeepAlive;
assert!(conn.poll_complete().unwrap().is_ready());
Ok::<(), ()>(())
});
::futures::executor::spawn(f).poll_future(car(true)).unwrap();
// test that flushing when not waiting on read doesn't unpark
let f = ::futures::lazy(|| {
let io = AsyncIo::new_buf(vec![], 4096);
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
conn.state.writing = Writing::KeepAlive;
assert!(conn.poll_complete().unwrap().is_ready());
Ok::<(), ()>(())
});
::futures::executor::spawn(f).poll_future(car(false)).unwrap();
// test that flushing and writing isn't done doesn't unpark
let f = ::futures::lazy(|| {
let io = AsyncIo::new_buf(vec![], 4096);
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
conn.state.reading = Reading::KeepAlive;
assert!(conn.poll().unwrap().is_not_ready());
conn.state.writing = Writing::Body(Encoder::length(5_000), None);
assert!(conn.poll_complete().unwrap().is_ready());
Ok::<(), ()>(())
});
::futures::executor::spawn(f).poll_future(car(false)).unwrap();
}
#[test]
fn test_conn_closed_write() {
let io = AsyncIo::new_buf(vec![], 0);