fix(tokio-proto): return end-of-body frame correctly for tokio-proto
Closes #1414
This commit is contained in:
@@ -237,20 +237,17 @@ where I: AsyncRead + AsyncWrite,
|
|||||||
Reading::Body(ref mut decoder) => {
|
Reading::Body(ref mut decoder) => {
|
||||||
match decoder.decode(&mut self.io) {
|
match decoder.decode(&mut self.io) {
|
||||||
Ok(Async::Ready(slice)) => {
|
Ok(Async::Ready(slice)) => {
|
||||||
let chunk = if !slice.is_empty() {
|
let (reading, chunk) = if !slice.is_empty() {
|
||||||
Some(super::Chunk::from(slice))
|
return Ok(Async::Ready(Some(super::Chunk::from(slice))));
|
||||||
} else {
|
} else if decoder.is_eof() {
|
||||||
None
|
|
||||||
};
|
|
||||||
let reading = if decoder.is_eof() {
|
|
||||||
debug!("incoming body completed");
|
debug!("incoming body completed");
|
||||||
Reading::KeepAlive
|
(Reading::KeepAlive, None)
|
||||||
} else if chunk.is_some() {
|
|
||||||
Reading::Body(decoder.clone())
|
|
||||||
} else {
|
} else {
|
||||||
trace!("decode stream unexpectedly ended");
|
trace!("decode stream unexpectedly ended");
|
||||||
//TODO: Should this return an UnexpectedEof?
|
// this should actually be unreachable:
|
||||||
Reading::Closed
|
// the decoder will return an UnexpectedEof if there were
|
||||||
|
// no bytes to read and it isn't eof yet...
|
||||||
|
(Reading::Closed, None)
|
||||||
};
|
};
|
||||||
(reading, Ok(Async::Ready(chunk)))
|
(reading, Ok(Async::Ready(chunk)))
|
||||||
},
|
},
|
||||||
@@ -1078,6 +1075,37 @@ mod tests {
|
|||||||
}).wait();
|
}).wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_conn_read_body_end() {
|
||||||
|
let _: Result<(), ()> = future::lazy(|| {
|
||||||
|
let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024);
|
||||||
|
let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default());
|
||||||
|
conn.state.busy();
|
||||||
|
|
||||||
|
match conn.poll() {
|
||||||
|
Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (),
|
||||||
|
other => panic!("unexpected frame: {:?}", other)
|
||||||
|
}
|
||||||
|
|
||||||
|
match conn.poll() {
|
||||||
|
Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (),
|
||||||
|
other => panic!("unexpected frame: {:?}", other)
|
||||||
|
}
|
||||||
|
|
||||||
|
// When the body is done, `poll` MUST return a `Body` frame with chunk set to `None`
|
||||||
|
match conn.poll() {
|
||||||
|
Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
|
||||||
|
other => panic!("unexpected frame: {:?}", other)
|
||||||
|
}
|
||||||
|
|
||||||
|
match conn.poll() {
|
||||||
|
Ok(Async::NotReady) => (),
|
||||||
|
other => panic!("unexpected frame: {:?}", other)
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}).wait();
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_conn_closed_read() {
|
fn test_conn_closed_read() {
|
||||||
let io = AsyncIo::new_buf(vec![], 0);
|
let io = AsyncIo::new_buf(vec![], 0);
|
||||||
|
|||||||
@@ -75,7 +75,6 @@ impl Decoder {
|
|||||||
// methods
|
// methods
|
||||||
|
|
||||||
pub fn is_eof(&self) -> bool {
|
pub fn is_eof(&self) -> bool {
|
||||||
trace!("is_eof? {:?}", self);
|
|
||||||
match self.kind {
|
match self.kind {
|
||||||
Length(0) |
|
Length(0) |
|
||||||
Chunked(ChunkedState::End, _) |
|
Chunked(ChunkedState::End, _) |
|
||||||
@@ -85,16 +84,15 @@ impl Decoder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn decode<R: MemRead>(&mut self, body: &mut R) -> Poll<Bytes, io::Error> {
|
pub fn decode<R: MemRead>(&mut self, body: &mut R) -> Poll<Bytes, io::Error> {
|
||||||
|
trace!("decode; state={:?}", self.kind);
|
||||||
match self.kind {
|
match self.kind {
|
||||||
Length(ref mut remaining) => {
|
Length(ref mut remaining) => {
|
||||||
trace!("Sized read, remaining={:?}", remaining);
|
|
||||||
if *remaining == 0 {
|
if *remaining == 0 {
|
||||||
Ok(Async::Ready(Bytes::new()))
|
Ok(Async::Ready(Bytes::new()))
|
||||||
} else {
|
} else {
|
||||||
let to_read = *remaining as usize;
|
let to_read = *remaining as usize;
|
||||||
let buf = try_ready!(body.read_mem(to_read));
|
let buf = try_ready!(body.read_mem(to_read));
|
||||||
let num = buf.as_ref().len() as u64;
|
let num = buf.as_ref().len() as u64;
|
||||||
trace!("Length read: {}", num);
|
|
||||||
if num > *remaining {
|
if num > *remaining {
|
||||||
*remaining = 0;
|
*remaining = 0;
|
||||||
} else if num == 0 {
|
} else if num == 0 {
|
||||||
|
|||||||
Reference in New Issue
Block a user