From 5323c2f39c908da724aaead1910f160d0b6b2ed1 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 30 May 2018 17:19:45 -0700 Subject: [PATCH] perf(h1): optimize write buffer when flattening --- src/proto/h1/conn.rs | 4 +- src/proto/h1/io.rs | 246 ++++++++++++++++--------------------------- 2 files changed, 93 insertions(+), 157 deletions(-) diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index f0309a9a..128bc582 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -168,7 +168,7 @@ where I: AsyncRead + AsyncWrite, self.state.busy(); if msg.expect_continue { let cont = b"HTTP/1.1 100 Continue\r\n\r\n"; - self.io.write_buf_mut().extend_from_slice(cont); + self.io.headers_buf().extend_from_slice(cont); } let wants_keep_alive = msg.keep_alive; self.state.keep_alive &= wants_keep_alive; @@ -397,7 +397,7 @@ where I: AsyncRead + AsyncWrite, self.enforce_version(&mut head); - let buf = self.io.write_buf_mut(); + let buf = self.io.headers_buf(); self.state.writing = match T::encode(Encode { head: &mut head, body, diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index 0e65545a..cbe751b8 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -86,7 +86,7 @@ where pub fn set_write_strategy_flatten(&mut self) { // this should always be called only at construction time, // so this assert is here to catch myself - debug_assert!(self.write_buf.buf.bufs.is_empty()); + debug_assert!(self.write_buf.queue.bufs.is_empty()); self.write_buf.set_strategy(Strategy::Flatten); } @@ -94,14 +94,8 @@ where self.read_buf.as_ref() } - //TODO(perf): don't return a `&mut Vec`, but a wrapper - //that protects the Vec when growing. Specifically, if this - //Vec couldn't be reset, as it's position isn't at the end, - //any new reserves will copy the bytes before the position, - //which is unnecessary. - pub fn write_buf_mut(&mut self) -> &mut Vec { - let buf = self.write_buf.head_mut(); - buf.maybe_reset(); + pub fn headers_buf(&mut self) -> &mut Vec { + let buf = self.write_buf.headers_mut(); &mut buf.bytes } @@ -192,6 +186,10 @@ where } else if self.write_buf.remaining() == 0 { try_nb!(self.io.flush()); } else { + match self.write_buf.strategy { + Strategy::Flatten => return self.flush_flattened(), + _ => (), + } loop { let n = try_ready!(self.io.write_buf(&mut self.write_buf.auto())); debug!("flushed {} bytes", n); @@ -206,6 +204,27 @@ where } Ok(Async::Ready(())) } + + /// Specialized version of `flush` when strategy is Flatten. + /// + /// Since all buffered bytes are flattened into the single headers buffer, + /// that skips some bookkeeping around using multiple buffers. + fn flush_flattened(&mut self) -> Poll<(), io::Error> { + loop { + let n = try_nb!(self.io.write(self.write_buf.headers.bytes())); + debug!("flushed {} bytes", n); + self.write_buf.headers.advance(n); + if self.write_buf.headers.remaining() == 0 { + self.write_buf.headers.reset(); + break; + } else if n == 0 { + trace!("write returned zero, but {} bytes remaining", self.write_buf.remaining()); + return Err(io::ErrorKind::WriteZero.into()) + } + } + try_nb!(self.io.flush()); + Ok(Async::Ready(())) + } } pub trait MemRead { @@ -249,17 +268,16 @@ impl> Cursor { #[inline] pub fn consume(&mut self, num: usize) { + debug_assert!(self.pos + num <= self.bytes.as_ref().len()); self.pos += num; } } impl Cursor> { - fn maybe_reset(&mut self) { - if self.pos != 0 && self.remaining() == 0 { - self.pos = 0; - unsafe { - self.bytes.set_len(0); - } + fn reset(&mut self) { + self.pos = 0; + unsafe { + self.bytes.set_len(0); } } } @@ -292,16 +310,20 @@ impl> Buf for Cursor { // an internal buffer to collect writes before flushes struct WriteBuf { - buf: BufDeque, + /// Re-usable buffer that holds message headers + headers: Cursor>, max_buf_size: usize, + /// Deque of user buffers if strategy is Queue + queue: BufDeque, strategy: Strategy, } impl WriteBuf { fn new() -> WriteBuf { WriteBuf { - buf: BufDeque::new(), + headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)), max_buf_size: DEFAULT_MAX_BUFFER_SIZE, + queue: BufDeque::new(), strategy: Strategy::Auto, } } @@ -322,14 +344,14 @@ where } fn buffer(&mut self, buf: B) { + debug_assert!(buf.has_remaining()); match self.strategy { Strategy::Flatten => { - let head = self.head_mut(); - head.maybe_reset(); + let head = self.headers_mut(); head.bytes.put(buf); }, Strategy::Auto | Strategy::Queue => { - self.buf.bufs.push_back(VecOrBuf::Buf(buf)); + self.queue.bufs.push_back(buf); }, } } @@ -340,30 +362,15 @@ where self.remaining() < self.max_buf_size }, Strategy::Auto | Strategy::Queue => { - self.buf.bufs.len() < MAX_BUF_LIST_BUFFERS + self.queue.bufs.len() < MAX_BUF_LIST_BUFFERS && self.remaining() < self.max_buf_size }, } } - fn head_mut(&mut self) -> &mut Cursor> { - // this dance is brought to you, The Borrow Checker! - - let reuse_back = if let Some(&VecOrBuf::Vec(_)) = self.buf.bufs.back() { - true - } else { - false - }; - - if !reuse_back { - let head_buf = Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)); - self.buf.bufs.push_back(VecOrBuf::Vec(head_buf)); - } - if let Some(&mut VecOrBuf::Vec(ref mut v)) = self.buf.bufs.back_mut() { - v - } else { - unreachable!("head_buf just pushed on back"); - } + fn headers_mut(&mut self) -> &mut Cursor> { + debug_assert!(!self.queue.has_remaining()); + &mut self.headers } } @@ -379,22 +386,37 @@ impl fmt::Debug for WriteBuf { impl Buf for WriteBuf { #[inline] fn remaining(&self) -> usize { - self.buf.remaining() + self.headers.remaining() + self.queue.remaining() } #[inline] fn bytes(&self) -> &[u8] { - self.buf.bytes() + let headers = self.headers.bytes(); + if !headers.is_empty() { + headers + } else { + self.queue.bytes() + } } #[inline] fn advance(&mut self, cnt: usize) { - self.buf.advance(cnt) + let hrem = self.headers.remaining(); + if hrem == cnt { + self.headers.reset(); + } else if hrem > cnt { + self.headers.advance(cnt); + } else { + let qcnt = cnt - hrem; + self.headers.reset(); + self.queue.advance(qcnt); + } } #[inline] fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize { - self.buf.bytes_vec(dst) + let n = self.headers.bytes_vec(dst); + self.queue.bytes_vec(&mut dst[n..]) + n } } @@ -448,9 +470,7 @@ impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> { } else if self.bytes_called.get() { trace!("detected no usage of vectored write, flattening"); self.inner.strategy = Strategy::Flatten; - let mut vec = Vec::new(); - vec.put(&mut self.inner.buf); - self.inner.buf.bufs.push_back(VecOrBuf::Vec(Cursor::new(vec))); + self.inner.headers.bytes.put(&mut self.inner.queue); } } } @@ -464,59 +484,8 @@ enum Strategy { Queue, } -enum VecOrBuf { - Vec(Cursor>), - Buf(B), -} - -impl Buf for VecOrBuf { - #[inline] - fn remaining(&self) -> usize { - match *self { - VecOrBuf::Vec(ref v) => v.remaining(), - VecOrBuf::Buf(ref b) => b.remaining(), - } - } - - #[inline] - fn bytes(&self) -> &[u8] { - match *self { - VecOrBuf::Vec(ref v) => v.bytes(), - VecOrBuf::Buf(ref b) => b.bytes(), - } - } - - #[inline] - fn advance(&mut self, cnt: usize) { - match *self { - VecOrBuf::Vec(ref mut v) => v.advance(cnt), - VecOrBuf::Buf(ref mut b) => b.advance(cnt), - } - } - - #[inline] - fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize { - match *self { - VecOrBuf::Vec(ref v) => { - if v.has_remaining() { - v.bytes_vec(dst) - } else { - 0 - } - }, - VecOrBuf::Buf(ref b) => { - if b.has_remaining() { - b.bytes_vec(dst) - } else { - 0 - } - }, - } - } -} - struct BufDeque { - bufs: VecDeque>, + bufs: VecDeque, } @@ -539,16 +508,13 @@ impl Buf for BufDeque { #[inline] fn bytes(&self) -> &[u8] { for buf in &self.bufs { - if buf.has_remaining() { - return buf.bytes(); - } + return buf.bytes(); } &[] } #[inline] fn advance(&mut self, mut cnt: usize) { - let mut maybe_reclaim = None; while cnt > 0 { { let front = &mut self.bufs[0]; @@ -561,12 +527,7 @@ impl Buf for BufDeque { cnt -= rem; } } - maybe_reclaim = self.bufs.pop_front(); - } - - if let Some(VecOrBuf::Vec(v)) = maybe_reclaim { - trace!("reclaiming write buf Vec"); - self.bufs.push_back(VecOrBuf::Vec(v)); + self.bufs.pop_front(); } } @@ -630,15 +591,12 @@ mod tests { } #[test] - fn write_buf_skips_empty_bufs() { - let mut mock = AsyncIo::new_buf(vec![], 1024); - mock.max_read_vecs(0); // disable vectored IO + #[should_panic] + fn write_buf_requires_non_empty_bufs() { + let mock = AsyncIo::new_buf(vec![], 1024); let mut buffered = Buffered::<_, Cursor>>::new(mock); buffered.buffer(Cursor::new(Vec::new())); - buffered.buffer(Cursor::new(b"hello".to_vec())); - buffered.flush().unwrap(); - assert_eq!(buffered.io, b"hello"); } #[test] @@ -649,42 +607,17 @@ mod tests { let mock = AsyncIo::new_buf(vec![], 1024); let mut buffered = Buffered::<_, Cursor>>::new(mock); - buffered.write_buf_mut().extend(b"hello "); + + buffered.headers_buf().extend(b"hello "); buffered.buffer(Cursor::new(b"world, ".to_vec())); - buffered.write_buf_mut().extend(b"it's "); + buffered.buffer(Cursor::new(b"it's ".to_vec())); buffered.buffer(Cursor::new(b"hyper!".to_vec())); + assert_eq!(buffered.write_buf.queue.bufs.len(), 3); buffered.flush().unwrap(); assert_eq!(buffered.io, b"hello world, it's hyper!"); assert_eq!(buffered.io.num_writes(), 1); - } - - #[test] - fn write_buf_reclaim_vec() { - extern crate pretty_env_logger; - let _ = pretty_env_logger::try_init(); - - let mock = AsyncIo::new_buf(vec![], 1024); - let mut buffered = Buffered::<_, Cursor>>::new(mock); - - buffered.write_buf_mut().extend(b"hello "); - assert_eq!(buffered.write_buf.buf.bufs.len(), 1); - buffered.write_buf_mut().extend(b"world, "); - assert_eq!(buffered.write_buf.buf.bufs.len(), 1); - - // after flushing, reclaim the Vec - buffered.flush().unwrap(); - assert_eq!(buffered.write_buf.remaining(), 0); - assert_eq!(buffered.write_buf.buf.bufs.len(), 1); - - // add a user buf in the way - buffered.buffer(Cursor::new(b"it's ".to_vec())); - // and then add more hyper bytes - buffered.write_buf_mut().extend(b"hyper!"); - buffered.flush().unwrap(); - assert_eq!(buffered.write_buf.buf.bufs.len(), 1); - - assert_eq!(buffered.io, b"hello world, it's hyper!"); + assert_eq!(buffered.write_buf.queue.bufs.len(), 0); } #[test] @@ -696,17 +629,16 @@ mod tests { let mut buffered = Buffered::<_, Cursor>>::new(mock); buffered.write_buf.set_strategy(Strategy::Flatten); - buffered.write_buf_mut().extend(b"hello "); + buffered.headers_buf().extend(b"hello "); buffered.buffer(Cursor::new(b"world, ".to_vec())); - buffered.write_buf_mut().extend(b"it's "); + buffered.buffer(Cursor::new(b"it's ".to_vec())); buffered.buffer(Cursor::new(b"hyper!".to_vec())); - assert_eq!(buffered.write_buf.buf.bufs.len(), 1); + assert_eq!(buffered.write_buf.queue.bufs.len(), 0); buffered.flush().unwrap(); assert_eq!(buffered.io, b"hello world, it's hyper!"); assert_eq!(buffered.io.num_writes(), 1); - assert_eq!(buffered.write_buf.buf.bufs.len(), 1); } #[test] @@ -721,19 +653,20 @@ mod tests { // we have 4 buffers, but hope to detect that vectored IO isn't // being used, and switch to flattening automatically, // resulting in only 2 writes - buffered.write_buf_mut().extend(b"hello "); + buffered.headers_buf().extend(b"hello "); buffered.buffer(Cursor::new(b"world, ".to_vec())); - buffered.write_buf_mut().extend(b"it's hyper!"); - //buffered.buffer(Cursor::new(b"hyper!".to_vec())); + buffered.buffer(Cursor::new(b"it's ".to_vec())); + buffered.buffer(Cursor::new(b"hyper!".to_vec())); + assert_eq!(buffered.write_buf.queue.bufs.len(), 3); buffered.flush().unwrap(); assert_eq!(buffered.io, b"hello world, it's hyper!"); assert_eq!(buffered.io.num_writes(), 2); - assert_eq!(buffered.write_buf.buf.bufs.len(), 1); + assert_eq!(buffered.write_buf.queue.bufs.len(), 0); } #[test] - fn write_buf_queue_does_not_auto() { + fn write_buf_queue_disable_auto() { extern crate pretty_env_logger; let _ = pretty_env_logger::try_init(); @@ -744,13 +677,16 @@ mod tests { // we have 4 buffers, and vec IO disabled, but explicitly said // don't try to auto detect (via setting strategy above) - buffered.write_buf_mut().extend(b"hello "); + + buffered.headers_buf().extend(b"hello "); buffered.buffer(Cursor::new(b"world, ".to_vec())); - buffered.write_buf_mut().extend(b"it's "); + buffered.buffer(Cursor::new(b"it's ".to_vec())); buffered.buffer(Cursor::new(b"hyper!".to_vec())); + assert_eq!(buffered.write_buf.queue.bufs.len(), 3); buffered.flush().unwrap(); assert_eq!(buffered.io, b"hello world, it's hyper!"); assert_eq!(buffered.io.num_writes(), 4); + assert_eq!(buffered.write_buf.queue.bufs.len(), 0); } }