fix(conn): always flush io from poll_complete

Closes #1108
This commit is contained in:
Sean McArthur
2017-04-03 10:06:39 -07:00
parent d63b7de44f
commit 997a64d770
3 changed files with 12 additions and 2 deletions

View File

@@ -827,6 +827,7 @@ mod tests {
assert!(conn.state.writing.is_queued()); assert!(conn.state.writing.is_queued());
assert!(conn.poll_complete().unwrap().is_ready()); assert!(conn.poll_complete().unwrap().is_ready());
assert!(!conn.state.writing.is_queued()); assert!(!conn.state.writing.is_queued());
assert!(conn.io.io_mut().flushed());
Ok(()) Ok(())
}).wait(); }).wait();

View File

@@ -134,15 +134,16 @@ impl<T: Write> Write for Buffered<T> {
fn flush(&mut self) -> io::Result<()> { fn flush(&mut self) -> io::Result<()> {
if self.write_buf.remaining() == 0 { if self.write_buf.remaining() == 0 {
Ok(()) self.io.flush()
} else { } else {
loop { loop {
let n = try!(self.write_buf.write_into(&mut self.io)); let n = try!(self.write_buf.write_into(&mut self.io));
debug!("flushed {} bytes", n); debug!("flushed {} bytes", n);
if self.write_buf.remaining() == 0 { if self.write_buf.remaining() == 0 {
return Ok(()) break;
} }
} }
self.io.flush()
} }
} }
} }

View File

@@ -62,6 +62,7 @@ pub struct AsyncIo<T> {
inner: T, inner: T,
bytes_until_block: usize, bytes_until_block: usize,
error: Option<io::Error>, error: Option<io::Error>,
flushed: bool,
} }
impl<T> AsyncIo<T> { impl<T> AsyncIo<T> {
@@ -70,6 +71,7 @@ impl<T> AsyncIo<T> {
inner: inner, inner: inner,
bytes_until_block: bytes, bytes_until_block: bytes,
error: None, error: None,
flushed: false,
} }
} }
@@ -86,6 +88,10 @@ impl AsyncIo<Buf> {
pub fn new_buf<T: Into<Vec<u8>>>(buf: T, bytes: usize) -> AsyncIo<Buf> { pub fn new_buf<T: Into<Vec<u8>>>(buf: T, bytes: usize) -> AsyncIo<Buf> {
AsyncIo::new(Buf::wrap(buf.into()), bytes) AsyncIo::new(Buf::wrap(buf.into()), bytes)
} }
pub fn flushed(&self) -> bool {
self.flushed
}
} }
impl<T: Read> Read for AsyncIo<T> { impl<T: Read> Read for AsyncIo<T> {
@@ -111,6 +117,7 @@ impl<T: Write> Write for AsyncIo<T> {
Err(io::Error::new(io::ErrorKind::WouldBlock, "mock block")) Err(io::Error::new(io::ErrorKind::WouldBlock, "mock block"))
} else { } else {
trace!("AsyncIo::write() block_in = {}, data.len() = {}", self.bytes_until_block, data.len()); trace!("AsyncIo::write() block_in = {}, data.len() = {}", self.bytes_until_block, data.len());
self.flushed = false;
let n = cmp::min(self.bytes_until_block, data.len()); let n = cmp::min(self.bytes_until_block, data.len());
let n = try!(self.inner.write(&data[..n])); let n = try!(self.inner.write(&data[..n]));
self.bytes_until_block -= n; self.bytes_until_block -= n;
@@ -119,6 +126,7 @@ impl<T: Write> Write for AsyncIo<T> {
} }
fn flush(&mut self) -> io::Result<()> { fn flush(&mut self) -> io::Result<()> {
self.flushed = true;
self.inner.flush() self.inner.flush()
} }
} }