Merge pull request #1116 from hyperium/parse-till-blocked
fix(conn): always read till blocked when parsing
This commit is contained in:
		| @@ -745,7 +745,6 @@ mod tests { | ||||
|         let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); | ||||
|         conn.state.idle(); | ||||
|  | ||||
|         assert!(conn.poll().unwrap().is_not_ready()); | ||||
|         match conn.poll().unwrap() { | ||||
|             Async::Ready(Some(Frame::Error { .. })) => {}, | ||||
|             other => panic!("frame is not Error: {:?}", other) | ||||
|   | ||||
| @@ -55,31 +55,33 @@ impl<T: AsyncRead + AsyncWrite> Buffered<T> { | ||||
|     } | ||||
|  | ||||
|     pub fn parse<S: Http1Transaction>(&mut self) -> ::Result<Option<MessageHead<S::Incoming>>> { | ||||
|         self.reserve_read_buf(); | ||||
|         match self.read_from_io() { | ||||
|             Ok(0) => { | ||||
|                 trace!("parse eof"); | ||||
|                 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "parse eof").into()); | ||||
|         loop { | ||||
|             match try!(parse::<S, _>(&mut self.read_buf)) { | ||||
|                 Some(head) => { | ||||
|                     //trace!("parsed {} bytes out of {}", len, self.read_buf.len()); | ||||
|                     return Ok(Some(head.0)) | ||||
|                 }, | ||||
|                 None => { | ||||
|                     if self.read_buf.capacity() >= MAX_BUFFER_SIZE { | ||||
|                         debug!("MAX_BUFFER_SIZE reached, closing"); | ||||
|                         return Err(::Error::TooLarge); | ||||
|                     } | ||||
|                 }, | ||||
|             } | ||||
|             Ok(_) => {}, | ||||
|             Err(e) => match e.kind() { | ||||
|                 io::ErrorKind::WouldBlock => {}, | ||||
|                 _ => return Err(e.into()) | ||||
|             } | ||||
|         } | ||||
|         match try!(parse::<S, _>(&mut self.read_buf)) { | ||||
|             Some(head) => { | ||||
|                 //trace!("parsed {} bytes out of {}", len, self.read_buf.len()); | ||||
|                 Ok(Some(head.0)) | ||||
|             }, | ||||
|             None => { | ||||
|                 if self.read_buf.capacity() >= MAX_BUFFER_SIZE { | ||||
|                     debug!("MAX_BUFFER_SIZE reached, closing"); | ||||
|                     Err(::Error::TooLarge) | ||||
|                 } else { | ||||
|                     Ok(None) | ||||
|             self.reserve_read_buf(); | ||||
|             match self.read_from_io() { | ||||
|                 Ok(0) => { | ||||
|                     trace!("parse eof"); | ||||
|                     return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "parse eof").into()); | ||||
|                 } | ||||
|             }, | ||||
|                 Ok(_) => {}, | ||||
|                 Err(e) => match e.kind() { | ||||
|                     io::ErrorKind::WouldBlock => { | ||||
|                         return Ok(None); | ||||
|                     }, | ||||
|                     _ => return Err(e.into()) | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -340,3 +342,15 @@ fn test_iobuf_write_empty_slice() { | ||||
|     // when there is nothing to flush | ||||
|     io_buf.flush().expect("should short-circuit flush"); | ||||
| } | ||||
|  | ||||
| #[test] | ||||
| fn test_parse_reads_until_blocked() { | ||||
|     use mock::{AsyncIo, Buf as MockBuf}; | ||||
|     // missing last line ending | ||||
|     let raw = "HTTP/1.1 200 OK\r\n"; | ||||
|  | ||||
|     let mock = AsyncIo::new(MockBuf::wrap(raw.into()), raw.len()); | ||||
|     let mut buffered = Buffered::new(mock); | ||||
|     assert_eq!(buffered.parse::<super::ClientTransaction>().unwrap(), None); | ||||
|     assert!(buffered.io.blocked()); | ||||
| } | ||||
|   | ||||
| @@ -62,6 +62,7 @@ pub struct AsyncIo<T> { | ||||
|     inner: T, | ||||
|     bytes_until_block: usize, | ||||
|     error: Option<io::Error>, | ||||
|     blocked: bool, | ||||
|     flushed: bool, | ||||
| } | ||||
|  | ||||
| @@ -72,6 +73,7 @@ impl<T> AsyncIo<T> { | ||||
|             bytes_until_block: bytes, | ||||
|             error: None, | ||||
|             flushed: false, | ||||
|             blocked: false, | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -92,13 +94,19 @@ impl AsyncIo<Buf> { | ||||
|     pub fn flushed(&self) -> bool { | ||||
|         self.flushed | ||||
|     } | ||||
|  | ||||
|     pub fn blocked(&self) -> bool { | ||||
|         self.blocked | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T: Read> Read for AsyncIo<T> { | ||||
|     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { | ||||
|         self.blocked = false; | ||||
|         if let Some(err) = self.error.take() { | ||||
|             Err(err) | ||||
|         } else if self.bytes_until_block == 0 { | ||||
|             self.blocked = true; | ||||
|             Err(io::Error::new(io::ErrorKind::WouldBlock, "mock block")) | ||||
|         } else { | ||||
|             let n = cmp::min(self.bytes_until_block, buf.len()); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user