fix(http): more eagerly flush when the Conn is full

Closes #1078
This commit is contained in:
Sean McArthur
2017-02-28 16:05:50 -08:00
parent 359f5ff4d7
commit ab939511f0
2 changed files with 53 additions and 16 deletions

View File

@@ -192,6 +192,11 @@ where I: Io,
}
}
fn try_keep_alive(&mut self) {
self.state.try_keep_alive();
self.maybe_unpark();
}
fn can_write_head(&self) -> bool {
match self.state.writing {
Writing::Init => true,
@@ -209,6 +214,13 @@ where I: Io,
}
}
fn has_queued_body(&self) -> bool {
match self.state.writing {
Writing::Body(_, Some(_)) => true,
_ => false,
}
}
fn write_head(&mut self, mut head: http::MessageHead<T::Outgoing>, body: bool) -> StartSend<http::MessageHead<T::Outgoing>,io::Error> {
debug_assert!(self.can_write_head());
if !body {
@@ -239,6 +251,10 @@ where I: Io,
fn write_body(&mut self, chunk: Option<B>) -> StartSend<Option<B>, io::Error> {
debug_assert!(self.can_write_body());
if self.has_queued_body() {
try!(self.flush());
}
let state = match self.state.writing {
Writing::Body(ref mut encoder, ref mut queued) => {
if queued.is_some() {
@@ -319,12 +335,16 @@ where I: Io,
}
fn flush(&mut self) -> Poll<(), io::Error> {
let ret = try!(self.write_queued());
try_nb!(self.io.flush());
self.state.try_keep_alive();
loop {
let queue_finished = try!(self.write_queued()).is_ready();
try_nb!(self.io.flush());
if queue_finished {
break;
}
}
self.try_keep_alive();
trace!("flushed {:?}", self.state);
self.maybe_unpark();
Ok(ret)
Ok(Async::Ready(()))
}
}
@@ -652,6 +672,15 @@ mod tests {
use std::str::FromStr;
impl<T> Writing<T> {
fn is_queued(&self) -> bool {
match *self {
Writing::Body(_, Some(_)) => true,
_ => false,
}
}
}
#[test]
fn test_conn_init_read() {
let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
@@ -709,17 +738,10 @@ mod tests {
conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64), None);
assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 4].into()) }).unwrap().is_ready());
match conn.state.writing {
Writing::Body(_, None) => {},
_ => panic!("writing did not queue chunk: {:?}", conn.state.writing),
}
assert!(!conn.state.writing.is_queued());
assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; max].into()) }).unwrap().is_ready());
match conn.state.writing {
Writing::Body(_, Some(_)) => {},
_ => panic!("writing did not queue chunk: {:?}", conn.state.writing),
}
assert!(conn.state.writing.is_queued());
assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 4].into()) }).unwrap().is_not_ready());
@@ -728,7 +750,6 @@ mod tests {
conn.io.io_mut().block_in(1024 * 3);
assert!(conn.poll_complete().unwrap().is_not_ready());
conn.io.io_mut().block_in(max * 2);
assert!(conn.poll_complete().unwrap().is_not_ready());
assert!(conn.poll_complete().unwrap().is_ready());
assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 4].into()) }).unwrap().is_ready());
@@ -749,6 +770,22 @@ mod tests {
}).wait();
}
#[test]
fn test_conn_body_flush() {
let _: Result<(), ()> = ::futures::lazy(|| {
let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
conn.state.writing = Writing::Body(Encoder::length(1024 * 1024), None);
assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready());
println!("{:#?}", conn);
assert!(conn.state.writing.is_queued());
assert!(conn.poll_complete().unwrap().is_ready());
assert!(!conn.state.writing.is_queued());
Ok(())
}).wait();
}
#[test]
fn test_conn_parking() {
use std::sync::Arc;

View File

@@ -269,7 +269,7 @@ impl WriteBuf {
let mut vec = &mut self.0.bytes;
let cap = vec.capacity();
if cap == 0 {
let init = cmp::max(INIT_BUFFER_SIZE, needed);
let init = cmp::min(MAX_BUFFER_SIZE, cmp::max(INIT_BUFFER_SIZE, needed));
trace!("WriteBuf reserving initial {}", init);
vec.reserve(init);
} else if cap < MAX_BUFFER_SIZE {