perf(lib): re-enable writev support (#2338)
Tokio's `AsyncWrite` trait once again supports vectored writes as of Tokio 0.3.4 (see tokio-rs/tokio#3149). This branch re-enables vectored writes in Hyper for HTTP/1. Using vectored writes in HTTP/2 will require an upstream change in the `h2` crate as well.

I've removed the adaptive write buffer implementation that attempted to detect whether vectored IO is available, since the Tokio 0.3.4 `AsyncWrite` trait exposes this directly via the `is_write_vectored` method. Now, we simply ask the IO whether or not it supports vectored writes and configure the buffer accordingly, which makes the implementation somewhat simpler.

This also removes the `http1_writev()` methods from the builders. These are no longer necessary, as Hyper can now determine whether or not to use vectored writes based on `is_write_vectored`, rather than trying to auto-detect it.

Closes #2320

BREAKING CHANGE: Removed `http1_writev` methods from `client::Builder`, `client::conn::Builder`, `server::Builder`, and `server::conn::Builder`. Vectored writes are now enabled based on whether the `AsyncWrite` implementation in use supports them, rather than through adaptive detection. To explicitly disable vectored writes, users may wrap the IO in a newtype that implements `AsyncRead` and `AsyncWrite` and returns `false` from its `AsyncWrite::is_write_vectored` method.
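As a rough sketch of the workaround described in the breaking-change note, a wrapper along these lines would force the flattening strategy. The `UnvectoredIo` name and the pass-through method bodies are illustrative assumptions, not code from this change:

```rust
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

/// Hypothetical wrapper that reports no vectored-write support,
/// causing hyper to fall back to flattening its write buffers.
struct UnvectoredIo<T>(T);

impl<T: AsyncRead + Unpin> AsyncRead for UnvectoredIo<T> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // Delegate reads to the inner IO unchanged.
        Pin::new(&mut self.0).poll_read(cx, buf)
    }
}

impl<T: AsyncWrite + Unpin> AsyncWrite for UnvectoredIo<T> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // Delegate plain writes to the inner IO unchanged.
        Pin::new(&mut self.0).poll_write(cx, buf)
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.0).poll_flush(cx)
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.0).poll_shutdown(cx)
    }

    // Report that vectored writes are unsupported; hyper will then
    // configure its write buffer with the flattening strategy.
    fn is_write_vectored(&self) -> bool {
        false
    }
}
```

With such a wrapper in place, `WriteBuf::new` (see the diff below) selects `WriteStrategy::Flatten` rather than `WriteStrategy::Queue` for the wrapped IO.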
@@ -71,14 +71,6 @@ where
         self.io.set_read_buf_exact_size(sz);
     }
 
-    pub fn set_write_strategy_flatten(&mut self) {
-        self.io.set_write_strategy_flatten();
-    }
-
-    pub fn set_write_strategy_queue(&mut self) {
-        self.io.set_write_strategy_queue();
-    }
-
     #[cfg(feature = "client")]
     pub fn set_title_case_headers(&mut self) {
         self.state.title_case_headers = true;
@@ -1,4 +1,3 @@
-use std::cell::Cell;
 use std::cmp;
 use std::fmt;
 use std::io::{self, IoSlice};
@@ -57,13 +56,14 @@ where
     B: Buf,
 {
     pub fn new(io: T) -> Buffered<T, B> {
+        let write_buf = WriteBuf::new(&io);
         Buffered {
             flush_pipeline: false,
             io,
             read_blocked: false,
             read_buf: BytesMut::with_capacity(0),
             read_buf_strategy: ReadStrategy::default(),
-            write_buf: WriteBuf::new(),
+            write_buf,
         }
     }
 
@@ -98,13 +98,6 @@ where
         self.write_buf.set_strategy(WriteStrategy::Flatten);
     }
 
-    pub fn set_write_strategy_queue(&mut self) {
-        // this should always be called only at construction time,
-        // so this assert is here to catch myself
-        debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
-        self.write_buf.set_strategy(WriteStrategy::Queue);
-    }
-
     pub fn read_buf(&self) -> &[u8] {
         self.read_buf.as_ref()
     }
@@ -237,13 +230,13 @@ where
             if let WriteStrategy::Flatten = self.write_buf.strategy {
                 return self.poll_flush_flattened(cx);
             }
 
             loop {
-                // TODO(eliza): this basically ignores all of `WriteBuf`...put
-                // back vectored IO and `poll_write_buf` when the appropriate Tokio
-                // changes land...
-                let n = ready!(Pin::new(&mut self.io)
-                    // .poll_write_buf(cx, &mut self.write_buf.auto()))?;
-                    .poll_write(cx, self.write_buf.auto().bytes()))?;
+                let n = {
+                    let mut iovs = [IoSlice::new(&[]); crate::common::io::MAX_WRITEV_BUFS];
+                    let len = self.write_buf.bytes_vectored(&mut iovs);
+                    ready!(Pin::new(&mut self.io).poll_write_vectored(cx, &iovs[..len]))?
+                };
                 // TODO(eliza): we have to do this manually because
                 // `poll_write_buf` doesn't exist in Tokio 0.3 yet...when
                 // `poll_write_buf` comes back, the manual advance will need to leave!
@@ -462,12 +455,17 @@ pub(super) struct WriteBuf<B> {
 }
 
 impl<B: Buf> WriteBuf<B> {
-    fn new() -> WriteBuf<B> {
+    fn new(io: &impl AsyncWrite) -> WriteBuf<B> {
+        let strategy = if io.is_write_vectored() {
+            WriteStrategy::Queue
+        } else {
+            WriteStrategy::Flatten
+        };
         WriteBuf {
             headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
             max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
             queue: BufList::new(),
-            strategy: WriteStrategy::Auto,
+            strategy,
         }
     }
 }
@@ -480,12 +478,6 @@ where
         self.strategy = strategy;
     }
 
-    // TODO(eliza): put back writev!
-    #[inline]
-    fn auto(&mut self) -> WriteBufAuto<'_, B> {
-        WriteBufAuto::new(self)
-    }
-
     pub(super) fn buffer<BB: Buf + Into<B>>(&mut self, mut buf: BB) {
         debug_assert!(buf.has_remaining());
         match self.strategy {
@@ -505,7 +497,7 @@ where
                     buf.advance(adv);
                 }
             }
-            WriteStrategy::Auto | WriteStrategy::Queue => {
+            WriteStrategy::Queue => {
                 self.queue.push(buf.into());
             }
         }
@@ -514,7 +506,7 @@ where
     fn can_buffer(&self) -> bool {
         match self.strategy {
             WriteStrategy::Flatten => self.remaining() < self.max_buf_size,
-            WriteStrategy::Auto | WriteStrategy::Queue => {
+            WriteStrategy::Queue => {
                 self.queue.bufs_cnt() < MAX_BUF_LIST_BUFFERS && self.remaining() < self.max_buf_size
             }
         }
@@ -573,65 +565,8 @@ impl<B: Buf> Buf for WriteBuf<B> {
     }
 }
 
-/// Detects when wrapped `WriteBuf` is used for vectored IO, and
-/// adjusts the `WriteBuf` strategy if not.
-struct WriteBufAuto<'a, B: Buf> {
-    bytes_called: Cell<bool>,
-    bytes_vec_called: Cell<bool>,
-    inner: &'a mut WriteBuf<B>,
-}
-
-impl<'a, B: Buf> WriteBufAuto<'a, B> {
-    fn new(inner: &'a mut WriteBuf<B>) -> WriteBufAuto<'a, B> {
-        WriteBufAuto {
-            bytes_called: Cell::new(false),
-            bytes_vec_called: Cell::new(false),
-            inner,
-        }
-    }
-}
-
-impl<'a, B: Buf> Buf for WriteBufAuto<'a, B> {
-    #[inline]
-    fn remaining(&self) -> usize {
-        self.inner.remaining()
-    }
-
-    #[inline]
-    fn bytes(&self) -> &[u8] {
-        self.bytes_called.set(true);
-        self.inner.bytes()
-    }
-
-    #[inline]
-    fn advance(&mut self, cnt: usize) {
-        self.inner.advance(cnt)
-    }
-
-    #[inline]
-    fn bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
-        self.bytes_vec_called.set(true);
-        self.inner.bytes_vectored(dst)
-    }
-}
-
-impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> {
-    fn drop(&mut self) {
-        if let WriteStrategy::Auto = self.inner.strategy {
-            if self.bytes_vec_called.get() {
-                self.inner.strategy = WriteStrategy::Queue;
-            } else if self.bytes_called.get() {
-                trace!("detected no usage of vectored write, flattening");
-                self.inner.strategy = WriteStrategy::Flatten;
-                self.inner.headers.bytes.put(&mut self.inner.queue);
-            }
-        }
-    }
-}
-
 #[derive(Debug)]
 enum WriteStrategy {
-    Auto,
     Flatten,
     Queue,
 }
@@ -643,8 +578,8 @@ mod tests {
 
     use tokio_test::io::Builder as Mock;
 
-    #[cfg(feature = "nightly")]
-    use test::Bencher;
+    // #[cfg(feature = "nightly")]
+    // use test::Bencher;
 
     /*
     impl<T: Read> MemRead for AsyncIo<T> {
@@ -873,33 +808,6 @@ mod tests {
         buffered.flush().await.expect("flush");
     }
 
-    #[tokio::test]
-    async fn write_buf_auto_flatten() {
-        let _ = pretty_env_logger::try_init();
-
-        let mock = Mock::new()
-            // Expects write_buf to only consume first buffer
-            .write(b"hello ")
-            // And then the Auto strategy will have flattened
-            .write(b"world, it's hyper!")
-            .build();
-
-        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
-
-        // we have 4 buffers, but hope to detect that vectored IO isn't
-        // being used, and switch to flattening automatically,
-        // resulting in only 2 writes
-        buffered.headers_buf().extend(b"hello ");
-        buffered.buffer(Cursor::new(b"world, ".to_vec()));
-        buffered.buffer(Cursor::new(b"it's ".to_vec()));
-        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
-        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);
-
-        buffered.flush().await.expect("flush");
-
-        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
-    }
-
     #[tokio::test]
     async fn write_buf_queue_disable_auto() {
         let _ = pretty_env_logger::try_init();
@@ -928,19 +836,19 @@ mod tests {
         assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
     }
 
-    #[cfg(feature = "nightly")]
-    #[bench]
-    fn bench_write_buf_flatten_buffer_chunk(b: &mut Bencher) {
-        let s = "Hello, World!";
-        b.bytes = s.len() as u64;
+    // #[cfg(feature = "nightly")]
+    // #[bench]
+    // fn bench_write_buf_flatten_buffer_chunk(b: &mut Bencher) {
+    //     let s = "Hello, World!";
+    //     b.bytes = s.len() as u64;
 
-        let mut write_buf = WriteBuf::<bytes::Bytes>::new();
-        write_buf.set_strategy(WriteStrategy::Flatten);
-        b.iter(|| {
-            let chunk = bytes::Bytes::from(s);
-            write_buf.buffer(chunk);
-            ::test::black_box(&write_buf);
-            write_buf.headers.bytes.clear();
-        })
-    }
+    //     let mut write_buf = WriteBuf::<bytes::Bytes>::new();
+    //     write_buf.set_strategy(WriteStrategy::Flatten);
+    //     b.iter(|| {
+    //         let chunk = bytes::Bytes::from(s);
+    //         write_buf.buffer(chunk);
+    //         ::test::black_box(&write_buf);
+    //         write_buf.headers.bytes.clear();
+    //     })
+    // }
 }