perf(h1): convert buffer to flatten strategy with auto detection

This commit is contained in:
Sean McArthur
2018-02-06 14:35:47 -08:00
parent 3a124462c6
commit d2fdf1f525
2 changed files with 200 additions and 6 deletions

View File

@@ -70,6 +70,7 @@ pub struct AsyncIo<T> {
flushed: bool, flushed: bool,
inner: T, inner: T,
max_read_vecs: usize, max_read_vecs: usize,
num_writes: usize,
} }
impl<T> AsyncIo<T> { impl<T> AsyncIo<T> {
@@ -81,6 +82,7 @@ impl<T> AsyncIo<T> {
flushed: false, flushed: false,
inner: inner, inner: inner,
max_read_vecs: READ_VECS_CNT, max_read_vecs: READ_VECS_CNT,
num_writes: 0,
} }
} }
@@ -107,6 +109,10 @@ impl<T> AsyncIo<T> {
pub fn blocked(&self) -> bool { pub fn blocked(&self) -> bool {
self.blocked self.blocked
} }
pub fn num_writes(&self) -> usize {
self.num_writes
}
} }
impl AsyncIo<Buf> { impl AsyncIo<Buf> {
@@ -160,6 +166,7 @@ impl<T: Read> Read for AsyncIo<T> {
impl<T: Write> Write for AsyncIo<T> { impl<T: Write> Write for AsyncIo<T> {
fn write(&mut self, data: &[u8]) -> io::Result<usize> { fn write(&mut self, data: &[u8]) -> io::Result<usize> {
self.num_writes += 1;
if let Some(err) = self.error.take() { if let Some(err) = self.error.take() {
Err(err) Err(err)
} else if self.bytes_until_block == 0 { } else if self.bytes_until_block == 0 {
@@ -198,6 +205,9 @@ impl<T: Read + Write> AsyncWrite for AsyncIo<T> {
let i = ::bytes::Buf::bytes_vec(&buf, &mut bufs[..self.max_read_vecs]); let i = ::bytes::Buf::bytes_vec(&buf, &mut bufs[..self.max_read_vecs]);
let mut n = 0; let mut n = 0;
let mut ret = Ok(0); let mut ret = Ok(0);
// each call to write() will increase our count, but we assume
// that if iovecs are used, its really only 1 write call.
let num_writes = self.num_writes;
for iovec in &bufs[..i] { for iovec in &bufs[..i] {
match self.write(iovec) { match self.write(iovec) {
Ok(num) => { Ok(num) => {
@@ -216,6 +226,7 @@ impl<T: Read + Write> AsyncWrite for AsyncIo<T> {
} }
} }
} }
self.num_writes = num_writes + 1;
ret ret
}; };
match r { match r {

View File

@@ -1,3 +1,4 @@
use std::cell::Cell;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fmt; use std::fmt;
use std::io; use std::io;
@@ -55,7 +56,7 @@ where
self.write_buf.set_strategy(if enabled { self.write_buf.set_strategy(if enabled {
Strategy::Flatten Strategy::Flatten
} else { } else {
Strategy::Queue Strategy::Auto
}); });
} }
@@ -68,6 +69,11 @@ where
self.read_buf.as_ref() self.read_buf.as_ref()
} }
//TODO(perf): don't return a `&mut Vec<u8>`, but a wrapper
//that protects the Vec when growing. Specifically, if this
//Vec couldn't be reset, as it's position isn't at the end,
//any new reserves will copy the bytes before the position,
//which is unnecessary.
pub fn write_buf_mut(&mut self) -> &mut Vec<u8> { pub fn write_buf_mut(&mut self) -> &mut Vec<u8> {
let buf = self.write_buf.head_mut(); let buf = self.write_buf.head_mut();
buf.maybe_reset(); buf.maybe_reset();
@@ -154,7 +160,7 @@ where
try_nb!(self.io.flush()); try_nb!(self.io.flush());
} else { } else {
loop { loop {
let n = try_ready!(self.io.write_buf(&mut self.write_buf)); let n = try_ready!(self.io.write_buf(&mut self.write_buf.auto()));
debug!("flushed {} bytes", n); debug!("flushed {} bytes", n);
if self.write_buf.remaining() == 0 { if self.write_buf.remaining() == 0 {
break; break;
@@ -263,7 +269,7 @@ impl<B> WriteBuf<B> {
WriteBuf { WriteBuf {
buf: BufDeque::new(), buf: BufDeque::new(),
max_buf_size: DEFAULT_MAX_BUFFER_SIZE, max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
strategy: Strategy::Queue, strategy: Strategy::Auto,
} }
} }
} }
@@ -277,6 +283,11 @@ where
self.strategy = strategy; self.strategy = strategy;
} }
#[inline]
fn auto(&mut self) -> WriteBufAuto<B> {
WriteBufAuto::new(self)
}
fn buffer(&mut self, buf: B) { fn buffer(&mut self, buf: B) {
match self.strategy { match self.strategy {
Strategy::Flatten => { Strategy::Flatten => {
@@ -284,7 +295,7 @@ where
head.maybe_reset(); head.maybe_reset();
head.bytes.put(buf); head.bytes.put(buf);
}, },
Strategy::Queue => { Strategy::Auto | Strategy::Queue => {
self.buf.bufs.push_back(VecOrBuf::Buf(buf)); self.buf.bufs.push_back(VecOrBuf::Buf(buf));
}, },
} }
@@ -295,8 +306,7 @@ where
Strategy::Flatten => { Strategy::Flatten => {
self.remaining() < self.max_buf_size self.remaining() < self.max_buf_size
}, },
Strategy::Queue => { Strategy::Auto | Strategy::Queue => {
// for now, the simplest of heuristics
self.buf.bufs.len() < MAX_BUF_LIST_BUFFERS self.buf.bufs.len() < MAX_BUF_LIST_BUFFERS
&& self.remaining() < self.max_buf_size && self.remaining() < self.max_buf_size
}, },
@@ -355,8 +365,68 @@ impl<B: Buf> Buf for WriteBuf<B> {
} }
} }
/// Detects when wrapped `WriteBuf` is used for vectored IO, and
/// adjusts the `WriteBuf` strategy if not.
struct WriteBufAuto<'a, B: Buf + 'a> {
bytes_called: Cell<bool>,
bytes_vec_called: Cell<bool>,
inner: &'a mut WriteBuf<B>,
}
impl<'a, B: Buf> WriteBufAuto<'a, B> {
fn new(inner: &'a mut WriteBuf<B>) -> WriteBufAuto<'a, B> {
WriteBufAuto {
bytes_called: Cell::new(false),
bytes_vec_called: Cell::new(false),
inner: inner,
}
}
}
impl<'a, B: Buf> Buf for WriteBufAuto<'a, B> {
#[inline]
fn remaining(&self) -> usize {
self.inner.remaining()
}
#[inline]
fn bytes(&self) -> &[u8] {
self.bytes_called.set(true);
self.inner.bytes()
}
#[inline]
fn advance(&mut self, cnt: usize) {
self.inner.advance(cnt)
}
#[inline]
fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize {
self.bytes_vec_called.set(true);
self.inner.bytes_vec(dst)
}
}
impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> {
fn drop(&mut self) {
if let Strategy::Auto = self.inner.strategy {
if self.bytes_vec_called.get() {
self.inner.strategy = Strategy::Queue;
} else if self.bytes_called.get() {
trace!("detected no usage of vectored write, flattening");
self.inner.strategy = Strategy::Flatten;
let mut vec = Vec::new();
vec.put(&mut self.inner.buf);
self.inner.buf.bufs.push_back(VecOrBuf::Vec(Cursor::new(vec)));
}
}
}
}
#[derive(Debug)] #[derive(Debug)]
enum Strategy { enum Strategy {
Auto,
Flatten, Flatten,
Queue, Queue,
} }
@@ -536,4 +606,117 @@ mod tests {
buffered.flush().unwrap(); buffered.flush().unwrap();
assert_eq!(buffered.io, b"hello"); assert_eq!(buffered.io, b"hello");
} }
#[test]
fn write_buf_queue() {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();
let mock = AsyncIo::new_buf(vec![], 1024);
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
buffered.write_buf_mut().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.write_buf_mut().extend(b"it's ");
buffered.buffer(Cursor::new(b"hyper!".to_vec()));
buffered.flush().unwrap();
assert_eq!(buffered.io, b"hello world, it's hyper!");
assert_eq!(buffered.io.num_writes(), 1);
}
#[test]
fn write_buf_reclaim_vec() {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();
let mock = AsyncIo::new_buf(vec![], 1024);
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
buffered.write_buf_mut().extend(b"hello ");
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
buffered.write_buf_mut().extend(b"world, ");
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
// after flushing, reclaim the Vec
buffered.flush().unwrap();
assert_eq!(buffered.write_buf.remaining(), 0);
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
// add a user buf in the way
buffered.buffer(Cursor::new(b"it's ".to_vec()));
// and then add more hyper bytes
buffered.write_buf_mut().extend(b"hyper!");
buffered.flush().unwrap();
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
assert_eq!(buffered.io, b"hello world, it's hyper!");
}
#[test]
fn write_buf_flatten() {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();
let mock = AsyncIo::new_buf(vec![], 1024);
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
buffered.write_buf.set_strategy(Strategy::Flatten);
buffered.write_buf_mut().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.write_buf_mut().extend(b"it's ");
buffered.buffer(Cursor::new(b"hyper!".to_vec()));
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
buffered.flush().unwrap();
assert_eq!(buffered.io, b"hello world, it's hyper!");
assert_eq!(buffered.io.num_writes(), 1);
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
}
#[test]
fn write_buf_auto_flatten() {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();
let mut mock = AsyncIo::new_buf(vec![], 1024);
mock.max_read_vecs(0); // disable vectored IO
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
// we have 4 buffers, but hope to detect that vectored IO isn't
// being used, and switch to flattening automatically,
// resulting in only 2 writes
buffered.write_buf_mut().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.write_buf_mut().extend(b"it's hyper!");
//buffered.buffer(Cursor::new(b"hyper!".to_vec()));
buffered.flush().unwrap();
assert_eq!(buffered.io, b"hello world, it's hyper!");
assert_eq!(buffered.io.num_writes(), 2);
assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
}
#[test]
fn write_buf_queue_does_not_auto() {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();
let mut mock = AsyncIo::new_buf(vec![], 1024);
mock.max_read_vecs(0); // disable vectored IO
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
buffered.write_buf.set_strategy(Strategy::Queue);
// we have 4 buffers, and vec IO disabled, but explicitly said
// don't try to auto detect (via setting strategy above)
buffered.write_buf_mut().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.write_buf_mut().extend(b"it's ");
buffered.buffer(Cursor::new(b"hyper!".to_vec()));
buffered.flush().unwrap();
assert_eq!(buffered.io, b"hello world, it's hyper!");
assert_eq!(buffered.io.num_writes(), 4);
}
} }