Updated as per review comments

This commit is contained in:
Gurwinder Singh
2019-08-16 08:45:08 +05:30
committed by Sean McArthur
parent 0a4bd393ec
commit ad7ffa795f
10 changed files with 42 additions and 124 deletions

View File

@@ -41,9 +41,9 @@ members = [
]
[dependencies]
futures-preview = "0.3.0-alpha.18"
tokio-io = { git = "https://github.com/tokio-rs/tokio" }
tokio-codec = { git = "https://github.com/tokio-rs/tokio" }
futures-preview = "0.3.0-alpha.17"
tokio-io = "0.2.0-alpha.1"
tokio-codec = "0.2.0-alpha.1"
bytes = "0.4.7"
http = "0.1.8"
log = "0.4.1"
@@ -65,7 +65,7 @@ serde = "1.0.0"
serde_json = "1.0.0"
# Akamai example
tokio = { git = "https://github.com/tokio-rs/tokio" }
tokio = "0.2.0-alpha.1"
env_logger = { version = "0.5.3", default-features = false }
rustls = "0.12"
tokio-rustls = "0.5.0"

View File

@@ -1,48 +1,14 @@
#![feature(async_await)]
use futures::{ready, Stream};
use futures::future::poll_fn;
use futures::StreamExt;
use h2::client;
use h2::RecvStream;
use http::{HeaderMap, Request};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::error::Error;
use tokio::net::TcpStream;
struct Process {
body: RecvStream,
trailers: bool,
}
impl Future for Process {
type Output = Result<(), h2::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
if self.trailers {
let trailers = ready!(self.body.poll_trailers(cx));
println!("GOT TRAILERS: {:?}", trailers);
return Poll::Ready(Ok(()));
} else {
match ready!(Pin::new(&mut self.body).poll_next(cx)) {
Some(Ok(chunk)) => {
println!("GOT CHUNK = {:?}", chunk);
}
Some(Err(e)) => return Poll::Ready(Err(e)),
None => {
self.trailers = true;
}
}
}
}
}
}
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
let _ = env_logger::try_init();
@@ -76,12 +42,15 @@ pub async fn main() -> Result<(), Box<dyn Error>> {
println!("GOT RESPONSE: {:?}", response);
// Get the body
let (_, body) = response.into_parts();
let (_, mut body) = response.into_parts();
Process {
body,
trailers: false,
while let Some(chunk) = body.next().await {
println!("GOT CHUNK = {:?}", chunk?);
}
.await?;
if let Some(trailers) = poll_fn(|cx| body.poll_trailers(cx)).await {
println!("GOT TRAILERS: {:?}", trailers?);
}
Ok(())
}

View File

@@ -280,6 +280,14 @@ impl<T: AsyncRead + Unpin, B: Unpin> AsyncRead for FramedWrite<T, B> {
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.inner).poll_read(cx, buf)
}
fn poll_read_buf<Buf: BufMut>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut Buf,
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.inner).poll_read_buf(cx, buf)
}
}
#[cfg(feature = "unstable")]

View File

@@ -289,8 +289,8 @@ where
// The order here matters:
// - poll_go_away may buffer a graceful shutdown GOAWAY frame
// - If it has, we've also added a PING to be sent in poll_ready
match ready!(self.poll_go_away(cx)) {
Some(Ok(reason)) => {
match ready!(self.poll_go_away(cx)?) {
Some(reason) => {
if self.go_away.should_close_now() {
if self.go_away.is_user_initiated() {
// A user initiated abrupt shutdown shouldn't return
@@ -307,33 +307,32 @@ where
"graceful GOAWAY should be NO_ERROR"
);
}
Some(Err(e)) => return Poll::Ready(Err(e.into())),
None => (),
}
ready!(self.poll_ready(cx))?;
match ready!(Pin::new(&mut self.codec).poll_next(cx)) {
Some(Ok(Headers(frame))) => {
match ready!(Pin::new(&mut self.codec).poll_next(cx)?) {
Some(Headers(frame)) => {
log::trace!("recv HEADERS; frame={:?}", frame);
self.streams.recv_headers(frame)?;
}
Some(Ok(Data(frame))) => {
Some(Data(frame)) => {
log::trace!("recv DATA; frame={:?}", frame);
self.streams.recv_data(frame)?;
}
Some(Ok(Reset(frame))) => {
Some(Reset(frame)) => {
log::trace!("recv RST_STREAM; frame={:?}", frame);
self.streams.recv_reset(frame)?;
}
Some(Ok(PushPromise(frame))) => {
Some(PushPromise(frame)) => {
log::trace!("recv PUSH_PROMISE; frame={:?}", frame);
self.streams.recv_push_promise(frame)?;
}
Some(Ok(Settings(frame))) => {
Some(Settings(frame)) => {
log::trace!("recv SETTINGS; frame={:?}", frame);
self.settings.recv_settings(frame);
}
Some(Ok(GoAway(frame))) => {
Some(GoAway(frame)) => {
log::trace!("recv GOAWAY; frame={:?}", frame);
// This should prevent starting new streams,
// but should allow continuing to process current streams
@@ -342,7 +341,7 @@ where
self.streams.recv_go_away(&frame)?;
self.error = Some(frame.reason());
}
Some(Ok(Ping(frame))) => {
Some(Ping(frame)) => {
log::trace!("recv PING; frame={:?}", frame);
let status = self.ping_pong.recv_ping(frame);
if status.is_shutdown() {
@@ -355,15 +354,14 @@ where
self.go_away(last_processed_id, Reason::NO_ERROR);
}
}
Some(Ok(WindowUpdate(frame))) => {
Some(WindowUpdate(frame)) => {
log::trace!("recv WINDOW_UPDATE; frame={:?}", frame);
self.streams.recv_window_update(frame)?;
}
Some(Ok(Priority(frame))) => {
Some(Priority(frame)) => {
log::trace!("recv PRIORITY; frame={:?}", frame);
// TODO: handle
}
Some(Err(e)) => return Poll::Ready(Err(e)),
None => {
log::trace!("codec closed");
self.streams.recv_eof(false).ok().expect("mutex poisoned");

View File

@@ -9,7 +9,7 @@ edition = "2018"
h2 = { path = "../.." }
env_logger = { version = "0.5.3", default-features = false }
futures-preview = "0.3.0-alpha.18"
futures-preview = "0.3.0-alpha.17"
honggfuzz = "0.5"
http = "0.1.3"
tokio = { git = "https://github.com/tokio-rs/tokio" }
tokio = "0.2.0-alpha.1"

View File

@@ -9,7 +9,7 @@ h2 = { path = "../..", features = ["unstable"] }
bytes = "0.4.7"
env_logger = "0.5.9"
futures-preview = "0.3.0-alpha.18"
futures-preview = "0.3.0-alpha.17"
http = "0.1.5"
string = "0.2"
tokio = { git = "https://github.com/tokio-rs/tokio" }
tokio = "0.2.0-alpha.1"

View File

@@ -7,16 +7,15 @@ pub mod assert;
pub mod raw;
pub mod frames;
pub mod prelude;
pub mod mock;
pub mod mock_io;
pub mod notify;
pub mod prelude;
pub mod util;
mod client_ext;
mod future_ext;
pub use crate::client_ext::{SendRequestExt};
pub use crate::client_ext::SendRequestExt;
pub use crate::future_ext::TestFuture;
pub type WindowSize = usize;

View File

@@ -1,53 +0,0 @@
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
pub struct MockNotify {
inner: Arc<Inner>,
}
struct Inner {
notified: AtomicBool,
}
impl MockNotify {
pub fn new() -> Self {
MockNotify {
inner: Arc::new(Inner {
notified: AtomicBool::new(false),
}),
}
}
pub fn with<F: FnOnce() -> R, R>(&self, _f: F) -> R {
unimplemented!();
// use futures::future::poll_fn;
// self.clear();
// let mut f = Some(f);
// let res = tokio::spawn(poll_fn(move |cx| {
// Poll::Ready(f.take().unwrap()())
// })).poll_future_notify(&self.inner, 0);
// match res {
// Poll::Ready(v) => v,
// _ => unreachable!(),
// }
}
pub fn clear(&self) {
self.inner.notified.store(false, SeqCst);
}
pub fn is_notified(&self) -> bool {
self.inner.notified.load(SeqCst)
}
}
// impl Notify for Inner {
// fn notify(&self, _: usize) {
// self.notified.store(true, SeqCst);
// }
// }

View File

@@ -12,9 +12,6 @@ pub use super::mock::{self, idle_ms};
// Re-export frames helpers
pub use super::frames;
// Re-export mock notify
pub use super::notify::MockNotify;
// Re-export utility mod
pub use super::util;

View File

@@ -10,5 +10,5 @@ edition = "2018"
[dev-dependencies]
h2-support = { path = "../h2-support" }
log = "0.4.1"
futures-preview = "0.3.0-alpha.18"
tokio = { git = "https://github.com/tokio-rs/tokio" }
futures-preview = "0.3.0-alpha.17"
tokio = "0.2.0-alpha.1"