From 5a59875742500672f253719c1e1a16b4eddfacc7 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 5 Dec 2019 16:56:35 -0800 Subject: [PATCH] feat(body): replace `Chunk` type with `Bytes` Closes #1931 BREAKING CHANGE: All usage of `hyper::Chunk` should be replaced with `bytes::Bytes` (or `hyper::body::Bytes`). --- examples/web_api.rs | 6 +- src/body/body.rs | 58 ++++++------- src/body/chunk.rs | 175 --------------------------------------- src/body/mod.rs | 6 +- src/lib.rs | 2 +- src/proto/h1/conn.rs | 45 +++++----- src/proto/h1/dispatch.rs | 4 +- src/proto/h1/io.rs | 4 +- tests/client.rs | 2 +- tests/support/mod.rs | 2 +- 10 files changed, 59 insertions(+), 245 deletions(-) delete mode 100644 src/body/chunk.rs diff --git a/examples/web_api.rs b/examples/web_api.rs index ccdf2e93..aad71af1 100644 --- a/examples/web_api.rs +++ b/examples/web_api.rs @@ -3,7 +3,7 @@ use futures_util::{StreamExt, TryStreamExt}; use hyper::client::HttpConnector; use hyper::service::{make_service_fn, service_fn}; -use hyper::{header, Body, Chunk, Client, Method, Request, Response, Server, StatusCode}; +use hyper::{header, Body, Client, Method, Request, Response, Server, StatusCode}; type GenericError = Box; type Result = std::result::Result; @@ -25,11 +25,11 @@ async fn client_request_response(client: &Client) -> ResultPOST request body: {}
Response: {}", POST_DATA, std::str::from_utf8(&b).unwrap() - )) + ) })); Ok(Response::new(body)) diff --git a/src/body/body.rs b/src/body/body.rs index 23ca0282..1de610e4 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -11,13 +11,12 @@ use futures_util::TryStreamExt; use http::HeaderMap; use http_body::{Body as HttpBody, SizeHint}; -use super::Chunk; use crate::common::{task, Future, Never, Pin, Poll}; use crate::upgrade::OnUpgrade; -type BodySender = mpsc::Sender>; +type BodySender = mpsc::Sender>; -/// A stream of `Chunk`s, used when receiving bodies. +/// A stream of `Bytes`s, used when receiving bodies. /// /// A good default `Payload` to use in many applications. #[must_use = "streams do nothing unless polled"] @@ -29,11 +28,11 @@ pub struct Body { } enum Kind { - Once(Option), + Once(Option), Chan { content_length: Option, abort_rx: oneshot::Receiver<()>, - rx: mpsc::Receiver>, + rx: mpsc::Receiver>, }, H2 { content_length: Option, @@ -45,7 +44,7 @@ enum Kind { // See https://github.com/rust-lang/rust/issues/57017 #[cfg(feature = "stream")] Wrapped( - Pin>> + Send + Sync>>, + Pin>> + Send + Sync>>, ), } @@ -152,7 +151,7 @@ impl Body { pub fn wrap_stream(stream: S) -> Body where S: Stream> + Send + Sync + 'static, - O: Into + 'static, + O: Into + 'static, E: Into> + 'static, { let mapped = stream.map_ok(Into::into).map_err(Into::into); @@ -208,7 +207,7 @@ impl Body { }) } - fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll>> { + fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll>> { match self.take_delayed_eof() { Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) { ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => { @@ -237,7 +236,7 @@ impl Body { } } - fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll>> { + fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll>> { match self.kind { Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)), Kind::Chan { @@ -265,7 +264,7 @@ impl Body { } => match ready!(h2.poll_data(cx)) { Some(Ok(bytes)) => { let _ = h2.flow_control().release_capacity(bytes.len()); - Poll::Ready(Some(Ok(Chunk::from(bytes)))) + Poll::Ready(Some(Ok(bytes))) } Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))), None => Poll::Ready(None), @@ -279,7 +278,7 @@ impl Body { } } - pub(super) fn take_full_data(&mut self) -> Option { + pub(super) fn take_full_data(&mut self) -> Option { if let Kind::Once(ref mut chunk) = self.kind { chunk.take() } else { @@ -297,7 +296,7 @@ impl Default for Body { } impl HttpBody for Body { - type Data = Chunk; + type Data = Bytes; type Error = crate::Error; fn poll_data( @@ -362,7 +361,7 @@ impl fmt::Debug for Body { #[derive(Debug)] struct Empty; #[derive(Debug)] - struct Full<'a>(&'a Chunk); + struct Full<'a>(&'a Bytes); let mut builder = f.debug_tuple("Body"); match self.kind { @@ -381,7 +380,7 @@ impl fmt::Debug for Body { /// `Cargo.toml`. #[cfg(feature = "stream")] impl Stream for Body { - type Item = crate::Result; + type Item = crate::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { HttpBody::poll_data(self, cx) @@ -393,22 +392,22 @@ impl Stream for Body { /// This function requires enabling the `stream` feature in your /// `Cargo.toml`. #[cfg(feature = "stream")] -impl From>> + Send + Sync>> +impl From>> + Send + Sync>> for Body { #[inline] fn from( stream: Box< - dyn Stream>> + Send + Sync, + dyn Stream>> + Send + Sync, >, ) -> Body { Body::new(Kind::Wrapped(stream.into())) } } -impl From for Body { +impl From for Body { #[inline] - fn from(chunk: Chunk) -> Body { + fn from(chunk: Bytes) -> Body { if chunk.is_empty() { Body::empty() } else { @@ -417,24 +416,17 @@ impl From for Body { } } -impl From for Body { - #[inline] - fn from(bytes: Bytes) -> Body { - Body::from(Chunk::from(bytes)) - } -} - impl From> for Body { #[inline] fn from(vec: Vec) -> Body { - Body::from(Chunk::from(vec)) + Body::from(Bytes::from(vec)) } } impl From<&'static [u8]> for Body { #[inline] fn from(slice: &'static [u8]) -> Body { - Body::from(Chunk::from(slice)) + Body::from(Bytes::from(slice)) } } @@ -451,14 +443,14 @@ impl From> for Body { impl From for Body { #[inline] fn from(s: String) -> Body { - Body::from(Chunk::from(s.into_bytes())) + Body::from(Bytes::from(s.into_bytes())) } } impl From<&'static str> for Body { #[inline] fn from(slice: &'static str) -> Body { - Body::from(Chunk::from(slice.as_bytes())) + Body::from(Bytes::from(slice.as_bytes())) } } @@ -486,7 +478,7 @@ impl Sender { } /// Send data on this channel when it is ready. - pub async fn send_data(&mut self, chunk: Chunk) -> crate::Result<()> { + pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> { futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await?; self.tx .try_send(Ok(chunk)) @@ -497,15 +489,15 @@ impl Sender { /// /// # Errors /// - /// Returns `Err(Chunk)` if the channel could not (currently) accept - /// another `Chunk`. + /// Returns `Err(Bytes)` if the channel could not (currently) accept + /// another `Bytes`. /// /// # Note /// /// This is mostly useful for when trying to send from some other thread /// that doesn't have an async context. If in an async context, prefer /// [`send_data`][] instead. - pub fn try_send_data(&mut self, chunk: Chunk) -> Result<(), Chunk> { + pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> { self.tx .try_send(Ok(chunk)) .map_err(|err| err.into_inner().expect("just sent Ok")) diff --git a/src/body/chunk.rs b/src/body/chunk.rs deleted file mode 100644 index aa3bd10c..00000000 --- a/src/body/chunk.rs +++ /dev/null @@ -1,175 +0,0 @@ -use std::fmt; - -use bytes::{Buf, Bytes}; - -/// A piece of a message body. -/// -/// These are returned by [`Body`](::Body). It is an efficient buffer type. -/// -/// A `Chunk` can be easily created by many of Rust's standard types that -/// represent a collection of bytes, using `Chunk::from`. -pub struct Chunk { - /// The buffer of bytes making up this body. - bytes: Bytes, -} - -// An unexported type to prevent locking `Chunk::into_iter()` to `Bytes::into_iter()`. -#[derive(Debug)] -pub struct IntoIter { - inner: ::IntoIter, -} - -impl Chunk { - /// Converts this `Chunk` directly into the `Bytes` type without copies. - /// - /// This is simply an inherent alias for `Bytes::from(chunk)`, which exists, - /// but doesn't appear in rustdocs. - #[inline] - pub fn into_bytes(self) -> Bytes { - self.into() - } -} - -impl Buf for Chunk { - #[inline] - fn remaining(&self) -> usize { - //perf: Bytes::len() isn't inline yet, - //so it's slightly slower than checking - //the length of the slice. - self.bytes().len() - } - - #[inline] - fn bytes(&self) -> &[u8] { - &self.bytes - } - - #[inline] - fn advance(&mut self, cnt: usize) { - self.bytes.advance(cnt); - } -} - -impl From> for Chunk { - #[inline] - fn from(v: Vec) -> Chunk { - Chunk::from(Bytes::from(v)) - } -} - -impl From<&'static [u8]> for Chunk { - #[inline] - fn from(slice: &'static [u8]) -> Chunk { - Chunk::from(Bytes::from_static(slice)) - } -} - -impl From for Chunk { - #[inline] - fn from(s: String) -> Chunk { - s.into_bytes().into() - } -} - -impl From<&'static str> for Chunk { - #[inline] - fn from(slice: &'static str) -> Chunk { - slice.as_bytes().into() - } -} - -impl From for Chunk { - #[inline] - fn from(bytes: Bytes) -> Chunk { - Chunk { bytes: bytes } - } -} - -impl From for Bytes { - #[inline] - fn from(chunk: Chunk) -> Bytes { - chunk.bytes - } -} - -impl ::std::ops::Deref for Chunk { - type Target = [u8]; - - #[inline] - fn deref(&self) -> &Self::Target { - self.as_ref() - } -} - -impl AsRef<[u8]> for Chunk { - #[inline] - fn as_ref(&self) -> &[u8] { - &self.bytes - } -} - -impl fmt::Debug for Chunk { - #[inline] - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Debug::fmt(&self.bytes, f) - } -} - -impl Default for Chunk { - #[inline] - fn default() -> Chunk { - Chunk::from(Bytes::new()) - } -} - -impl IntoIterator for Chunk { - type Item = u8; - type IntoIter = IntoIter; - - #[inline] - fn into_iter(self) -> Self::IntoIter { - IntoIter { - inner: self.bytes.into_iter(), - } - } -} - -impl Iterator for IntoIter { - type Item = u8; - - #[inline] - fn next(&mut self) -> Option { - self.inner.next() - } - - #[inline] - fn size_hint(&self) -> (usize, Option) { - self.inner.size_hint() - } -} - -impl ExactSizeIterator for IntoIter {} - -#[cfg(test)] -mod tests { - #[cfg(feature = "nightly")] - use test::Bencher; - - #[cfg(feature = "nightly")] - #[bench] - fn bench_chunk_static_buf(b: &mut Bencher) { - use bytes::BufMut; - - let s = "Hello, World!"; - b.bytes = s.len() as u64; - - let mut dst = Vec::with_capacity(128); - - b.iter(|| { - let chunk = crate::Chunk::from(s); - dst.put(chunk); - ::test::black_box(&dst); - dst.clear(); - }) - } -} diff --git a/src/body/mod.rs b/src/body/mod.rs index 0fd3170a..1a28093a 100644 --- a/src/body/mod.rs +++ b/src/body/mod.rs @@ -15,14 +15,13 @@ //! requests and client responses). It is also a decent default implementation //! if you don't have very custom needs of your send streams. +pub use bytes::{Buf, Bytes}; pub use http_body::Body as HttpBody; pub use self::body::{Body, Sender}; -pub use self::chunk::Chunk; pub(crate) use self::payload::Payload; mod body; -mod chunk; mod payload; /// An optimization to try to take a full body if immediately available. @@ -56,6 +55,5 @@ fn _assert_send_sync() { fn _assert_sync() {} _assert_send::(); - _assert_send::(); - _assert_sync::(); + _assert_sync::(); } diff --git a/src/lib.rs b/src/lib.rs index ad5d8846..4e9c8b55 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,7 +39,7 @@ extern crate test; pub use http::{header, HeaderMap, Method, Request, Response, StatusCode, Uri, Version}; -pub use crate::body::{Body, Chunk}; +pub use crate::body::Body; pub use crate::client::Client; pub use crate::error::{Error, Result}; pub use crate::server::Server; diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 6cfb2eb7..1b0f60e4 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -12,7 +12,6 @@ use super::{/*Decode,*/ Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, use crate::common::{task, Pin, Poll, Unpin}; use crate::headers::connection_keep_alive; use crate::proto::{BodyLength, DecodedLength, MessageHead}; -use crate::Chunk; const H2_PREFACE: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; @@ -205,7 +204,7 @@ where pub fn poll_read_body( &mut self, cx: &mut task::Context<'_>, - ) -> Poll>> { + ) -> Poll>> { debug_assert!(self.can_read_body()); let (reading, ret) = match self.state.reading { @@ -217,7 +216,7 @@ where ( Reading::KeepAlive, if !slice.is_empty() { - Some(Ok(Chunk::from(slice))) + Some(Ok(slice)) } else { None }, @@ -229,7 +228,7 @@ where // an empty slice... (Reading::Closed, None) } else { - return Poll::Ready(Some(Ok(Chunk::from(slice)))); + return Poll::Ready(Some(Ok(slice))); }; (reading, Poll::Ready(chunk)) } @@ -930,7 +929,7 @@ mod tests { // an empty IO, we'll be skipping and using the read buffer anyways let io = tokio_test::io::Builder::new().build(); - let mut conn = Conn::<_, crate::Chunk, crate::proto::h1::ServerTransaction>::new(io); + let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io); *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]); conn.state.cached_headers = Some(HeaderMap::with_capacity(2)); @@ -981,7 +980,7 @@ mod tests { let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec(); let len = good_message.len(); let io = AsyncIo::new_buf(good_message, len); - let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); match conn.poll().unwrap() { Async::Ready(Some(Frame::Message { message, body: false })) => { @@ -999,7 +998,7 @@ mod tests { let _: Result<(), ()> = future::lazy(|| { let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec(); let io = AsyncIo::new_buf(good_message, 10); - let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); assert!(conn.poll().unwrap().is_not_ready()); conn.io.io_mut().block_in(50); let async = conn.poll().unwrap(); @@ -1015,7 +1014,7 @@ mod tests { #[test] fn test_conn_init_read_eof_idle() { let io = AsyncIo::new_buf(vec![], 1); - let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); conn.state.idle(); match conn.poll().unwrap() { @@ -1027,7 +1026,7 @@ mod tests { #[test] fn test_conn_init_read_eof_idle_partial_parse() { let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100); - let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); conn.state.idle(); match conn.poll() { @@ -1041,7 +1040,7 @@ mod tests { let _: Result<(), ()> = future::lazy(|| { // server ignores let io = AsyncIo::new_eof(); - let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); conn.state.busy(); match conn.poll().unwrap() { @@ -1051,7 +1050,7 @@ mod tests { // client let io = AsyncIo::new_eof(); - let mut conn = Conn::<_, proto::Chunk, ClientTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); conn.state.busy(); match conn.poll() { @@ -1066,7 +1065,7 @@ mod tests { fn test_conn_body_finish_read_eof() { let _: Result<(), ()> = future::lazy(|| { let io = AsyncIo::new_eof(); - let mut conn = Conn::<_, proto::Chunk, ClientTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); conn.state.busy(); conn.state.writing = Writing::KeepAlive; conn.state.reading = Reading::Body(Decoder::length(0)); @@ -1091,7 +1090,7 @@ mod tests { fn test_conn_message_empty_body_read_eof() { let _: Result<(), ()> = future::lazy(|| { let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024); - let mut conn = Conn::<_, proto::Chunk, ClientTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); conn.state.busy(); conn.state.writing = Writing::KeepAlive; @@ -1115,7 +1114,7 @@ mod tests { fn test_conn_read_body_end() { let _: Result<(), ()> = future::lazy(|| { let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024); - let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); conn.state.busy(); match conn.poll() { @@ -1145,7 +1144,7 @@ mod tests { #[test] fn test_conn_closed_read() { let io = AsyncIo::new_buf(vec![], 0); - let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); conn.state.close(); match conn.poll().unwrap() { @@ -1159,7 +1158,7 @@ mod tests { let _ = pretty_env_logger::try_init(); let _: Result<(), ()> = future::lazy(|| { let io = AsyncIo::new_buf(vec![], 0); - let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096; conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64)); @@ -1184,7 +1183,7 @@ mod tests { fn test_conn_body_write_chunked() { let _: Result<(), ()> = future::lazy(|| { let io = AsyncIo::new_buf(vec![], 4096); - let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); conn.state.writing = Writing::Body(Encoder::chunked()); assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready()); @@ -1197,7 +1196,7 @@ mod tests { fn test_conn_body_flush() { let _: Result<(), ()> = future::lazy(|| { let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5); - let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); conn.state.writing = Writing::Body(Encoder::length(1024 * 1024)); assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready()); assert!(!conn.can_buffer_body()); @@ -1234,7 +1233,7 @@ mod tests { // test that once writing is done, unparks let f = future::lazy(|| { let io = AsyncIo::new_buf(vec![], 4096); - let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); conn.state.reading = Reading::KeepAlive; assert!(conn.poll().unwrap().is_not_ready()); @@ -1248,7 +1247,7 @@ mod tests { // test that flushing when not waiting on read doesn't unpark let f = future::lazy(|| { let io = AsyncIo::new_buf(vec![], 4096); - let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); conn.state.writing = Writing::KeepAlive; assert!(conn.poll_complete().unwrap().is_ready()); Ok::<(), ()>(()) @@ -1259,7 +1258,7 @@ mod tests { // test that flushing and writing isn't done doesn't unpark let f = future::lazy(|| { let io = AsyncIo::new_buf(vec![], 4096); - let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); conn.state.reading = Reading::KeepAlive; assert!(conn.poll().unwrap().is_not_ready()); conn.state.writing = Writing::Body(Encoder::length(5_000)); @@ -1272,7 +1271,7 @@ mod tests { #[test] fn test_conn_closed_write() { let io = AsyncIo::new_buf(vec![], 0); - let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); conn.state.close(); match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) { @@ -1286,7 +1285,7 @@ mod tests { #[test] fn test_conn_write_empty_chunk() { let io = AsyncIo::new_buf(vec![], 0); - let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); conn.state.writing = Writing::KeepAlive; assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready()); diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 534cb423..6d5751bc 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -646,7 +646,7 @@ mod tests { // the request is ready to write later... //let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 0); let (mut tx, rx) = crate::client::dispatch::channel(); - let conn = Conn::<_, crate::Chunk, ClientTransaction>::new(io); + let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io); let mut dispatcher = Dispatcher::new(Client::new(rx), conn); // First poll is needed to allow tx to send... @@ -681,7 +681,7 @@ mod tests { .build(); let (mut tx, rx) = crate::client::dispatch::channel(); - let conn = Conn::<_, crate::Chunk, ClientTransaction>::new(io); + let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io); let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn)); // First poll is needed to allow tx to send... diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index 02f5a707..75d3f355 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -962,10 +962,10 @@ mod tests { let s = "Hello, World!"; b.bytes = s.len() as u64; - let mut write_buf = WriteBuf::::new(); + let mut write_buf = WriteBuf::::new(); write_buf.set_strategy(WriteStrategy::Flatten); b.iter(|| { - let chunk = crate::Chunk::from(s); + let chunk = bytes::Bytes::from(s); write_buf.buffer(chunk); ::test::black_box(&write_buf); write_buf.headers.bytes.clear(); diff --git a/tests/client.rs b/tests/client.rs index 16f3dfb5..68076365 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -28,7 +28,7 @@ fn tcp_connect(addr: &SocketAddr) -> impl Future Result { +async fn concat(mut body: Body) -> Result { let mut vec = Vec::new(); while let Some(chunk) = body.next().await { vec.extend_from_slice(&chunk?); diff --git a/tests/support/mod.rs b/tests/support/mod.rs index bc39a7e4..48863410 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -474,7 +474,7 @@ fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future) { (proxy_addr, srv.map(|res| res.expect("proxy error"))) } -async fn concat(mut body: Body) -> Result { +async fn concat(mut body: Body) -> Result { let mut vec = Vec::new(); while let Some(chunk) = body.next().await { vec.extend_from_slice(&chunk?);