diff --git a/src/mock.rs b/src/mock.rs index 30bd4a6e..1eb8125d 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -70,6 +70,7 @@ pub struct AsyncIo { flushed: bool, inner: T, max_read_vecs: usize, + num_writes: usize, } impl AsyncIo { @@ -81,6 +82,7 @@ impl AsyncIo { flushed: false, inner: inner, max_read_vecs: READ_VECS_CNT, + num_writes: 0, } } @@ -107,6 +109,10 @@ impl AsyncIo { pub fn blocked(&self) -> bool { self.blocked } + + pub fn num_writes(&self) -> usize { + self.num_writes + } } impl AsyncIo { @@ -160,6 +166,7 @@ impl Read for AsyncIo { impl Write for AsyncIo { fn write(&mut self, data: &[u8]) -> io::Result { + self.num_writes += 1; if let Some(err) = self.error.take() { Err(err) } else if self.bytes_until_block == 0 { @@ -198,6 +205,9 @@ impl AsyncWrite for AsyncIo { let i = ::bytes::Buf::bytes_vec(&buf, &mut bufs[..self.max_read_vecs]); let mut n = 0; let mut ret = Ok(0); + // each call to write() will increase our count, but we assume + // that if iovecs are used, its really only 1 write call. + let num_writes = self.num_writes; for iovec in &bufs[..i] { match self.write(iovec) { Ok(num) => { @@ -216,6 +226,7 @@ impl AsyncWrite for AsyncIo { } } } + self.num_writes = num_writes + 1; ret }; match r { diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index 0b1b6dba..84c7646c 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -1,3 +1,4 @@ +use std::cell::Cell; use std::collections::VecDeque; use std::fmt; use std::io; @@ -55,7 +56,7 @@ where self.write_buf.set_strategy(if enabled { Strategy::Flatten } else { - Strategy::Queue + Strategy::Auto }); } @@ -68,6 +69,11 @@ where self.read_buf.as_ref() } + //TODO(perf): don't return a `&mut Vec`, but a wrapper + //that protects the Vec when growing. Specifically, if this + //Vec couldn't be reset, as it's position isn't at the end, + //any new reserves will copy the bytes before the position, + //which is unnecessary. pub fn write_buf_mut(&mut self) -> &mut Vec { let buf = self.write_buf.head_mut(); buf.maybe_reset(); @@ -154,7 +160,7 @@ where try_nb!(self.io.flush()); } else { loop { - let n = try_ready!(self.io.write_buf(&mut self.write_buf)); + let n = try_ready!(self.io.write_buf(&mut self.write_buf.auto())); debug!("flushed {} bytes", n); if self.write_buf.remaining() == 0 { break; @@ -263,7 +269,7 @@ impl WriteBuf { WriteBuf { buf: BufDeque::new(), max_buf_size: DEFAULT_MAX_BUFFER_SIZE, - strategy: Strategy::Queue, + strategy: Strategy::Auto, } } } @@ -277,6 +283,11 @@ where self.strategy = strategy; } + #[inline] + fn auto(&mut self) -> WriteBufAuto { + WriteBufAuto::new(self) + } + fn buffer(&mut self, buf: B) { match self.strategy { Strategy::Flatten => { @@ -284,7 +295,7 @@ where head.maybe_reset(); head.bytes.put(buf); }, - Strategy::Queue => { + Strategy::Auto | Strategy::Queue => { self.buf.bufs.push_back(VecOrBuf::Buf(buf)); }, } @@ -295,8 +306,7 @@ where Strategy::Flatten => { self.remaining() < self.max_buf_size }, - Strategy::Queue => { - // for now, the simplest of heuristics + Strategy::Auto | Strategy::Queue => { self.buf.bufs.len() < MAX_BUF_LIST_BUFFERS && self.remaining() < self.max_buf_size }, @@ -355,8 +365,68 @@ impl Buf for WriteBuf { } } +/// Detects when wrapped `WriteBuf` is used for vectored IO, and +/// adjusts the `WriteBuf` strategy if not. +struct WriteBufAuto<'a, B: Buf + 'a> { + bytes_called: Cell, + bytes_vec_called: Cell, + inner: &'a mut WriteBuf, +} + +impl<'a, B: Buf> WriteBufAuto<'a, B> { + fn new(inner: &'a mut WriteBuf) -> WriteBufAuto<'a, B> { + WriteBufAuto { + bytes_called: Cell::new(false), + bytes_vec_called: Cell::new(false), + inner: inner, + } + } +} + +impl<'a, B: Buf> Buf for WriteBufAuto<'a, B> { + #[inline] + fn remaining(&self) -> usize { + self.inner.remaining() + } + + #[inline] + fn bytes(&self) -> &[u8] { + self.bytes_called.set(true); + self.inner.bytes() + } + + #[inline] + fn advance(&mut self, cnt: usize) { + self.inner.advance(cnt) + } + + #[inline] + fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize { + self.bytes_vec_called.set(true); + self.inner.bytes_vec(dst) + } +} + +impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> { + fn drop(&mut self) { + if let Strategy::Auto = self.inner.strategy { + if self.bytes_vec_called.get() { + self.inner.strategy = Strategy::Queue; + } else if self.bytes_called.get() { + trace!("detected no usage of vectored write, flattening"); + self.inner.strategy = Strategy::Flatten; + let mut vec = Vec::new(); + vec.put(&mut self.inner.buf); + self.inner.buf.bufs.push_back(VecOrBuf::Vec(Cursor::new(vec))); + } + } + } +} + + #[derive(Debug)] enum Strategy { + Auto, Flatten, Queue, } @@ -536,4 +606,117 @@ mod tests { buffered.flush().unwrap(); assert_eq!(buffered.io, b"hello"); } + + #[test] + fn write_buf_queue() { + extern crate pretty_env_logger; + let _ = pretty_env_logger::try_init(); + + let mock = AsyncIo::new_buf(vec![], 1024); + let mut buffered = Buffered::<_, Cursor>>::new(mock); + + buffered.write_buf_mut().extend(b"hello "); + buffered.buffer(Cursor::new(b"world, ".to_vec())); + buffered.write_buf_mut().extend(b"it's "); + buffered.buffer(Cursor::new(b"hyper!".to_vec())); + buffered.flush().unwrap(); + + assert_eq!(buffered.io, b"hello world, it's hyper!"); + assert_eq!(buffered.io.num_writes(), 1); + } + + #[test] + fn write_buf_reclaim_vec() { + extern crate pretty_env_logger; + let _ = pretty_env_logger::try_init(); + + let mock = AsyncIo::new_buf(vec![], 1024); + let mut buffered = Buffered::<_, Cursor>>::new(mock); + + buffered.write_buf_mut().extend(b"hello "); + assert_eq!(buffered.write_buf.buf.bufs.len(), 1); + buffered.write_buf_mut().extend(b"world, "); + assert_eq!(buffered.write_buf.buf.bufs.len(), 1); + + // after flushing, reclaim the Vec + buffered.flush().unwrap(); + assert_eq!(buffered.write_buf.remaining(), 0); + assert_eq!(buffered.write_buf.buf.bufs.len(), 1); + + // add a user buf in the way + buffered.buffer(Cursor::new(b"it's ".to_vec())); + // and then add more hyper bytes + buffered.write_buf_mut().extend(b"hyper!"); + buffered.flush().unwrap(); + assert_eq!(buffered.write_buf.buf.bufs.len(), 1); + + assert_eq!(buffered.io, b"hello world, it's hyper!"); + } + + #[test] + fn write_buf_flatten() { + extern crate pretty_env_logger; + let _ = pretty_env_logger::try_init(); + + let mock = AsyncIo::new_buf(vec![], 1024); + let mut buffered = Buffered::<_, Cursor>>::new(mock); + buffered.write_buf.set_strategy(Strategy::Flatten); + + buffered.write_buf_mut().extend(b"hello "); + buffered.buffer(Cursor::new(b"world, ".to_vec())); + buffered.write_buf_mut().extend(b"it's "); + buffered.buffer(Cursor::new(b"hyper!".to_vec())); + assert_eq!(buffered.write_buf.buf.bufs.len(), 1); + + buffered.flush().unwrap(); + + assert_eq!(buffered.io, b"hello world, it's hyper!"); + assert_eq!(buffered.io.num_writes(), 1); + assert_eq!(buffered.write_buf.buf.bufs.len(), 1); + } + + #[test] + fn write_buf_auto_flatten() { + extern crate pretty_env_logger; + let _ = pretty_env_logger::try_init(); + + let mut mock = AsyncIo::new_buf(vec![], 1024); + mock.max_read_vecs(0); // disable vectored IO + let mut buffered = Buffered::<_, Cursor>>::new(mock); + + // we have 4 buffers, but hope to detect that vectored IO isn't + // being used, and switch to flattening automatically, + // resulting in only 2 writes + buffered.write_buf_mut().extend(b"hello "); + buffered.buffer(Cursor::new(b"world, ".to_vec())); + buffered.write_buf_mut().extend(b"it's hyper!"); + //buffered.buffer(Cursor::new(b"hyper!".to_vec())); + buffered.flush().unwrap(); + + assert_eq!(buffered.io, b"hello world, it's hyper!"); + assert_eq!(buffered.io.num_writes(), 2); + assert_eq!(buffered.write_buf.buf.bufs.len(), 1); + } + + #[test] + fn write_buf_queue_does_not_auto() { + extern crate pretty_env_logger; + let _ = pretty_env_logger::try_init(); + + let mut mock = AsyncIo::new_buf(vec![], 1024); + mock.max_read_vecs(0); // disable vectored IO + let mut buffered = Buffered::<_, Cursor>>::new(mock); + buffered.write_buf.set_strategy(Strategy::Queue); + + // we have 4 buffers, and vec IO disabled, but explicitly said + // don't try to auto detect (via setting strategy above) + buffered.write_buf_mut().extend(b"hello "); + buffered.buffer(Cursor::new(b"world, ".to_vec())); + buffered.write_buf_mut().extend(b"it's "); + buffered.buffer(Cursor::new(b"hyper!".to_vec())); + buffered.flush().unwrap(); + + assert_eq!(buffered.io, b"hello world, it's hyper!"); + assert_eq!(buffered.io.num_writes(), 4); + } }