diff --git a/src/http/conn.rs b/src/http/conn.rs index 2136853c..09694917 100644 --- a/src/http/conn.rs +++ b/src/http/conn.rs @@ -745,7 +745,6 @@ mod tests { let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); conn.state.idle(); - assert!(conn.poll().unwrap().is_not_ready()); match conn.poll().unwrap() { Async::Ready(Some(Frame::Error { .. })) => {}, other => panic!("frame is not Error: {:?}", other) diff --git a/src/http/io.rs b/src/http/io.rs index f8d6033a..5c133f06 100644 --- a/src/http/io.rs +++ b/src/http/io.rs @@ -55,31 +55,33 @@ impl Buffered { } pub fn parse(&mut self) -> ::Result>> { - self.reserve_read_buf(); - match self.read_from_io() { - Ok(0) => { - trace!("parse eof"); - return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "parse eof").into()); + loop { + match try!(parse::(&mut self.read_buf)) { + Some(head) => { + //trace!("parsed {} bytes out of {}", len, self.read_buf.len()); + return Ok(Some(head.0)) + }, + None => { + if self.read_buf.capacity() >= MAX_BUFFER_SIZE { + debug!("MAX_BUFFER_SIZE reached, closing"); + return Err(::Error::TooLarge); + } + }, } - Ok(_) => {}, - Err(e) => match e.kind() { - io::ErrorKind::WouldBlock => {}, - _ => return Err(e.into()) - } - } - match try!(parse::(&mut self.read_buf)) { - Some(head) => { - //trace!("parsed {} bytes out of {}", len, self.read_buf.len()); - Ok(Some(head.0)) - }, - None => { - if self.read_buf.capacity() >= MAX_BUFFER_SIZE { - debug!("MAX_BUFFER_SIZE reached, closing"); - Err(::Error::TooLarge) - } else { - Ok(None) + self.reserve_read_buf(); + match self.read_from_io() { + Ok(0) => { + trace!("parse eof"); + return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "parse eof").into()); } - }, + Ok(_) => {}, + Err(e) => match e.kind() { + io::ErrorKind::WouldBlock => { + return Ok(None); + }, + _ => return Err(e.into()) + } + } } } @@ -340,3 +342,15 @@ fn test_iobuf_write_empty_slice() { // when there is nothing to flush io_buf.flush().expect("should short-circuit flush"); } + +#[test] +fn test_parse_reads_until_blocked() { + use mock::{AsyncIo, Buf as MockBuf}; + // missing last line ending + let raw = "HTTP/1.1 200 OK\r\n"; + + let mock = AsyncIo::new(MockBuf::wrap(raw.into()), raw.len()); + let mut buffered = Buffered::new(mock); + assert_eq!(buffered.parse::().unwrap(), None); + assert!(buffered.io.blocked()); +} diff --git a/src/mock.rs b/src/mock.rs index 53388645..4e5f1163 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -62,6 +62,7 @@ pub struct AsyncIo { inner: T, bytes_until_block: usize, error: Option, + blocked: bool, flushed: bool, } @@ -72,6 +73,7 @@ impl AsyncIo { bytes_until_block: bytes, error: None, flushed: false, + blocked: false, } } @@ -92,13 +94,19 @@ impl AsyncIo { pub fn flushed(&self) -> bool { self.flushed } + + pub fn blocked(&self) -> bool { + self.blocked + } } impl Read for AsyncIo { fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.blocked = false; if let Some(err) = self.error.take() { Err(err) } else if self.bytes_until_block == 0 { + self.blocked = true; Err(io::Error::new(io::ErrorKind::WouldBlock, "mock block")) } else { let n = cmp::min(self.bytes_until_block, buf.len());