perf(h1): optimize write buffer when flattening

Author: Sean McArthur
Date: 2018-05-30 17:19:45 -07:00
parent bfb2ab8644
commit 5323c2f39c
2 changed files with 93 additions and 157 deletions
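In outline: the write buffer previously kept one deque mixing hyper's own header Vecs and user body buffers (the `VecOrBuf` enum removed below), and tried to reclaim Vecs after flushing. This commit splits it into a single re-usable headers buffer plus a deque of user buffers. A minimal sketch of the new shape, with simplified types (the real `Cursor` and `BufDeque` in the diff carry `Buf` impls and capacity logic):

    use std::collections::VecDeque;

    // Simplified model of the layout after this commit (not hyper's exact code).
    struct Cursor<T> {
        bytes: T,
        pos: usize,
    }

    enum Strategy {
        Auto,    // queue buffers, but flatten if vectored IO turns out unused
        Flatten, // copy everything into `headers`; one write() per flush
        Queue,   // always keep user buffers separate for vectored writes
    }

    struct WriteBuf<B> {
        // re-usable buffer that holds message headers
        headers: Cursor<Vec<u8>>,
        // deque of user buffers, used when the strategy queues
        queue: VecDeque<B>,
        strategy: Strategy,
        max_buf_size: usize,
    }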

File 1 of 2:

@@ -168,7 +168,7 @@ where I: AsyncRead + AsyncWrite,
         self.state.busy();
         if msg.expect_continue {
             let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
-            self.io.write_buf_mut().extend_from_slice(cont);
+            self.io.headers_buf().extend_from_slice(cont);
         }
         let wants_keep_alive = msg.keep_alive;
         self.state.keep_alive &= wants_keep_alive;
@@ -397,7 +397,7 @@ where I: AsyncRead + AsyncWrite,
         self.enforce_version(&mut head);
-        let buf = self.io.write_buf_mut();
+        let buf = self.io.headers_buf();
         self.state.writing = match T::encode(Encode {
             head: &mut head,
             body,

File 2 of 2:

@@ -86,7 +86,7 @@ where
     pub fn set_write_strategy_flatten(&mut self) {
         // this should always be called only at construction time,
         // so this assert is here to catch myself
-        debug_assert!(self.write_buf.buf.bufs.is_empty());
+        debug_assert!(self.write_buf.queue.bufs.is_empty());
         self.write_buf.set_strategy(Strategy::Flatten);
     }
@@ -94,14 +94,8 @@ where
         self.read_buf.as_ref()
     }
-    //TODO(perf): don't return a `&mut Vec<u8>`, but a wrapper
-    //that protects the Vec when growing. Specifically, if this
-    //Vec couldn't be reset, as it's position isn't at the end,
-    //any new reserves will copy the bytes before the position,
-    //which is unnecessary.
-    pub fn write_buf_mut(&mut self) -> &mut Vec<u8> {
-        let buf = self.write_buf.head_mut();
-        buf.maybe_reset();
+    pub fn headers_buf(&mut self) -> &mut Vec<u8> {
+        let buf = self.write_buf.headers_mut();
         &mut buf.bytes
     }
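The removed TODO described a real hazard: the old head buffer was a `Cursor` over a Vec that could still hold unflushed bytes at a non-zero position, and growing such a Vec copies the already-consumed prefix along with it. The new dedicated headers buffer is always fully flushed and reset before being written into again, so handing out `&mut Vec<u8>` is safe. A small illustration of the old concern, using a hypothetical stand-alone cursor type:

    // Hypothetical stand-alone type; illustrates the removed TODO only.
    struct VecCursor {
        bytes: Vec<u8>,
        pos: usize, // bytes before `pos` were already written out
    }

    impl VecCursor {
        fn extend(&mut self, data: &[u8]) {
            // May reallocate: the dead prefix `..pos` gets copied too,
            // which is the wasted work the TODO worried about.
            self.bytes.extend_from_slice(data);
        }

        fn reset_if_consumed(&mut self) {
            // Once everything is flushed, rewind instead of growing.
            if self.pos == self.bytes.len() {
                self.pos = 0;
                self.bytes.clear(); // safe equivalent of `unsafe { set_len(0) }` for u8
            }
        }
    }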
@@ -192,6 +186,10 @@ where
         } else if self.write_buf.remaining() == 0 {
             try_nb!(self.io.flush());
         } else {
+            match self.write_buf.strategy {
+                Strategy::Flatten => return self.flush_flattened(),
+                _ => (),
+            }
             loop {
                 let n = try_ready!(self.io.write_buf(&mut self.write_buf.auto()));
                 debug!("flushed {} bytes", n);
@@ -206,6 +204,27 @@ where
             }
         }
         Ok(Async::Ready(()))
     }
+    /// Specialized version of `flush` when strategy is Flatten.
+    ///
+    /// Since all buffered bytes are flattened into the single headers buffer,
+    /// that skips some bookkeeping around using multiple buffers.
+    fn flush_flattened(&mut self) -> Poll<(), io::Error> {
+        loop {
+            let n = try_nb!(self.io.write(self.write_buf.headers.bytes()));
+            debug!("flushed {} bytes", n);
+            self.write_buf.headers.advance(n);
+            if self.write_buf.headers.remaining() == 0 {
+                self.write_buf.headers.reset();
+                break;
+            } else if n == 0 {
+                trace!("write returned zero, but {} bytes remaining", self.write_buf.remaining());
+                return Err(io::ErrorKind::WriteZero.into())
+            }
+        }
+        try_nb!(self.io.flush());
+        Ok(Async::Ready(()))
+    }
 }
 pub trait MemRead {
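For reference, the flattened flush reduces to the classic single-buffer write loop. A blocking analogue (an illustration, not hyper's API): write until the buffer is drained, treat a zero-byte write as `WriteZero`, then reset the buffer for reuse.

    use std::io::{self, Write};

    // Blocking sketch of the loop above; `pos` plays the role of Cursor::pos.
    fn flush_flattened<W: Write>(io: &mut W, buf: &mut Vec<u8>, pos: &mut usize) -> io::Result<()> {
        while *pos < buf.len() {
            let n = io.write(&buf[*pos..])?;
            if n == 0 {
                // the peer accepted nothing; erroring beats spinning forever
                return Err(io::ErrorKind::WriteZero.into());
            }
            *pos += n;
        }
        // fully flushed: reset the re-usable buffer (Cursor::reset in the diff)
        buf.clear();
        *pos = 0;
        io.flush()
    }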
@@ -249,17 +268,16 @@ impl<T: AsRef<[u8]>> Cursor<T> {
     #[inline]
     pub fn consume(&mut self, num: usize) {
         debug_assert!(self.pos + num <= self.bytes.as_ref().len());
         self.pos += num;
     }
 }
 impl Cursor<Vec<u8>> {
-    fn maybe_reset(&mut self) {
-        if self.pos != 0 && self.remaining() == 0 {
-            self.pos = 0;
-            unsafe {
-                self.bytes.set_len(0);
-            }
+    fn reset(&mut self) {
+        self.pos = 0;
+        unsafe {
+            self.bytes.set_len(0);
         }
     }
 }
@@ -292,16 +310,20 @@ impl<T: AsRef<[u8]>> Buf for Cursor<T> {
 // an internal buffer to collect writes before flushes
 struct WriteBuf<B> {
-    buf: BufDeque<B>,
+    /// Re-usable buffer that holds message headers
+    headers: Cursor<Vec<u8>>,
     max_buf_size: usize,
+    /// Deque of user buffers if strategy is Queue
+    queue: BufDeque<B>,
     strategy: Strategy,
 }
 impl<B> WriteBuf<B> {
     fn new() -> WriteBuf<B> {
         WriteBuf {
-            buf: BufDeque::new(),
+            headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
             max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
+            queue: BufDeque::new(),
             strategy: Strategy::Auto,
         }
     }
@@ -322,14 +344,14 @@ where
     }
     fn buffer(&mut self, buf: B) {
         debug_assert!(buf.has_remaining());
         match self.strategy {
             Strategy::Flatten => {
-                let head = self.head_mut();
-                head.maybe_reset();
+                let head = self.headers_mut();
                 head.bytes.put(buf);
             },
             Strategy::Auto | Strategy::Queue => {
-                self.buf.bufs.push_back(VecOrBuf::Buf(buf));
+                self.queue.bufs.push_back(buf);
             },
         }
     }
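The trade in `buffer` is copy-now versus queue-now: Flatten pays a memcpy per call to earn a single write() at flush time, while Auto/Queue keeps each user buffer whole so a vectored write can send them without copying. A stand-alone sketch with plain Vecs (the diff's version works over a generic `B: Buf`):

    use std::collections::VecDeque;

    enum Strategy { Auto, Flatten, Queue }

    // Simplified dispatch mirroring the hunk above.
    fn buffer(strategy: &Strategy, headers: &mut Vec<u8>, queue: &mut VecDeque<Vec<u8>>, buf: Vec<u8>) {
        debug_assert!(!buf.is_empty());
        match *strategy {
            // copy now, one write later
            Strategy::Flatten => headers.extend_from_slice(&buf),
            // zero-copy hand-off for a later vectored write
            Strategy::Auto | Strategy::Queue => queue.push_back(buf),
        }
    }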
@@ -340,30 +362,15 @@ where
                 self.remaining() < self.max_buf_size
             },
             Strategy::Auto | Strategy::Queue => {
-                self.buf.bufs.len() < MAX_BUF_LIST_BUFFERS
+                self.queue.bufs.len() < MAX_BUF_LIST_BUFFERS
                     && self.remaining() < self.max_buf_size
             },
         }
     }
-    fn head_mut(&mut self) -> &mut Cursor<Vec<u8>> {
-        // this dance is brought to you, The Borrow Checker!
-        let reuse_back = if let Some(&VecOrBuf::Vec(_)) = self.buf.bufs.back() {
-            true
-        } else {
-            false
-        };
-        if !reuse_back {
-            let head_buf = Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE));
-            self.buf.bufs.push_back(VecOrBuf::Vec(head_buf));
-        }
-        if let Some(&mut VecOrBuf::Vec(ref mut v)) = self.buf.bufs.back_mut() {
-            v
-        } else {
-            unreachable!("head_buf just pushed on back");
-        }
-    }
+    fn headers_mut(&mut self) -> &mut Cursor<Vec<u8>> {
+        debug_assert!(!self.queue.has_remaining());
+        &mut self.headers
+    }
 }
@@ -379,22 +386,37 @@ impl<B: Buf> fmt::Debug for WriteBuf<B> {
 impl<B: Buf> Buf for WriteBuf<B> {
     #[inline]
     fn remaining(&self) -> usize {
-        self.buf.remaining()
+        self.headers.remaining() + self.queue.remaining()
     }
     #[inline]
     fn bytes(&self) -> &[u8] {
-        self.buf.bytes()
+        let headers = self.headers.bytes();
+        if !headers.is_empty() {
+            headers
+        } else {
+            self.queue.bytes()
+        }
     }
     #[inline]
     fn advance(&mut self, cnt: usize) {
-        self.buf.advance(cnt)
+        let hrem = self.headers.remaining();
+        if hrem == cnt {
+            self.headers.reset();
+        } else if hrem > cnt {
+            self.headers.advance(cnt);
+        } else {
+            let qcnt = cnt - hrem;
+            self.headers.reset();
+            self.queue.advance(qcnt);
+        }
     }
     #[inline]
     fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize {
-        self.buf.bytes_vec(dst)
+        let n = self.headers.bytes_vec(dst);
+        self.queue.bytes_vec(&mut dst[n..]) + n
     }
 }
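This Buf impl encodes the invariant that headers always drain before the queue: `bytes` returns header bytes while any remain, and `advance` spills into the queue only after resetting the headers cursor. A toy model of `advance` over plain Vecs (an assumption for illustration; the real code works on Cursor and BufDeque):

    use std::collections::VecDeque;

    fn advance(headers: &mut Vec<u8>, queue: &mut VecDeque<Vec<u8>>, mut cnt: usize) {
        let hrem = headers.len();
        if cnt < hrem {
            // consumed only part of the headers
            headers.drain(..cnt);
        } else {
            headers.clear(); // Cursor::reset in the diff
            cnt -= hrem;
            while cnt > 0 {
                let mut front = queue.pop_front().expect("advanced past end of queue");
                if cnt < front.len() {
                    front.drain(..cnt);
                    queue.push_front(front); // partially consumed, keep it
                    break;
                }
                cnt -= front.len(); // fully consumed, drop the buffer
            }
        }
    }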
@@ -448,9 +470,7 @@ impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> {
         } else if self.bytes_called.get() {
             trace!("detected no usage of vectored write, flattening");
             self.inner.strategy = Strategy::Flatten;
-            let mut vec = Vec::new();
-            vec.put(&mut self.inner.buf);
-            self.inner.buf.bufs.push_back(VecOrBuf::Vec(Cursor::new(vec)));
+            self.inner.headers.bytes.put(&mut self.inner.queue);
         }
     }
 }
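WriteBufAuto is the auto-detection piece: it wraps the write buffer during a flush, records via Cells whether the IO actually used the vectored `bytes_vec` path or fell back to scalar `bytes`, and commits a strategy in Drop. With the new fields, switching to Flatten just folds any queued bytes into the headers buffer. A sketch of the pattern (names here are illustrative, not hyper's exact code):

    use std::cell::Cell;

    enum Strategy { Auto, Flatten, Queue }

    struct VectoredProbe<'a> {
        bytes_called: Cell<bool>,     // scalar bytes() path was taken
        bytes_vec_called: Cell<bool>, // vectored bytes_vec() path was taken
        strategy: &'a mut Strategy,
    }

    impl<'a> Drop for VectoredProbe<'a> {
        fn drop(&mut self) {
            if self.bytes_vec_called.get() {
                // vectored IO works; keep queueing buffers
                *self.strategy = Strategy::Queue;
            } else if self.bytes_called.get() {
                // only scalar writes observed: flatten from now on
                *self.strategy = Strategy::Flatten;
            }
        }
    }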
@@ -464,59 +484,8 @@ enum Strategy {
     Queue,
 }
-enum VecOrBuf<B> {
-    Vec(Cursor<Vec<u8>>),
-    Buf(B),
-}
-impl<B: Buf> Buf for VecOrBuf<B> {
-    #[inline]
-    fn remaining(&self) -> usize {
-        match *self {
-            VecOrBuf::Vec(ref v) => v.remaining(),
-            VecOrBuf::Buf(ref b) => b.remaining(),
-        }
-    }
-    #[inline]
-    fn bytes(&self) -> &[u8] {
-        match *self {
-            VecOrBuf::Vec(ref v) => v.bytes(),
-            VecOrBuf::Buf(ref b) => b.bytes(),
-        }
-    }
-    #[inline]
-    fn advance(&mut self, cnt: usize) {
-        match *self {
-            VecOrBuf::Vec(ref mut v) => v.advance(cnt),
-            VecOrBuf::Buf(ref mut b) => b.advance(cnt),
-        }
-    }
-    #[inline]
-    fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize {
-        match *self {
-            VecOrBuf::Vec(ref v) => {
-                if v.has_remaining() {
-                    v.bytes_vec(dst)
-                } else {
-                    0
-                }
-            },
-            VecOrBuf::Buf(ref b) => {
-                if b.has_remaining() {
-                    b.bytes_vec(dst)
-                } else {
-                    0
-                }
-            },
-        }
-    }
-}
 struct BufDeque<T> {
-    bufs: VecDeque<VecOrBuf<T>>,
+    bufs: VecDeque<T>,
 }
@@ -539,16 +508,13 @@ impl<T: Buf> Buf for BufDeque<T> {
     #[inline]
     fn bytes(&self) -> &[u8] {
         for buf in &self.bufs {
-            if buf.has_remaining() {
-                return buf.bytes();
-            }
+            return buf.bytes();
         }
         &[]
     }
     #[inline]
     fn advance(&mut self, mut cnt: usize) {
-        let mut maybe_reclaim = None;
         while cnt > 0 {
             {
                 let front = &mut self.bufs[0];
@@ -561,12 +527,7 @@ impl<T: Buf> Buf for BufDeque<T> {
                     cnt -= rem;
                 }
             }
-            maybe_reclaim = self.bufs.pop_front();
-        }
-        if let Some(VecOrBuf::Vec(v)) = maybe_reclaim {
-            trace!("reclaiming write buf Vec");
-            self.bufs.push_back(VecOrBuf::Vec(v));
+            self.bufs.pop_front();
         }
     }
@@ -630,15 +591,12 @@ mod tests {
     }
     #[test]
-    fn write_buf_skips_empty_bufs() {
-        let mut mock = AsyncIo::new_buf(vec![], 1024);
-        mock.max_read_vecs(0); // disable vectored IO
+    #[should_panic]
+    fn write_buf_requires_non_empty_bufs() {
+        let mock = AsyncIo::new_buf(vec![], 1024);
         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
         buffered.buffer(Cursor::new(Vec::new()));
-        buffered.buffer(Cursor::new(b"hello".to_vec()));
-        buffered.flush().unwrap();
-        assert_eq!(buffered.io, b"hello");
     }
     #[test]
@@ -649,42 +607,17 @@ mod tests {
         let mock = AsyncIo::new_buf(vec![], 1024);
         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
-        buffered.write_buf_mut().extend(b"hello ");
+        buffered.headers_buf().extend(b"hello ");
         buffered.buffer(Cursor::new(b"world, ".to_vec()));
-        buffered.write_buf_mut().extend(b"it's ");
+        buffered.buffer(Cursor::new(b"it's ".to_vec()));
         buffered.buffer(Cursor::new(b"hyper!".to_vec()));
+        assert_eq!(buffered.write_buf.queue.bufs.len(), 3);
         buffered.flush().unwrap();
         assert_eq!(buffered.io, b"hello world, it's hyper!");
         assert_eq!(buffered.io.num_writes(), 1);
-    }
-    #[test]
-    fn write_buf_reclaim_vec() {
-        extern crate pretty_env_logger;
-        let _ = pretty_env_logger::try_init();
-        let mock = AsyncIo::new_buf(vec![], 1024);
-        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
-        buffered.write_buf_mut().extend(b"hello ");
-        assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
-        buffered.write_buf_mut().extend(b"world, ");
-        assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
-        // after flushing, reclaim the Vec
-        buffered.flush().unwrap();
-        assert_eq!(buffered.write_buf.remaining(), 0);
-        assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
-        // add a user buf in the way
-        buffered.buffer(Cursor::new(b"it's ".to_vec()));
-        // and then add more hyper bytes
-        buffered.write_buf_mut().extend(b"hyper!");
-        buffered.flush().unwrap();
-        assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
-        assert_eq!(buffered.io, b"hello world, it's hyper!");
+        assert_eq!(buffered.write_buf.queue.bufs.len(), 0);
     }
     #[test]
@@ -696,17 +629,16 @@ mod tests {
         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
         buffered.write_buf.set_strategy(Strategy::Flatten);
-        buffered.write_buf_mut().extend(b"hello ");
+        buffered.headers_buf().extend(b"hello ");
         buffered.buffer(Cursor::new(b"world, ".to_vec()));
-        buffered.write_buf_mut().extend(b"it's ");
+        buffered.buffer(Cursor::new(b"it's ".to_vec()));
         buffered.buffer(Cursor::new(b"hyper!".to_vec()));
-        assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
+        assert_eq!(buffered.write_buf.queue.bufs.len(), 0);
         buffered.flush().unwrap();
         assert_eq!(buffered.io, b"hello world, it's hyper!");
         assert_eq!(buffered.io.num_writes(), 1);
-        assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
     }
     #[test]
@@ -721,19 +653,20 @@ mod tests {
         // we have 4 buffers, but hope to detect that vectored IO isn't
         // being used, and switch to flattening automatically,
         // resulting in only 2 writes
-        buffered.write_buf_mut().extend(b"hello ");
+        buffered.headers_buf().extend(b"hello ");
         buffered.buffer(Cursor::new(b"world, ".to_vec()));
-        buffered.write_buf_mut().extend(b"it's hyper!");
-        //buffered.buffer(Cursor::new(b"hyper!".to_vec()));
+        buffered.buffer(Cursor::new(b"it's ".to_vec()));
+        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
+        assert_eq!(buffered.write_buf.queue.bufs.len(), 3);
         buffered.flush().unwrap();
         assert_eq!(buffered.io, b"hello world, it's hyper!");
         assert_eq!(buffered.io.num_writes(), 2);
-        assert_eq!(buffered.write_buf.buf.bufs.len(), 1);
+        assert_eq!(buffered.write_buf.queue.bufs.len(), 0);
     }
     #[test]
-    fn write_buf_queue_does_not_auto() {
+    fn write_buf_queue_disable_auto() {
         extern crate pretty_env_logger;
         let _ = pretty_env_logger::try_init();
@@ -744,13 +677,16 @@ mod tests {
         // we have 4 buffers, and vec IO disabled, but explicitly said
         // don't try to auto detect (via setting strategy above)
-        buffered.write_buf_mut().extend(b"hello ");
+        buffered.headers_buf().extend(b"hello ");
         buffered.buffer(Cursor::new(b"world, ".to_vec()));
-        buffered.write_buf_mut().extend(b"it's ");
+        buffered.buffer(Cursor::new(b"it's ".to_vec()));
        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
+        assert_eq!(buffered.write_buf.queue.bufs.len(), 3);
         buffered.flush().unwrap();
         assert_eq!(buffered.io, b"hello world, it's hyper!");
         assert_eq!(buffered.io.num_writes(), 4);
+        assert_eq!(buffered.write_buf.queue.bufs.len(), 0);
     }
 }