fix(http): fix busy looping when in keep-alive
This commit is contained in:
		
							
								
								
									
										150
									
								
								src/http/conn.rs
									
									
									
									
									
								
							
							
						
						
									
										150
									
								
								src/http/conn.rs
									
									
									
									
									
								
							| @@ -4,6 +4,7 @@ use std::marker::PhantomData; | |||||||
| use std::time::Instant; | use std::time::Instant; | ||||||
|  |  | ||||||
| use futures::{Poll, Async, AsyncSink, Stream, Sink, StartSend}; | use futures::{Poll, Async, AsyncSink, Stream, Sink, StartSend}; | ||||||
|  | use futures::task::Task; | ||||||
| use tokio::io::Io; | use tokio::io::Io; | ||||||
| use tokio_proto::streaming::pipeline::{Frame, Transport}; | use tokio_proto::streaming::pipeline::{Frame, Transport}; | ||||||
|  |  | ||||||
| @@ -39,6 +40,7 @@ where I: Io, | |||||||
|             state: State { |             state: State { | ||||||
|                 reading: Reading::Init, |                 reading: Reading::Init, | ||||||
|                 writing: Writing::Init, |                 writing: Writing::Init, | ||||||
|  |                 read_task: None, | ||||||
|                 keep_alive: keep_alive, |                 keep_alive: keep_alive, | ||||||
|             }, |             }, | ||||||
|             _marker: PhantomData, |             _marker: PhantomData, | ||||||
| @@ -49,13 +51,6 @@ where I: Io, | |||||||
|         self.io.parse::<T>() |         self.io.parse::<T>() | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn is_read_ready(&mut self) -> bool { |  | ||||||
|         match self.state.reading { |  | ||||||
|             Reading::Init | |  | ||||||
|             Reading::Body(..) => self.io.poll_read().is_ready(), |  | ||||||
|             Reading::KeepAlive | Reading::Closed => true, |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     fn is_read_closed(&self) -> bool { |     fn is_read_closed(&self) -> bool { | ||||||
|         self.state.is_read_closed() |         self.state.is_read_closed() | ||||||
| @@ -91,7 +86,7 @@ where I: Io, | |||||||
|                 self.state.close_read(); |                 self.state.close_read(); | ||||||
|                 self.io.consume_leading_lines(); |                 self.io.consume_leading_lines(); | ||||||
|                 let was_mid_parse = !self.io.read_buf().is_empty(); |                 let was_mid_parse = !self.io.read_buf().is_empty(); | ||||||
|                 let must_respond_with_error = !self.state.was_idle(); |                 let must_respond_with_error = !self.state.is_idle(); | ||||||
|                 return if was_mid_parse { |                 return if was_mid_parse { | ||||||
|                     debug!("parse error ({}) with bytes: {:?}", e, self.io.read_buf()); |                     debug!("parse error ({}) with bytes: {:?}", e, self.io.read_buf()); | ||||||
|                     Ok(Async::Ready(Some(Frame::Error { error: e }))) |                     Ok(Async::Ready(Some(Frame::Error { error: e }))) | ||||||
| @@ -159,6 +154,43 @@ where I: Io, | |||||||
|         ret |         ret | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     fn maybe_park_read(&mut self) { | ||||||
|  |         if self.io.poll_read().is_ready() { | ||||||
|  |             // the Io object is ready to read, which means it will never alert | ||||||
|  |             // us that it is ready until we drain it. However, we're currently | ||||||
|  |             // finished reading, so we need to park the task to be able to | ||||||
|  |             // wake back up later when more reading should happen. | ||||||
|  |             self.state.read_task = Some(::futures::task::park()); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     fn maybe_unpark(&mut self) { | ||||||
|  |         // its possible that we returned NotReady from poll() without having | ||||||
|  |         // exhausted the underlying Io. We would have done this when we | ||||||
|  |         // determined we couldn't keep reading until we knew how writing | ||||||
|  |         // would finish. | ||||||
|  |         // | ||||||
|  |         // When writing finishes, we need to wake the task up in case there | ||||||
|  |         // is more reading that can be done, to start a new message. | ||||||
|  |         match self.state.reading { | ||||||
|  |             Reading::Body(..) => return, | ||||||
|  |             Reading::Init | | ||||||
|  |             Reading::KeepAlive | Reading::Closed => (), | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         match self.state.writing { | ||||||
|  |             Writing::Body(..) | | ||||||
|  |             Writing::Ending(..) => return, | ||||||
|  |             Writing::Init | | ||||||
|  |             Writing::KeepAlive | | ||||||
|  |             Writing::Closed => (), | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         if let Some(task) = self.state.read_task.take() { | ||||||
|  |             task.unpark(); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     fn can_write_head(&self) -> bool { |     fn can_write_head(&self) -> bool { | ||||||
|         match self.state.writing { |         match self.state.writing { | ||||||
|             Writing::Init => true, |             Writing::Init => true, | ||||||
| @@ -290,9 +322,7 @@ where I: Io, | |||||||
|         try_nb!(self.io.flush()); |         try_nb!(self.io.flush()); | ||||||
|         self.state.try_keep_alive(); |         self.state.try_keep_alive(); | ||||||
|         trace!("flushed {:?}", self.state); |         trace!("flushed {:?}", self.state); | ||||||
|         if self.is_read_ready() { |         self.maybe_unpark(); | ||||||
|             ::futures::task::park().unpark(); |  | ||||||
|         } |  | ||||||
|         Ok(ret) |         Ok(ret) | ||||||
|  |  | ||||||
|     } |     } | ||||||
| @@ -309,6 +339,7 @@ where I: Io, | |||||||
|  |  | ||||||
|     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { |     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | ||||||
|         trace!("Conn::poll()"); |         trace!("Conn::poll()"); | ||||||
|  |         self.state.read_task.take(); | ||||||
|  |  | ||||||
|         if self.is_read_closed() { |         if self.is_read_closed() { | ||||||
|             trace!("Conn::poll when closed"); |             trace!("Conn::poll when closed"); | ||||||
| @@ -326,6 +357,7 @@ where I: Io, | |||||||
|                 }) |                 }) | ||||||
|         } else { |         } else { | ||||||
|             trace!("poll when on keep-alive"); |             trace!("poll when on keep-alive"); | ||||||
|  |             self.maybe_park_read(); | ||||||
|             Ok(Async::NotReady) |             Ok(Async::NotReady) | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| @@ -418,6 +450,7 @@ impl<I, B: AsRef<[u8]>, T, K: fmt::Debug> fmt::Debug for Conn<I, B, T, K> { | |||||||
| struct State<B, K> { | struct State<B, K> { | ||||||
|     reading: Reading, |     reading: Reading, | ||||||
|     writing: Writing<B>, |     writing: Writing<B>, | ||||||
|  |     read_task: Option<Task>, | ||||||
|     keep_alive: K, |     keep_alive: K, | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -443,6 +476,7 @@ impl<B: AsRef<[u8]>, K: fmt::Debug> fmt::Debug for State<B, K> { | |||||||
|             .field("reading", &self.reading) |             .field("reading", &self.reading) | ||||||
|             .field("writing", &self.writing) |             .field("writing", &self.writing) | ||||||
|             .field("keep_alive", &self.keep_alive) |             .field("keep_alive", &self.keep_alive) | ||||||
|  |             .field("read_task", &self.read_task) | ||||||
|             .finish() |             .finish() | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @@ -543,7 +577,7 @@ impl<B, K: KeepAlive> State<B, K> { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn was_idle(&self) -> bool { |     fn is_idle(&self) -> bool { | ||||||
|         if let KA::Idle(..) = self.keep_alive.status() { |         if let KA::Idle(..) = self.keep_alive.status() { | ||||||
|             true |             true | ||||||
|         } else { |         } else { | ||||||
| @@ -605,14 +639,14 @@ impl<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a> fmt::Debug for DebugFrame<'a, | |||||||
|  |  | ||||||
| #[cfg(test)] | #[cfg(test)] | ||||||
| mod tests { | mod tests { | ||||||
|     use futures::{Async, Stream, Sink}; |     use futures::{Async, Future, Stream, Sink}; | ||||||
|     use tokio_proto::streaming::pipeline::Frame; |     use tokio_proto::streaming::pipeline::Frame; | ||||||
|  |  | ||||||
|     use http::{self, MessageHead, ServerTransaction}; |     use http::{self, MessageHead, ServerTransaction}; | ||||||
|     use http::h1::Encoder; |     use http::h1::Encoder; | ||||||
|     use mock::AsyncIo; |     use mock::AsyncIo; | ||||||
|  |  | ||||||
|     use super::{Conn, Writing}; |     use super::{Conn, Reading, Writing}; | ||||||
|     use ::uri::Uri; |     use ::uri::Uri; | ||||||
|  |  | ||||||
|     use std::str::FromStr; |     use std::str::FromStr; | ||||||
| @@ -637,17 +671,20 @@ mod tests { | |||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_conn_parse_partial() { |     fn test_conn_parse_partial() { | ||||||
|         let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec(); |         let _: Result<(), ()> = ::futures::lazy(|| { | ||||||
|         let io = AsyncIo::new_buf(good_message, 10); |             let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec(); | ||||||
|         let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); |             let io = AsyncIo::new_buf(good_message, 10); | ||||||
|         assert!(conn.poll().unwrap().is_not_ready()); |             let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); | ||||||
|         conn.io.io_mut().block_in(50); |             assert!(conn.poll().unwrap().is_not_ready()); | ||||||
|         let async = conn.poll().unwrap(); |             conn.io.io_mut().block_in(50); | ||||||
|         assert!(async.is_ready()); |             let async = conn.poll().unwrap(); | ||||||
|         match async { |             assert!(async.is_ready()); | ||||||
|             Async::Ready(Some(Frame::Message { .. })) => (), |             match async { | ||||||
|             f => panic!("frame is not Message: {:?}", f), |                 Async::Ready(Some(Frame::Message { .. })) => (), | ||||||
|         } |                 f => panic!("frame is not Message: {:?}", f), | ||||||
|  |             } | ||||||
|  |             Ok(()) | ||||||
|  |         }).wait(); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
| @@ -664,9 +701,6 @@ mod tests { | |||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_conn_body_write_length() { |     fn test_conn_body_write_length() { | ||||||
|         extern crate pretty_env_logger; |  | ||||||
|         use ::futures::Future; |  | ||||||
|         let _ = pretty_env_logger::init(); |  | ||||||
|         let _: Result<(), ()> = ::futures::lazy(|| { |         let _: Result<(), ()> = ::futures::lazy(|| { | ||||||
|             let io = AsyncIo::new_buf(vec![], 0); |             let io = AsyncIo::new_buf(vec![], 0); | ||||||
|             let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); |             let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); | ||||||
| @@ -703,7 +737,6 @@ mod tests { | |||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_conn_body_write_chunked() { |     fn test_conn_body_write_chunked() { | ||||||
|         use ::futures::Future; |  | ||||||
|         let _: Result<(), ()> = ::futures::lazy(|| { |         let _: Result<(), ()> = ::futures::lazy(|| { | ||||||
|             let io = AsyncIo::new_buf(vec![], 4096); |             let io = AsyncIo::new_buf(vec![], 4096); | ||||||
|             let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); |             let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); | ||||||
| @@ -714,6 +747,65 @@ mod tests { | |||||||
|             Ok(()) |             Ok(()) | ||||||
|         }).wait(); |         }).wait(); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     #[test] | ||||||
|  |     fn test_conn_parking() { | ||||||
|  |         use std::sync::Arc; | ||||||
|  |         use futures::task::Unpark; | ||||||
|  |  | ||||||
|  |         struct Car { | ||||||
|  |             permit: bool, | ||||||
|  |         } | ||||||
|  |         impl Unpark for Car { | ||||||
|  |             fn unpark(&self) { | ||||||
|  |                 assert!(self.permit, "unparked without permit"); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         fn car(permit: bool) -> Arc<Unpark> { | ||||||
|  |             Arc::new(Car { | ||||||
|  |                 permit: permit, | ||||||
|  |             }) | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         // test that once writing is done, unparks | ||||||
|  |         let f = ::futures::lazy(|| { | ||||||
|  |             let io = AsyncIo::new_buf(vec![], 4096); | ||||||
|  |             let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); | ||||||
|  |             conn.state.reading = Reading::KeepAlive; | ||||||
|  |             assert!(conn.poll().unwrap().is_not_ready()); | ||||||
|  |  | ||||||
|  |             conn.state.writing = Writing::KeepAlive; | ||||||
|  |             assert!(conn.poll_complete().unwrap().is_ready()); | ||||||
|  |             Ok::<(), ()>(()) | ||||||
|  |         }); | ||||||
|  |         ::futures::executor::spawn(f).poll_future(car(true)).unwrap(); | ||||||
|  |  | ||||||
|  |  | ||||||
|  |         // test that flushing when not waiting on read doesn't unpark | ||||||
|  |         let f = ::futures::lazy(|| { | ||||||
|  |             let io = AsyncIo::new_buf(vec![], 4096); | ||||||
|  |             let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); | ||||||
|  |             conn.state.writing = Writing::KeepAlive; | ||||||
|  |             assert!(conn.poll_complete().unwrap().is_ready()); | ||||||
|  |             Ok::<(), ()>(()) | ||||||
|  |         }); | ||||||
|  |         ::futures::executor::spawn(f).poll_future(car(false)).unwrap(); | ||||||
|  |  | ||||||
|  |  | ||||||
|  |         // test that flushing and writing isn't done doesn't unpark | ||||||
|  |         let f = ::futures::lazy(|| { | ||||||
|  |             let io = AsyncIo::new_buf(vec![], 4096); | ||||||
|  |             let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); | ||||||
|  |             conn.state.reading = Reading::KeepAlive; | ||||||
|  |             assert!(conn.poll().unwrap().is_not_ready()); | ||||||
|  |             conn.state.writing = Writing::Body(Encoder::length(5_000), None); | ||||||
|  |             assert!(conn.poll_complete().unwrap().is_ready()); | ||||||
|  |             Ok::<(), ()>(()) | ||||||
|  |         }); | ||||||
|  |         ::futures::executor::spawn(f).poll_future(car(false)).unwrap(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_conn_closed_write() { |     fn test_conn_closed_write() { | ||||||
|         let io = AsyncIo::new_buf(vec![], 0); |         let io = AsyncIo::new_buf(vec![], 0); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user