diff --git a/src/http/conn.rs b/src/http/conn.rs index a93da59a..21b50394 100644 --- a/src/http/conn.rs +++ b/src/http/conn.rs @@ -4,6 +4,7 @@ use std::marker::PhantomData; use std::time::Instant; use futures::{Poll, Async, AsyncSink, Stream, Sink, StartSend}; +use futures::task::Task; use tokio::io::Io; use tokio_proto::streaming::pipeline::{Frame, Transport}; @@ -39,6 +40,7 @@ where I: Io, state: State { reading: Reading::Init, writing: Writing::Init, + read_task: None, keep_alive: keep_alive, }, _marker: PhantomData, @@ -49,13 +51,6 @@ where I: Io, self.io.parse::() } - fn is_read_ready(&mut self) -> bool { - match self.state.reading { - Reading::Init | - Reading::Body(..) => self.io.poll_read().is_ready(), - Reading::KeepAlive | Reading::Closed => true, - } - } fn is_read_closed(&self) -> bool { self.state.is_read_closed() @@ -91,7 +86,7 @@ where I: Io, self.state.close_read(); self.io.consume_leading_lines(); let was_mid_parse = !self.io.read_buf().is_empty(); - let must_respond_with_error = !self.state.was_idle(); + let must_respond_with_error = !self.state.is_idle(); return if was_mid_parse { debug!("parse error ({}) with bytes: {:?}", e, self.io.read_buf()); Ok(Async::Ready(Some(Frame::Error { error: e }))) @@ -159,6 +154,43 @@ where I: Io, ret } + fn maybe_park_read(&mut self) { + if self.io.poll_read().is_ready() { + // the Io object is ready to read, which means it will never alert + // us that it is ready until we drain it. However, we're currently + // finished reading, so we need to park the task to be able to + // wake back up later when more reading should happen. + self.state.read_task = Some(::futures::task::park()); + } + } + + fn maybe_unpark(&mut self) { + // its possible that we returned NotReady from poll() without having + // exhausted the underlying Io. We would have done this when we + // determined we couldn't keep reading until we knew how writing + // would finish. + // + // When writing finishes, we need to wake the task up in case there + // is more reading that can be done, to start a new message. + match self.state.reading { + Reading::Body(..) => return, + Reading::Init | + Reading::KeepAlive | Reading::Closed => (), + } + + match self.state.writing { + Writing::Body(..) | + Writing::Ending(..) => return, + Writing::Init | + Writing::KeepAlive | + Writing::Closed => (), + } + + if let Some(task) = self.state.read_task.take() { + task.unpark(); + } + } + fn can_write_head(&self) -> bool { match self.state.writing { Writing::Init => true, @@ -290,9 +322,7 @@ where I: Io, try_nb!(self.io.flush()); self.state.try_keep_alive(); trace!("flushed {:?}", self.state); - if self.is_read_ready() { - ::futures::task::park().unpark(); - } + self.maybe_unpark(); Ok(ret) } @@ -309,6 +339,7 @@ where I: Io, fn poll(&mut self) -> Poll, Self::Error> { trace!("Conn::poll()"); + self.state.read_task.take(); if self.is_read_closed() { trace!("Conn::poll when closed"); @@ -326,6 +357,7 @@ where I: Io, }) } else { trace!("poll when on keep-alive"); + self.maybe_park_read(); Ok(Async::NotReady) } } @@ -418,6 +450,7 @@ impl, T, K: fmt::Debug> fmt::Debug for Conn { struct State { reading: Reading, writing: Writing, + read_task: Option, keep_alive: K, } @@ -443,6 +476,7 @@ impl, K: fmt::Debug> fmt::Debug for State { .field("reading", &self.reading) .field("writing", &self.writing) .field("keep_alive", &self.keep_alive) + .field("read_task", &self.read_task) .finish() } } @@ -543,7 +577,7 @@ impl State { } } - fn was_idle(&self) -> bool { + fn is_idle(&self) -> bool { if let KA::Idle(..) = self.keep_alive.status() { true } else { @@ -605,14 +639,14 @@ impl<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a> fmt::Debug for DebugFrame<'a, #[cfg(test)] mod tests { - use futures::{Async, Stream, Sink}; + use futures::{Async, Future, Stream, Sink}; use tokio_proto::streaming::pipeline::Frame; use http::{self, MessageHead, ServerTransaction}; use http::h1::Encoder; use mock::AsyncIo; - use super::{Conn, Writing}; + use super::{Conn, Reading, Writing}; use ::uri::Uri; use std::str::FromStr; @@ -637,17 +671,20 @@ mod tests { #[test] fn test_conn_parse_partial() { - let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec(); - let io = AsyncIo::new_buf(good_message, 10); - let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); - assert!(conn.poll().unwrap().is_not_ready()); - conn.io.io_mut().block_in(50); - let async = conn.poll().unwrap(); - assert!(async.is_ready()); - match async { - Async::Ready(Some(Frame::Message { .. })) => (), - f => panic!("frame is not Message: {:?}", f), - } + let _: Result<(), ()> = ::futures::lazy(|| { + let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec(); + let io = AsyncIo::new_buf(good_message, 10); + let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); + assert!(conn.poll().unwrap().is_not_ready()); + conn.io.io_mut().block_in(50); + let async = conn.poll().unwrap(); + assert!(async.is_ready()); + match async { + Async::Ready(Some(Frame::Message { .. })) => (), + f => panic!("frame is not Message: {:?}", f), + } + Ok(()) + }).wait(); } #[test] @@ -664,9 +701,6 @@ mod tests { #[test] fn test_conn_body_write_length() { - extern crate pretty_env_logger; - use ::futures::Future; - let _ = pretty_env_logger::init(); let _: Result<(), ()> = ::futures::lazy(|| { let io = AsyncIo::new_buf(vec![], 0); let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); @@ -703,7 +737,6 @@ mod tests { #[test] fn test_conn_body_write_chunked() { - use ::futures::Future; let _: Result<(), ()> = ::futures::lazy(|| { let io = AsyncIo::new_buf(vec![], 4096); let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); @@ -714,6 +747,65 @@ mod tests { Ok(()) }).wait(); } + + #[test] + fn test_conn_parking() { + use std::sync::Arc; + use futures::task::Unpark; + + struct Car { + permit: bool, + } + impl Unpark for Car { + fn unpark(&self) { + assert!(self.permit, "unparked without permit"); + } + } + + fn car(permit: bool) -> Arc { + Arc::new(Car { + permit: permit, + }) + } + + // test that once writing is done, unparks + let f = ::futures::lazy(|| { + let io = AsyncIo::new_buf(vec![], 4096); + let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); + conn.state.reading = Reading::KeepAlive; + assert!(conn.poll().unwrap().is_not_ready()); + + conn.state.writing = Writing::KeepAlive; + assert!(conn.poll_complete().unwrap().is_ready()); + Ok::<(), ()>(()) + }); + ::futures::executor::spawn(f).poll_future(car(true)).unwrap(); + + + // test that flushing when not waiting on read doesn't unpark + let f = ::futures::lazy(|| { + let io = AsyncIo::new_buf(vec![], 4096); + let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); + conn.state.writing = Writing::KeepAlive; + assert!(conn.poll_complete().unwrap().is_ready()); + Ok::<(), ()>(()) + }); + ::futures::executor::spawn(f).poll_future(car(false)).unwrap(); + + + // test that flushing and writing isn't done doesn't unpark + let f = ::futures::lazy(|| { + let io = AsyncIo::new_buf(vec![], 4096); + let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); + conn.state.reading = Reading::KeepAlive; + assert!(conn.poll().unwrap().is_not_ready()); + conn.state.writing = Writing::Body(Encoder::length(5_000), None); + assert!(conn.poll_complete().unwrap().is_ready()); + Ok::<(), ()>(()) + }); + ::futures::executor::spawn(f).poll_future(car(false)).unwrap(); + } + #[test] fn test_conn_closed_write() { let io = AsyncIo::new_buf(vec![], 0);