Update to Tokio and Bytes 1.0 (#504)

This commit is contained in:
Sean McArthur
2020-12-23 10:01:44 -08:00
committed by GitHub
parent dc3079ab89
commit b4976675fa
10 changed files with 38 additions and 32 deletions

View File

@@ -45,9 +45,9 @@ members = [
futures-core = { version = "0.3", default-features = false } futures-core = { version = "0.3", default-features = false }
futures-sink = { version = "0.3", default-features = false } futures-sink = { version = "0.3", default-features = false }
futures-util = { version = "0.3", default-features = false } futures-util = { version = "0.3", default-features = false }
tokio-util = { version = "0.5", features = ["codec"] } tokio-util = { version = "0.6", features = ["codec"] }
tokio = { version = "0.3.4", features = ["io-util"] } tokio = { version = "1", features = ["io-util"] }
bytes = "0.6" bytes = "1"
http = "0.2" http = "0.2"
tracing = { version = "0.1.13", default-features = false, features = ["std"] } tracing = { version = "0.1.13", default-features = false, features = ["std"] }
tracing-futures = { version = "0.2", default-features = false, features = ["std-future"]} tracing-futures = { version = "0.2", default-features = false, features = ["std-future"]}
@@ -68,9 +68,9 @@ serde = "1.0.0"
serde_json = "1.0.0" serde_json = "1.0.0"
# Examples # Examples
tokio = { version = "0.3.4", features = ["rt-multi-thread", "macros", "sync", "net"] } tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "net"] }
env_logger = { version = "0.5.3", default-features = false } env_logger = { version = "0.5.3", default-features = false }
rustls = "0.18" #rustls = "0.18"
tokio-rustls = "0.20.0" #tokio-rustls = "0.20.0"
webpki = "0.21" #webpki = "0.21"
webpki-roots = "0.17" #webpki-roots = "0.17"

View File

@@ -1,3 +1,7 @@
fn main() {
eprintln!("TODO: Re-enable when tokio-rustls is upgraded.");
}
/*
use h2::client; use h2::client;
use http::{Method, Request}; use http::{Method, Request};
use tokio::net::TcpStream; use tokio::net::TcpStream;
@@ -73,3 +77,4 @@ pub async fn main() -> Result<(), Box<dyn Error>> {
} }
Ok(()) Ok(())
} }
*/

View File

@@ -202,10 +202,10 @@ where
// could just use `poll_write_buf`... // could just use `poll_write_buf`...
let n = if self.is_write_vectored { let n = if self.is_write_vectored {
let mut bufs = [IoSlice::new(&[]); MAX_IOVS]; let mut bufs = [IoSlice::new(&[]); MAX_IOVS];
let cnt = buf.bytes_vectored(&mut bufs); let cnt = buf.chunks_vectored(&mut bufs);
ready!(Pin::new(&mut self.inner).poll_write_vectored(cx, &bufs[..cnt]))? ready!(Pin::new(&mut self.inner).poll_write_vectored(cx, &bufs[..cnt]))?
} else { } else {
ready!(Pin::new(&mut self.inner).poll_write(cx, buf.bytes()))? ready!(Pin::new(&mut self.inner).poll_write(cx, buf.chunk()))?
}; };
buf.advance(n); buf.advance(n);
} }
@@ -213,12 +213,12 @@ where
tracing::trace!(queued_data_frame = false); tracing::trace!(queued_data_frame = false);
let n = if self.is_write_vectored { let n = if self.is_write_vectored {
let mut iovs = [IoSlice::new(&[]); MAX_IOVS]; let mut iovs = [IoSlice::new(&[]); MAX_IOVS];
let cnt = self.buf.bytes_vectored(&mut iovs); let cnt = self.buf.chunks_vectored(&mut iovs);
ready!( ready!(
Pin::new(&mut self.inner).poll_write_vectored(cx, &mut iovs[..cnt]) Pin::new(&mut self.inner).poll_write_vectored(cx, &mut iovs[..cnt])
)? )?
} else { } else {
ready!(Pin::new(&mut self.inner).poll_write(cx, &mut self.buf.bytes()))? ready!(Pin::new(&mut self.inner).poll_write(cx, &mut self.buf.chunk()))?
}; };
self.buf.advance(n); self.buf.advance(n);
} }

View File

@@ -311,7 +311,7 @@ impl Decoder {
if huff { if huff {
let ret = { let ret = {
let raw = &buf.bytes()[..len]; let raw = &buf.chunk()[..len];
huffman::decode(raw, &mut self.buffer).map(BytesMut::freeze) huffman::decode(raw, &mut self.buffer).map(BytesMut::freeze)
}; };
@@ -419,7 +419,7 @@ fn decode_int<B: Buf>(buf: &mut B, prefix_size: u8) -> Result<usize, DecoderErro
fn peek_u8<B: Buf>(buf: &mut B) -> Option<u8> { fn peek_u8<B: Buf>(buf: &mut B) -> Option<u8> {
if buf.has_remaining() { if buf.has_remaining() {
Some(buf.bytes()[0]) Some(buf.chunk()[0])
} else { } else {
None None
} }

View File

@@ -847,8 +847,12 @@ where
self.inner.remaining() self.inner.remaining()
} }
fn bytes(&self) -> &[u8] { fn chunk(&self) -> &[u8] {
self.inner.bytes() self.inner.chunk()
}
fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize {
self.inner.chunks_vectored(dst)
} }
fn advance(&mut self, cnt: usize) { fn advance(&mut self, cnt: usize) {

View File

@@ -11,5 +11,5 @@ h2 = { path = "../.." }
env_logger = { version = "0.5.3", default-features = false } env_logger = { version = "0.5.3", default-features = false }
futures = { version = "0.3", default-features = false, features = ["std"] } futures = { version = "0.3", default-features = false, features = ["std"] }
honggfuzz = "0.5" honggfuzz = "0.5"
http = { git = "https://github.com/paolobarbolini/http.git", branch = "bytes06" } http = "0.2"
tokio = { version = "0.3.2", features = [] } tokio = "1"

View File

@@ -7,10 +7,10 @@ edition = "2018"
[dependencies] [dependencies]
h2 = { path = "../..", features = ["stream", "unstable"] } h2 = { path = "../..", features = ["stream", "unstable"] }
bytes = "0.6" bytes = "1"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "chrono", "ansi"] } tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "chrono", "ansi"] }
futures = { version = "0.3", default-features = false } futures = { version = "0.3", default-features = false }
http = "0.2" http = "0.2"
tokio = { version = "0.3.2", features = ["time"] } tokio = { version = "1", features = ["time"] }
tokio-test = "0.3" tokio-test = "0.4"

View File

@@ -147,7 +147,7 @@ impl Handle {
poll_fn(move |cx| { poll_fn(move |cx| {
while buf.has_remaining() { while buf.has_remaining() {
let res = Pin::new(self.codec.get_mut()) let res = Pin::new(self.codec.get_mut())
.poll_write(cx, &mut buf.bytes()) .poll_write(cx, &mut buf.chunk())
.map_err(|e| panic!("write err={:?}", e)); .map_err(|e| panic!("write err={:?}", e));
let n = ready!(res).unwrap(); let n = ready!(res).unwrap();

View File

@@ -11,4 +11,4 @@ edition = "2018"
h2-support = { path = "../h2-support" } h2-support = { path = "../h2-support" }
tracing = "0.1.13" tracing = "0.1.13"
futures = { version = "0.3", default-features = false, features = ["alloc"] } futures = { version = "0.3", default-features = false, features = ["alloc"] }
tokio = { version = "0.3.2", features = ["macros", "net", "rt", "io-util"] } tokio = { version = "1", features = ["macros", "net", "rt", "io-util"] }

View File

@@ -940,7 +940,6 @@ async fn recv_no_init_window_then_receive_some_init_window() {
#[tokio::test] #[tokio::test]
async fn settings_lowered_capacity_returns_capacity_to_connection() { async fn settings_lowered_capacity_returns_capacity_to_connection() {
use futures::channel::oneshot; use futures::channel::oneshot;
use futures::future::{select, Either};
h2_support::trace_init!(); h2_support::trace_init!();
let (io, mut srv) = mock::new(); let (io, mut srv) = mock::new();
@@ -972,10 +971,9 @@ async fn settings_lowered_capacity_returns_capacity_to_connection() {
// //
// A timeout is used here to avoid blocking forever if there is a // A timeout is used here to avoid blocking forever if there is a
// failure // failure
let result = select(rx2, tokio::time::sleep(Duration::from_secs(5))).await; let _ = tokio::time::timeout(Duration::from_secs(5), rx2)
if let Either::Right((_, _)) = result { .await
panic!("Timed out"); .unwrap();
}
idle_ms(500).await; idle_ms(500).await;
@@ -1004,10 +1002,9 @@ async fn settings_lowered_capacity_returns_capacity_to_connection() {
}); });
// Wait for server handshake to complete. // Wait for server handshake to complete.
let result = select(rx1, tokio::time::sleep(Duration::from_secs(5))).await; let _ = tokio::time::timeout(Duration::from_secs(5), rx1)
if let Either::Right((_, _)) = result { .await
panic!("Timed out"); .unwrap();
}
let request = Request::post("https://example.com/one").body(()).unwrap(); let request = Request::post("https://example.com/one").body(()).unwrap();