@@ -745,7 +745,6 @@ mod tests {
|
||||
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
|
||||
conn.state.idle();
|
||||
|
||||
assert!(conn.poll().unwrap().is_not_ready());
|
||||
match conn.poll().unwrap() {
|
||||
Async::Ready(Some(Frame::Error { .. })) => {},
|
||||
other => panic!("frame is not Error: {:?}", other)
|
||||
|
||||
@@ -55,31 +55,33 @@ impl<T: AsyncRead + AsyncWrite> Buffered<T> {
|
||||
}
|
||||
|
||||
pub fn parse<S: Http1Transaction>(&mut self) -> ::Result<Option<MessageHead<S::Incoming>>> {
|
||||
self.reserve_read_buf();
|
||||
match self.read_from_io() {
|
||||
Ok(0) => {
|
||||
trace!("parse eof");
|
||||
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "parse eof").into());
|
||||
loop {
|
||||
match try!(parse::<S, _>(&mut self.read_buf)) {
|
||||
Some(head) => {
|
||||
//trace!("parsed {} bytes out of {}", len, self.read_buf.len());
|
||||
return Ok(Some(head.0))
|
||||
},
|
||||
None => {
|
||||
if self.read_buf.capacity() >= MAX_BUFFER_SIZE {
|
||||
debug!("MAX_BUFFER_SIZE reached, closing");
|
||||
return Err(::Error::TooLarge);
|
||||
}
|
||||
},
|
||||
}
|
||||
Ok(_) => {},
|
||||
Err(e) => match e.kind() {
|
||||
io::ErrorKind::WouldBlock => {},
|
||||
_ => return Err(e.into())
|
||||
}
|
||||
}
|
||||
match try!(parse::<S, _>(&mut self.read_buf)) {
|
||||
Some(head) => {
|
||||
//trace!("parsed {} bytes out of {}", len, self.read_buf.len());
|
||||
Ok(Some(head.0))
|
||||
},
|
||||
None => {
|
||||
if self.read_buf.capacity() >= MAX_BUFFER_SIZE {
|
||||
debug!("MAX_BUFFER_SIZE reached, closing");
|
||||
Err(::Error::TooLarge)
|
||||
} else {
|
||||
Ok(None)
|
||||
self.reserve_read_buf();
|
||||
match self.read_from_io() {
|
||||
Ok(0) => {
|
||||
trace!("parse eof");
|
||||
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "parse eof").into());
|
||||
}
|
||||
},
|
||||
Ok(_) => {},
|
||||
Err(e) => match e.kind() {
|
||||
io::ErrorKind::WouldBlock => {
|
||||
return Ok(None);
|
||||
},
|
||||
_ => return Err(e.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -340,3 +342,15 @@ fn test_iobuf_write_empty_slice() {
|
||||
// when there is nothing to flush
|
||||
io_buf.flush().expect("should short-circuit flush");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_reads_until_blocked() {
|
||||
use mock::{AsyncIo, Buf as MockBuf};
|
||||
// missing last line ending
|
||||
let raw = "HTTP/1.1 200 OK\r\n";
|
||||
|
||||
let mock = AsyncIo::new(MockBuf::wrap(raw.into()), raw.len());
|
||||
let mut buffered = Buffered::new(mock);
|
||||
assert_eq!(buffered.parse::<super::ClientTransaction>().unwrap(), None);
|
||||
assert!(buffered.io.blocked());
|
||||
}
|
||||
|
||||
@@ -62,6 +62,7 @@ pub struct AsyncIo<T> {
|
||||
inner: T,
|
||||
bytes_until_block: usize,
|
||||
error: Option<io::Error>,
|
||||
blocked: bool,
|
||||
flushed: bool,
|
||||
}
|
||||
|
||||
@@ -72,6 +73,7 @@ impl<T> AsyncIo<T> {
|
||||
bytes_until_block: bytes,
|
||||
error: None,
|
||||
flushed: false,
|
||||
blocked: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,13 +94,19 @@ impl AsyncIo<Buf> {
|
||||
pub fn flushed(&self) -> bool {
|
||||
self.flushed
|
||||
}
|
||||
|
||||
pub fn blocked(&self) -> bool {
|
||||
self.blocked
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Read> Read for AsyncIo<T> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.blocked = false;
|
||||
if let Some(err) = self.error.take() {
|
||||
Err(err)
|
||||
} else if self.bytes_until_block == 0 {
|
||||
self.blocked = true;
|
||||
Err(io::Error::new(io::ErrorKind::WouldBlock, "mock block"))
|
||||
} else {
|
||||
let n = cmp::min(self.bytes_until_block, buf.len());
|
||||
|
||||
Reference in New Issue
Block a user