Merge pull request #1110 from hyperium/flush
fix(conn): always flush io from poll_complete
This commit is contained in:
@@ -827,6 +827,7 @@ mod tests {
|
|||||||
assert!(conn.state.writing.is_queued());
|
assert!(conn.state.writing.is_queued());
|
||||||
assert!(conn.poll_complete().unwrap().is_ready());
|
assert!(conn.poll_complete().unwrap().is_ready());
|
||||||
assert!(!conn.state.writing.is_queued());
|
assert!(!conn.state.writing.is_queued());
|
||||||
|
assert!(conn.io.io_mut().flushed());
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}).wait();
|
}).wait();
|
||||||
|
|||||||
@@ -134,15 +134,16 @@ impl<T: Write> Write for Buffered<T> {
|
|||||||
|
|
||||||
fn flush(&mut self) -> io::Result<()> {
|
fn flush(&mut self) -> io::Result<()> {
|
||||||
if self.write_buf.remaining() == 0 {
|
if self.write_buf.remaining() == 0 {
|
||||||
Ok(())
|
self.io.flush()
|
||||||
} else {
|
} else {
|
||||||
loop {
|
loop {
|
||||||
let n = try!(self.write_buf.write_into(&mut self.io));
|
let n = try!(self.write_buf.write_into(&mut self.io));
|
||||||
debug!("flushed {} bytes", n);
|
debug!("flushed {} bytes", n);
|
||||||
if self.write_buf.remaining() == 0 {
|
if self.write_buf.remaining() == 0 {
|
||||||
return Ok(())
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
self.io.flush()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -62,6 +62,7 @@ pub struct AsyncIo<T> {
|
|||||||
inner: T,
|
inner: T,
|
||||||
bytes_until_block: usize,
|
bytes_until_block: usize,
|
||||||
error: Option<io::Error>,
|
error: Option<io::Error>,
|
||||||
|
flushed: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> AsyncIo<T> {
|
impl<T> AsyncIo<T> {
|
||||||
@@ -70,6 +71,7 @@ impl<T> AsyncIo<T> {
|
|||||||
inner: inner,
|
inner: inner,
|
||||||
bytes_until_block: bytes,
|
bytes_until_block: bytes,
|
||||||
error: None,
|
error: None,
|
||||||
|
flushed: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -86,6 +88,10 @@ impl AsyncIo<Buf> {
|
|||||||
pub fn new_buf<T: Into<Vec<u8>>>(buf: T, bytes: usize) -> AsyncIo<Buf> {
|
pub fn new_buf<T: Into<Vec<u8>>>(buf: T, bytes: usize) -> AsyncIo<Buf> {
|
||||||
AsyncIo::new(Buf::wrap(buf.into()), bytes)
|
AsyncIo::new(Buf::wrap(buf.into()), bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn flushed(&self) -> bool {
|
||||||
|
self.flushed
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Read> Read for AsyncIo<T> {
|
impl<T: Read> Read for AsyncIo<T> {
|
||||||
@@ -111,6 +117,7 @@ impl<T: Write> Write for AsyncIo<T> {
|
|||||||
Err(io::Error::new(io::ErrorKind::WouldBlock, "mock block"))
|
Err(io::Error::new(io::ErrorKind::WouldBlock, "mock block"))
|
||||||
} else {
|
} else {
|
||||||
trace!("AsyncIo::write() block_in = {}, data.len() = {}", self.bytes_until_block, data.len());
|
trace!("AsyncIo::write() block_in = {}, data.len() = {}", self.bytes_until_block, data.len());
|
||||||
|
self.flushed = false;
|
||||||
let n = cmp::min(self.bytes_until_block, data.len());
|
let n = cmp::min(self.bytes_until_block, data.len());
|
||||||
let n = try!(self.inner.write(&data[..n]));
|
let n = try!(self.inner.write(&data[..n]));
|
||||||
self.bytes_until_block -= n;
|
self.bytes_until_block -= n;
|
||||||
@@ -119,6 +126,7 @@ impl<T: Write> Write for AsyncIo<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn flush(&mut self) -> io::Result<()> {
|
fn flush(&mut self) -> io::Result<()> {
|
||||||
|
self.flushed = true;
|
||||||
self.inner.flush()
|
self.inner.flush()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user