diff --git a/src/http/conn.rs b/src/http/conn.rs index f25cd0c9..f28d7f2b 100644 --- a/src/http/conn.rs +++ b/src/http/conn.rs @@ -192,6 +192,11 @@ where I: Io, } } + fn try_keep_alive(&mut self) { + self.state.try_keep_alive(); + self.maybe_unpark(); + } + fn can_write_head(&self) -> bool { match self.state.writing { Writing::Init => true, @@ -209,6 +214,13 @@ where I: Io, } } + fn has_queued_body(&self) -> bool { + match self.state.writing { + Writing::Body(_, Some(_)) => true, + _ => false, + } + } + fn write_head(&mut self, mut head: http::MessageHead, body: bool) -> StartSend,io::Error> { debug_assert!(self.can_write_head()); if !body { @@ -239,6 +251,10 @@ where I: Io, fn write_body(&mut self, chunk: Option) -> StartSend, io::Error> { debug_assert!(self.can_write_body()); + if self.has_queued_body() { + try!(self.flush()); + } + let state = match self.state.writing { Writing::Body(ref mut encoder, ref mut queued) => { if queued.is_some() { @@ -319,12 +335,16 @@ where I: Io, } fn flush(&mut self) -> Poll<(), io::Error> { - let ret = try!(self.write_queued()); - try_nb!(self.io.flush()); - self.state.try_keep_alive(); + loop { + let queue_finished = try!(self.write_queued()).is_ready(); + try_nb!(self.io.flush()); + if queue_finished { + break; + } + } + self.try_keep_alive(); trace!("flushed {:?}", self.state); - self.maybe_unpark(); - Ok(ret) + Ok(Async::Ready(())) } } @@ -652,6 +672,15 @@ mod tests { use std::str::FromStr; + impl Writing { + fn is_queued(&self) -> bool { + match *self { + Writing::Body(_, Some(_)) => true, + _ => false, + } + } + } + #[test] fn test_conn_init_read() { let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec(); @@ -709,17 +738,10 @@ mod tests { conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64), None); assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 4].into()) }).unwrap().is_ready()); - match conn.state.writing { - Writing::Body(_, None) => {}, - _ => panic!("writing did not queue chunk: {:?}", conn.state.writing), - } + assert!(!conn.state.writing.is_queued()); assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; max].into()) }).unwrap().is_ready()); - - match conn.state.writing { - Writing::Body(_, Some(_)) => {}, - _ => panic!("writing did not queue chunk: {:?}", conn.state.writing), - } + assert!(conn.state.writing.is_queued()); assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 4].into()) }).unwrap().is_not_ready()); @@ -728,7 +750,6 @@ mod tests { conn.io.io_mut().block_in(1024 * 3); assert!(conn.poll_complete().unwrap().is_not_ready()); conn.io.io_mut().block_in(max * 2); - assert!(conn.poll_complete().unwrap().is_not_ready()); assert!(conn.poll_complete().unwrap().is_ready()); assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 4].into()) }).unwrap().is_ready()); @@ -749,6 +770,22 @@ mod tests { }).wait(); } + #[test] + fn test_conn_body_flush() { + let _: Result<(), ()> = ::futures::lazy(|| { + let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5); + let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); + conn.state.writing = Writing::Body(Encoder::length(1024 * 1024), None); + assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready()); + println!("{:#?}", conn); + assert!(conn.state.writing.is_queued()); + assert!(conn.poll_complete().unwrap().is_ready()); + assert!(!conn.state.writing.is_queued()); + + Ok(()) + }).wait(); + } + #[test] fn test_conn_parking() { use std::sync::Arc; diff --git a/src/http/io.rs b/src/http/io.rs index c7a9924e..20a981ca 100644 --- a/src/http/io.rs +++ b/src/http/io.rs @@ -269,7 +269,7 @@ impl WriteBuf { let mut vec = &mut self.0.bytes; let cap = vec.capacity(); if cap == 0 { - let init = cmp::max(INIT_BUFFER_SIZE, needed); + let init = cmp::min(MAX_BUFFER_SIZE, cmp::max(INIT_BUFFER_SIZE, needed)); trace!("WriteBuf reserving initial {}", init); vec.reserve(init); } else if cap < MAX_BUFFER_SIZE {