diff --git a/Cargo.toml b/Cargo.toml index 0a280f3..cec4cde 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,9 +41,9 @@ members = [ ] [dependencies] -futures-preview = "0.3.0-alpha.18" -tokio-io = { git = "https://github.com/tokio-rs/tokio" } -tokio-codec = { git = "https://github.com/tokio-rs/tokio" } +futures-preview = "0.3.0-alpha.17" +tokio-io = "0.2.0-alpha.1" +tokio-codec = "0.2.0-alpha.1" bytes = "0.4.7" http = "0.1.8" log = "0.4.1" @@ -65,7 +65,7 @@ serde = "1.0.0" serde_json = "1.0.0" # Akamai example -tokio = { git = "https://github.com/tokio-rs/tokio" } +tokio = "0.2.0-alpha.1" env_logger = { version = "0.5.3", default-features = false } rustls = "0.12" tokio-rustls = "0.5.0" diff --git a/examples/client.rs b/examples/client.rs index 0f5ff29..3993344 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,48 +1,14 @@ #![feature(async_await)] -use futures::{ready, Stream}; +use futures::future::poll_fn; +use futures::StreamExt; use h2::client; -use h2::RecvStream; use http::{HeaderMap, Request}; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; use std::error::Error; use tokio::net::TcpStream; -struct Process { - body: RecvStream, - trailers: bool, -} - -impl Future for Process { - type Output = Result<(), h2::Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - if self.trailers { - let trailers = ready!(self.body.poll_trailers(cx)); - - println!("GOT TRAILERS: {:?}", trailers); - - return Poll::Ready(Ok(())); - } else { - match ready!(Pin::new(&mut self.body).poll_next(cx)) { - Some(Ok(chunk)) => { - println!("GOT CHUNK = {:?}", chunk); - } - Some(Err(e)) => return Poll::Ready(Err(e)), - None => { - self.trailers = true; - } - } - } - } - } -} - #[tokio::main] pub async fn main() -> Result<(), Box> { let _ = env_logger::try_init(); @@ -76,12 +42,15 @@ pub async fn main() -> Result<(), Box> { println!("GOT RESPONSE: {:?}", response); // Get the body - let (_, body) = response.into_parts(); + let (_, mut body) = response.into_parts(); - Process { - body, - trailers: false, + while let Some(chunk) = body.next().await { + println!("GOT CHUNK = {:?}", chunk?); } - .await?; + + if let Some(trailers) = poll_fn(|cx| body.poll_trailers(cx)).await { + println!("GOT TRAILERS: {:?}", trailers?); + } + Ok(()) } diff --git a/src/codec/framed_write.rs b/src/codec/framed_write.rs index ce95620..cfcbdd4 100644 --- a/src/codec/framed_write.rs +++ b/src/codec/framed_write.rs @@ -280,6 +280,14 @@ impl AsyncRead for FramedWrite { ) -> Poll> { Pin::new(&mut self.inner).poll_read(cx, buf) } + + fn poll_read_buf( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut Buf, + ) -> Poll> { + Pin::new(&mut self.inner).poll_read_buf(cx, buf) + } } #[cfg(feature = "unstable")] diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 4c38e25..3b1f7bb 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -289,8 +289,8 @@ where // The order here matters: // - poll_go_away may buffer a graceful shutdown GOAWAY frame // - If it has, we've also added a PING to be sent in poll_ready - match ready!(self.poll_go_away(cx)) { - Some(Ok(reason)) => { + match ready!(self.poll_go_away(cx)?) { + Some(reason) => { if self.go_away.should_close_now() { if self.go_away.is_user_initiated() { // A user initiated abrupt shutdown shouldn't return @@ -307,33 +307,32 @@ where "graceful GOAWAY should be NO_ERROR" ); } - Some(Err(e)) => return Poll::Ready(Err(e.into())), None => (), } ready!(self.poll_ready(cx))?; - match ready!(Pin::new(&mut self.codec).poll_next(cx)) { - Some(Ok(Headers(frame))) => { + match ready!(Pin::new(&mut self.codec).poll_next(cx)?) { + Some(Headers(frame)) => { log::trace!("recv HEADERS; frame={:?}", frame); self.streams.recv_headers(frame)?; } - Some(Ok(Data(frame))) => { + Some(Data(frame)) => { log::trace!("recv DATA; frame={:?}", frame); self.streams.recv_data(frame)?; } - Some(Ok(Reset(frame))) => { + Some(Reset(frame)) => { log::trace!("recv RST_STREAM; frame={:?}", frame); self.streams.recv_reset(frame)?; } - Some(Ok(PushPromise(frame))) => { + Some(PushPromise(frame)) => { log::trace!("recv PUSH_PROMISE; frame={:?}", frame); self.streams.recv_push_promise(frame)?; } - Some(Ok(Settings(frame))) => { + Some(Settings(frame)) => { log::trace!("recv SETTINGS; frame={:?}", frame); self.settings.recv_settings(frame); } - Some(Ok(GoAway(frame))) => { + Some(GoAway(frame)) => { log::trace!("recv GOAWAY; frame={:?}", frame); // This should prevent starting new streams, // but should allow continuing to process current streams @@ -342,7 +341,7 @@ where self.streams.recv_go_away(&frame)?; self.error = Some(frame.reason()); } - Some(Ok(Ping(frame))) => { + Some(Ping(frame)) => { log::trace!("recv PING; frame={:?}", frame); let status = self.ping_pong.recv_ping(frame); if status.is_shutdown() { @@ -355,15 +354,14 @@ where self.go_away(last_processed_id, Reason::NO_ERROR); } } - Some(Ok(WindowUpdate(frame))) => { + Some(WindowUpdate(frame)) => { log::trace!("recv WINDOW_UPDATE; frame={:?}", frame); self.streams.recv_window_update(frame)?; } - Some(Ok(Priority(frame))) => { + Some(Priority(frame)) => { log::trace!("recv PRIORITY; frame={:?}", frame); // TODO: handle } - Some(Err(e)) => return Poll::Ready(Err(e)), None => { log::trace!("codec closed"); self.streams.recv_eof(false).ok().expect("mutex poisoned"); diff --git a/tests/h2-fuzz/Cargo.toml b/tests/h2-fuzz/Cargo.toml index e9e251a..f9b9f11 100644 --- a/tests/h2-fuzz/Cargo.toml +++ b/tests/h2-fuzz/Cargo.toml @@ -9,7 +9,7 @@ edition = "2018" h2 = { path = "../.." } env_logger = { version = "0.5.3", default-features = false } -futures-preview = "0.3.0-alpha.18" +futures-preview = "0.3.0-alpha.17" honggfuzz = "0.5" http = "0.1.3" -tokio = { git = "https://github.com/tokio-rs/tokio" } +tokio = "0.2.0-alpha.1" diff --git a/tests/h2-support/Cargo.toml b/tests/h2-support/Cargo.toml index f062ad3..04a1588 100644 --- a/tests/h2-support/Cargo.toml +++ b/tests/h2-support/Cargo.toml @@ -9,7 +9,7 @@ h2 = { path = "../..", features = ["unstable"] } bytes = "0.4.7" env_logger = "0.5.9" -futures-preview = "0.3.0-alpha.18" +futures-preview = "0.3.0-alpha.17" http = "0.1.5" string = "0.2" -tokio = { git = "https://github.com/tokio-rs/tokio" } +tokio = "0.2.0-alpha.1" diff --git a/tests/h2-support/src/lib.rs b/tests/h2-support/src/lib.rs index a8ae0e8..98025ae 100644 --- a/tests/h2-support/src/lib.rs +++ b/tests/h2-support/src/lib.rs @@ -7,16 +7,15 @@ pub mod assert; pub mod raw; pub mod frames; -pub mod prelude; pub mod mock; pub mod mock_io; -pub mod notify; +pub mod prelude; pub mod util; mod client_ext; mod future_ext; -pub use crate::client_ext::{SendRequestExt}; +pub use crate::client_ext::SendRequestExt; pub use crate::future_ext::TestFuture; pub type WindowSize = usize; diff --git a/tests/h2-support/src/notify.rs b/tests/h2-support/src/notify.rs deleted file mode 100644 index 783297a..0000000 --- a/tests/h2-support/src/notify.rs +++ /dev/null @@ -1,53 +0,0 @@ -use std::sync::Arc; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering::SeqCst; - -pub struct MockNotify { - inner: Arc, -} - -struct Inner { - notified: AtomicBool, -} - -impl MockNotify { - pub fn new() -> Self { - MockNotify { - inner: Arc::new(Inner { - notified: AtomicBool::new(false), - }), - } - } - - pub fn with R, R>(&self, _f: F) -> R { - unimplemented!(); - // use futures::future::poll_fn; - - // self.clear(); - - // let mut f = Some(f); - - // let res = tokio::spawn(poll_fn(move |cx| { - // Poll::Ready(f.take().unwrap()()) - // })).poll_future_notify(&self.inner, 0); - - // match res { - // Poll::Ready(v) => v, - // _ => unreachable!(), - // } - } - - pub fn clear(&self) { - self.inner.notified.store(false, SeqCst); - } - - pub fn is_notified(&self) -> bool { - self.inner.notified.load(SeqCst) - } -} - -// impl Notify for Inner { -// fn notify(&self, _: usize) { -// self.notified.store(true, SeqCst); -// } -// } diff --git a/tests/h2-support/src/prelude.rs b/tests/h2-support/src/prelude.rs index d0d5630..866ef15 100644 --- a/tests/h2-support/src/prelude.rs +++ b/tests/h2-support/src/prelude.rs @@ -12,9 +12,6 @@ pub use super::mock::{self, idle_ms}; // Re-export frames helpers pub use super::frames; -// Re-export mock notify -pub use super::notify::MockNotify; - // Re-export utility mod pub use super::util; diff --git a/tests/h2-tests/Cargo.toml b/tests/h2-tests/Cargo.toml index 0961465..5876b5e 100644 --- a/tests/h2-tests/Cargo.toml +++ b/tests/h2-tests/Cargo.toml @@ -10,5 +10,5 @@ edition = "2018" [dev-dependencies] h2-support = { path = "../h2-support" } log = "0.4.1" -futures-preview = "0.3.0-alpha.18" -tokio = { git = "https://github.com/tokio-rs/tokio" } +futures-preview = "0.3.0-alpha.17" +tokio = "0.2.0-alpha.1"