fix(client): detect valid eof after reading a body
If a connection closes immediately after reading the end of the body, the next call to `conn.poll()` should not error, but just indicate that the connection is done. Closes #1396
This commit is contained in:
		| @@ -88,6 +88,9 @@ where I: AsyncRead + AsyncWrite, | |||||||
|                 trace!("poll when on keep-alive"); |                 trace!("poll when on keep-alive"); | ||||||
|                 if !T::should_read_first() { |                 if !T::should_read_first() { | ||||||
|                     self.try_empty_read()?; |                     self.try_empty_read()?; | ||||||
|  |                     if self.is_read_closed() { | ||||||
|  |                         return Ok(Async::Ready(None)); | ||||||
|  |                     } | ||||||
|                 } |                 } | ||||||
|                 self.maybe_park_read(); |                 self.maybe_park_read(); | ||||||
|                 return Ok(Async::NotReady); |                 return Ok(Async::NotReady); | ||||||
| @@ -134,6 +137,11 @@ where I: AsyncRead + AsyncWrite, | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     fn should_error_on_eof(&self) -> bool { | ||||||
|  |         // If we're idle, it's probably just the connection closing gracefully. | ||||||
|  |         T::should_error_on_parse_eof() && !self.state.is_idle() | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn read_head(&mut self) -> Poll<Option<(super::MessageHead<T::Incoming>, bool)>, ::Error> { |     pub fn read_head(&mut self) -> Poll<Option<(super::MessageHead<T::Incoming>, bool)>, ::Error> { | ||||||
|         debug_assert!(self.can_read_head()); |         debug_assert!(self.can_read_head()); | ||||||
|         trace!("Conn::read_head"); |         trace!("Conn::read_head"); | ||||||
| @@ -145,7 +153,7 @@ where I: AsyncRead + AsyncWrite, | |||||||
|                 // If we are currently waiting on a message, then an empty |                 // If we are currently waiting on a message, then an empty | ||||||
|                 // message should be reported as an error. If not, it is just |                 // message should be reported as an error. If not, it is just | ||||||
|                 // the connection closing gracefully. |                 // the connection closing gracefully. | ||||||
|                 let must_error = !self.state.is_idle() && T::should_error_on_parse_eof(); |                 let must_error = self.should_error_on_eof(); | ||||||
|                 self.state.close_read(); |                 self.state.close_read(); | ||||||
|                 self.io.consume_leading_lines(); |                 self.io.consume_leading_lines(); | ||||||
|                 let was_mid_parse = !self.io.read_buf().is_empty(); |                 let was_mid_parse = !self.io.read_buf().is_empty(); | ||||||
| @@ -185,6 +193,9 @@ where I: AsyncRead + AsyncWrite, | |||||||
|                     (true, Reading::Body(decoder)) |                     (true, Reading::Body(decoder)) | ||||||
|                 }; |                 }; | ||||||
|                 self.state.reading = reading; |                 self.state.reading = reading; | ||||||
|  |                 if !body { | ||||||
|  |                     self.try_keep_alive(); | ||||||
|  |                 } | ||||||
|                 Ok(Async::Ready(Some((head, body)))) |                 Ok(Async::Ready(Some((head, body)))) | ||||||
|             }, |             }, | ||||||
|             _ => { |             _ => { | ||||||
| @@ -219,6 +230,7 @@ where I: AsyncRead + AsyncWrite, | |||||||
|         }; |         }; | ||||||
|  |  | ||||||
|         self.state.reading = reading; |         self.state.reading = reading; | ||||||
|  |         self.try_keep_alive(); | ||||||
|         ret |         ret | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -251,11 +263,12 @@ where I: AsyncRead + AsyncWrite, | |||||||
|         } else { |         } else { | ||||||
|              match self.io.read_from_io() { |              match self.io.read_from_io() { | ||||||
|                 Ok(Async::Ready(0)) => { |                 Ok(Async::Ready(0)) => { | ||||||
|                     trace!("try_empty_read; found EOF on connection"); |                     trace!("try_empty_read; found EOF on connection: {:?}", self.state); | ||||||
|  |                     let must_error = self.should_error_on_eof(); | ||||||
|  |                     // order is important: must_error needs state BEFORE close_read | ||||||
|                     self.state.close_read(); |                     self.state.close_read(); | ||||||
|                     let must_error = !self.state.is_idle() && T::should_error_on_parse_eof(); |  | ||||||
|                     if must_error { |                     if must_error { | ||||||
|                         Err(io::ErrorKind::UnexpectedEof.into()) |                         Err(io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF waiting for response")) | ||||||
|                     } else { |                     } else { | ||||||
|                         Ok(()) |                         Ok(()) | ||||||
|                     } |                     } | ||||||
| @@ -860,7 +873,7 @@ mod tests { | |||||||
|     use super::super::h1::Encoder; |     use super::super::h1::Encoder; | ||||||
|     use mock::AsyncIo; |     use mock::AsyncIo; | ||||||
|  |  | ||||||
|     use super::{Conn, Reading, Writing}; |     use super::{Conn, Decoder, Reading, Writing}; | ||||||
|     use ::uri::Uri; |     use ::uri::Uri; | ||||||
|  |  | ||||||
|     use std::str::FromStr; |     use std::str::FromStr; | ||||||
| @@ -960,6 +973,55 @@ mod tests { | |||||||
|         }).wait(); |         }).wait(); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     #[test] | ||||||
|  |     fn test_conn_body_finish_read_eof() { | ||||||
|  |         let _: Result<(), ()> = future::lazy(|| { | ||||||
|  |             let io = AsyncIo::new_eof(); | ||||||
|  |             let mut conn = Conn::<_, proto::Chunk, ClientTransaction>::new(io, Default::default()); | ||||||
|  |             conn.state.busy(); | ||||||
|  |             conn.state.writing = Writing::KeepAlive; | ||||||
|  |             conn.state.reading = Reading::Body(Decoder::length(0)); | ||||||
|  |  | ||||||
|  |             match conn.poll() { | ||||||
|  |                 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (), | ||||||
|  |                 other => panic!("unexpected frame: {:?}", other) | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |             // conn eofs, but tokio-proto will call poll() again, before calling flush() | ||||||
|  |             // the conn eof in this case is perfectly fine | ||||||
|  |  | ||||||
|  |             match conn.poll() { | ||||||
|  |                 Ok(Async::Ready(None)) => (), | ||||||
|  |                 other => panic!("unexpected frame: {:?}", other) | ||||||
|  |             } | ||||||
|  |             Ok(()) | ||||||
|  |         }).wait(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     #[test] | ||||||
|  |     fn test_conn_message_empty_body_read_eof() { | ||||||
|  |         let _: Result<(), ()> = future::lazy(|| { | ||||||
|  |             let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024); | ||||||
|  |             let mut conn = Conn::<_, proto::Chunk, ClientTransaction>::new(io, Default::default()); | ||||||
|  |             conn.state.busy(); | ||||||
|  |             conn.state.writing = Writing::KeepAlive; | ||||||
|  |  | ||||||
|  |             match conn.poll() { | ||||||
|  |                 Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (), | ||||||
|  |                 other => panic!("unexpected frame: {:?}", other) | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |             // conn eofs, but tokio-proto will call poll() again, before calling flush() | ||||||
|  |             // the conn eof in this case is perfectly fine | ||||||
|  |  | ||||||
|  |             match conn.poll() { | ||||||
|  |                 Ok(Async::Ready(None)) => (), | ||||||
|  |                 other => panic!("unexpected frame: {:?}", other) | ||||||
|  |             } | ||||||
|  |             Ok(()) | ||||||
|  |         }).wait(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_conn_closed_read() { |     fn test_conn_closed_read() { | ||||||
|         let io = AsyncIo::new_buf(vec![], 0); |         let io = AsyncIo::new_buf(vec![], 0); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user