re-enable vectored writes (#500)

Tokio's AsyncWrite trait once again has support for vectored writes in
Tokio 0.3.4 (see tokio-rs/tokio#3149.

This branch re-enables vectored writes in h2.

This change doesn't make all that big of a performance improvement in
Hyper's HTTP/2 benchmarks, but they use a BytesMut as the buffer.
With a buffer that turns into more IO vectors in bytes_vectored, there
might be a more noticeable performance improvement.

I spent a bit trying to refactor the flush logic to coalesce into fewer
writev calls with more buffers, but the current implementation seems
like about the best we're going to get without a bigger refactor. It's
basically the same as what h2 did previously, so it's probably fine.
This commit is contained in:
Eliza Weisman
2020-11-23 16:35:48 -08:00
committed by GitHub
parent 5a92f256c0
commit 73bf6a61ad
2 changed files with 31 additions and 20 deletions

View File

@@ -46,7 +46,7 @@ futures-core = { version = "0.3", default-features = false }
futures-sink = { version = "0.3", default-features = false } futures-sink = { version = "0.3", default-features = false }
futures-util = { version = "0.3", default-features = false } futures-util = { version = "0.3", default-features = false }
tokio-util = { version = "0.5", features = ["codec"] } tokio-util = { version = "0.5", features = ["codec"] }
tokio = { version = "0.3.2", features = ["io-util"] } tokio = { version = "0.3.4", features = ["io-util"] }
bytes = "0.6" bytes = "0.6"
http = "0.2" http = "0.2"
tracing = { version = "0.1.13", default-features = false, features = ["std", "log"] } tracing = { version = "0.1.13", default-features = false, features = ["std", "log"] }
@@ -68,7 +68,7 @@ serde = "1.0.0"
serde_json = "1.0.0" serde_json = "1.0.0"
# Examples # Examples
tokio = { version = "0.3.2", features = ["rt-multi-thread", "macros", "sync", "net"] } tokio = { version = "0.3.4", features = ["rt-multi-thread", "macros", "sync", "net"] }
env_logger = { version = "0.5.3", default-features = false } env_logger = { version = "0.5.3", default-features = false }
rustls = "0.18" rustls = "0.18"
tokio-rustls = "0.20.0" tokio-rustls = "0.20.0"

View File

@@ -3,12 +3,12 @@ use crate::codec::UserError::*;
use crate::frame::{self, Frame, FrameSize}; use crate::frame::{self, Frame, FrameSize};
use crate::hpack; use crate::hpack;
use bytes::{buf::BufMut, Buf, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use std::io::{self, Cursor}; use std::io::{self, Cursor, IoSlice};
// A macro to get around a method needing to borrow &mut self // A macro to get around a method needing to borrow &mut self
macro_rules! limited_write_buf { macro_rules! limited_write_buf {
@@ -39,6 +39,9 @@ pub struct FramedWrite<T, B> {
/// Max frame size, this is specified by the peer /// Max frame size, this is specified by the peer
max_frame_size: FrameSize, max_frame_size: FrameSize,
/// Whether or not the wrapped `AsyncWrite` supports vectored IO.
is_write_vectored: bool,
} }
#[derive(Debug)] #[derive(Debug)]
@@ -68,6 +71,7 @@ where
B: Buf, B: Buf,
{ {
pub fn new(inner: T) -> FramedWrite<T, B> { pub fn new(inner: T) -> FramedWrite<T, B> {
let is_write_vectored = inner.is_write_vectored();
FramedWrite { FramedWrite {
inner, inner,
hpack: hpack::Encoder::default(), hpack: hpack::Encoder::default(),
@@ -75,6 +79,7 @@ where
next: None, next: None,
last_data_frame: None, last_data_frame: None,
max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE, max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE,
is_write_vectored,
} }
} }
@@ -182,6 +187,8 @@ where
/// Flush buffered data to the wire /// Flush buffered data to the wire
pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> { pub fn flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
const MAX_IOVS: usize = 64;
let span = tracing::trace_span!("FramedWrite::flush"); let span = tracing::trace_span!("FramedWrite::flush");
let _e = span.enter(); let _e = span.enter();
@@ -190,25 +197,29 @@ where
match self.next { match self.next {
Some(Next::Data(ref mut frame)) => { Some(Next::Data(ref mut frame)) => {
tracing::trace!(queued_data_frame = true); tracing::trace!(queued_data_frame = true);
let mut buf = (&mut self.buf).chain(frame.payload_mut());
if self.buf.has_remaining() { // TODO(eliza): when tokio-util 0.5.1 is released, this
let n = // could just use `poll_write_buf`...
ready!(Pin::new(&mut self.inner).poll_write(cx, self.buf.bytes()))?; let n = if self.is_write_vectored {
self.buf.advance(n); let mut bufs = [IoSlice::new(&[]); MAX_IOVS];
} let cnt = buf.bytes_vectored(&mut bufs);
ready!(Pin::new(&mut self.inner).poll_write_vectored(cx, &bufs[..cnt]))?
let buf = frame.payload_mut(); } else {
ready!(Pin::new(&mut self.inner).poll_write(cx, buf.bytes()))?
if !self.buf.has_remaining() && buf.has_remaining() { };
let n = ready!(Pin::new(&mut self.inner).poll_write(cx, buf.bytes()))?; buf.advance(n);
buf.advance(n);
}
} }
_ => { _ => {
tracing::trace!(queued_data_frame = false); tracing::trace!(queued_data_frame = false);
let n = ready!( let n = if self.is_write_vectored {
Pin::new(&mut self.inner).poll_write(cx, &mut self.buf.bytes()) let mut iovs = [IoSlice::new(&[]); MAX_IOVS];
)?; let cnt = self.buf.bytes_vectored(&mut iovs);
ready!(
Pin::new(&mut self.inner).poll_write_vectored(cx, &mut iovs[..cnt])
)?
} else {
ready!(Pin::new(&mut self.inner).poll_write(cx, &mut self.buf.bytes()))?
};
self.buf.advance(n); self.buf.advance(n);
} }
} }