From b039ef25bc820fa56e6ec1af6835b9321276cfa9 Mon Sep 17 00:00:00 2001 From: Gurwinder Singh Date: Sat, 17 Aug 2019 06:39:25 +0530 Subject: [PATCH] Make handshake an async fn; other cleanup --- src/client.rs | 291 ++++++++++++++++++-------------------- src/server.rs | 4 +- src/share.rs | 4 +- tests/h2-fuzz/src/main.rs | 28 ++-- 4 files changed, 157 insertions(+), 170 deletions(-) diff --git a/src/client.rs b/src/client.rs index 5854611..70b84fe 100644 --- a/src/client.rs +++ b/src/client.rs @@ -149,35 +149,12 @@ use futures::{ready, FutureExt, Stream}; use http::{uri, HeaderMap, Method, Request, Response, Version}; use std::fmt; use std::future::Future; -use std::io; -use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; use std::usize; use tokio_io::{AsyncRead, AsyncWrite, AsyncWriteExt}; -/// Performs the HTTP/2.0 connection handshake. -/// -/// This type implements `Future`, yielding a `(SendRequest, Connection)` -/// instance once the handshake has completed. -/// -/// The handshake is completed once both the connection preface and the initial -/// settings frame is sent by the client. -/// -/// The handshake future does not wait for the initial settings frame from the -/// server. -/// -/// See [module] level documentation for more details. -/// -/// [module]: index.html -#[must_use = "futures do nothing unless polled"] -pub struct Handshake<'a, T, B = Bytes> { - builder: Builder, - inner: Pin> + 'a>>, - _marker: PhantomData, -} - /// Initializes new HTTP/2.0 streams on a connection by sending a request. /// /// This type does no work itself. Instead, it is a handle to the inner @@ -236,20 +213,21 @@ pub struct ReadySendRequest { /// # Examples /// /// ``` -/// #![feature(async_await)] +/// # #![feature(async_await)] /// # use tokio_io::*; /// # use h2::client; /// # use h2::client::*; /// # -/// # async fn doc(my_io: T) +/// # async fn doc(my_io: T) -> Result<(), h2::Error> /// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static, /// # { -/// let (send_request, connection) = client::handshake(my_io).await.unwrap(); +/// let (send_request, connection) = client::handshake(my_io).await?; /// // Submit the connection handle to an executor. /// tokio::spawn(async { connection.await.expect("connection failed"); }); /// /// // Now, use `send_request` to initialize HTTP/2.0 streams. /// // ... +/// # Ok(()) /// # } /// # /// # pub fn main() {} @@ -314,11 +292,13 @@ pub struct PushPromises { /// # Examples /// /// ``` +/// # #![feature(async_await)] /// # use tokio_io::*; /// # use h2::client::*; +/// # use bytes::Bytes; /// # -/// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) -/// # -> Handshake<'a, T> +/// # async fn doc(my_io: T) +/// -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. @@ -326,7 +306,7 @@ pub struct PushPromises { /// .initial_window_size(1_000_000) /// .max_concurrent_streams(1000) /// .handshake(my_io); -/// # client_fut +/// # client_fut.await /// # } /// # /// # pub fn main() {} @@ -394,7 +374,7 @@ where /// # Examples /// /// ```rust - /// #![feature(async_await)] + /// # #![feature(async_await)] /// # use h2::client::*; /// # use http::*; /// # async fn doc(send_request: SendRequest<&'static [u8]>) @@ -454,7 +434,7 @@ where /// Sending a request with no body /// /// ```rust - /// #![feature(async_await)] + /// # #![feature(async_await)] /// # use h2::client::*; /// # use http::*; /// # async fn doc(send_request: SendRequest<&'static [u8]>) @@ -479,7 +459,7 @@ where /// Sending a request with a body and trailers /// /// ```rust - /// #![feature(async_await)] + /// # #![feature(async_await)] /// # use h2::client::*; /// # use http::*; /// # async fn doc(send_request: SendRequest<&'static [u8]>) @@ -625,11 +605,13 @@ impl Builder { /// # Examples /// /// ``` + /// # #![feature(async_await)] /// # use tokio_io::*; /// # use h2::client::*; + /// # use bytes::Bytes; /// # - /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) - /// # -> Handshake<'a, T> + /// # async fn doc(my_io: T) + /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. @@ -637,7 +619,7 @@ impl Builder { /// .initial_window_size(1_000_000) /// .max_concurrent_streams(1000) /// .handshake(my_io); - /// # client_fut + /// # client_fut.await /// # } /// # /// # pub fn main() {} @@ -666,18 +648,20 @@ impl Builder { /// # Examples /// /// ``` + /// # #![feature(async_await)] /// # use tokio_io::*; /// # use h2::client::*; + /// # use bytes::Bytes; /// # - /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) - /// # -> Handshake<'a, T> + /// # async fn doc(my_io: T) + /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. /// let client_fut = Builder::new() /// .initial_window_size(1_000_000) /// .handshake(my_io); - /// # client_fut + /// # client_fut.await /// # } /// # /// # pub fn main() {} @@ -700,18 +684,20 @@ impl Builder { /// # Examples /// /// ``` + /// # #![feature(async_await)] /// # use tokio_io::*; /// # use h2::client::*; + /// # use bytes::Bytes; /// # - /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) - /// # -> Handshake<'a, T> + /// # async fn doc(my_io: T) + /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. /// let client_fut = Builder::new() /// .initial_connection_window_size(1_000_000) /// .handshake(my_io); - /// # client_fut + /// # client_fut.await /// # } /// # /// # pub fn main() {} @@ -733,18 +719,20 @@ impl Builder { /// # Examples /// /// ``` + /// # #![feature(async_await)] /// # use tokio_io::*; /// # use h2::client::*; + /// # use bytes::Bytes; /// # - /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) - /// # -> Handshake<'a, T> + /// # async fn doc(my_io: T) + /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. /// let client_fut = Builder::new() /// .max_frame_size(1_000_000) /// .handshake(my_io); - /// # client_fut + /// # client_fut.await /// # } /// # /// # pub fn main() {} @@ -772,18 +760,20 @@ impl Builder { /// # Examples /// /// ``` + /// # #![feature(async_await)] /// # use tokio_io::*; /// # use h2::client::*; + /// # use bytes::Bytes; /// # - /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) - /// # -> Handshake<'a, T> + /// # async fn doc(my_io: T) + /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. /// let client_fut = Builder::new() /// .max_header_list_size(16 * 1024) /// .handshake(my_io); - /// # client_fut + /// # client_fut.await /// # } /// # /// # pub fn main() {} @@ -820,18 +810,20 @@ impl Builder { /// # Examples /// /// ``` + /// # #![feature(async_await)] /// # use tokio_io::*; /// # use h2::client::*; + /// # use bytes::Bytes; /// # - /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) - /// # -> Handshake<'a, T> + /// # async fn doc(my_io: T) + /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. /// let client_fut = Builder::new() /// .max_concurrent_streams(1000) /// .handshake(my_io); - /// # client_fut + /// # client_fut.await /// # } /// # /// # pub fn main() {} @@ -860,18 +852,20 @@ impl Builder { /// # Examples /// /// ``` + /// # #![feature(async_await)] /// # use tokio_io::*; /// # use h2::client::*; + /// # use bytes::Bytes; /// # - /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) - /// # -> Handshake<'a, T> + /// # async fn doc(my_io: T) + /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. /// let client_fut = Builder::new() /// .initial_max_send_streams(1000) /// .handshake(my_io); - /// # client_fut + /// # client_fut.await /// # } /// # /// # pub fn main() {} @@ -904,18 +898,20 @@ impl Builder { /// # Examples /// /// ``` + /// # #![feature(async_await)] /// # use tokio_io::*; /// # use h2::client::*; + /// # use bytes::Bytes; /// # - /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) - /// # -> Handshake<'a, T> + /// # async fn doc(my_io: T) + /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. /// let client_fut = Builder::new() /// .max_concurrent_reset_streams(1000) /// .handshake(my_io); - /// # client_fut + /// # client_fut.await /// # } /// # /// # pub fn main() {} @@ -948,19 +944,21 @@ impl Builder { /// # Examples /// /// ``` + /// # #![feature(async_await)] /// # use tokio_io::*; /// # use h2::client::*; /// # use std::time::Duration; + /// # use bytes::Bytes; /// # - /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) - /// # -> Handshake<'a, T> + /// # async fn doc(my_io: T) + /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. /// let client_fut = Builder::new() /// .reset_stream_duration(Duration::from_secs(10)) /// .handshake(my_io); - /// # client_fut + /// # client_fut.await /// # } /// # /// # pub fn main() {} @@ -985,19 +983,21 @@ impl Builder { /// # Examples /// /// ``` + /// # #![feature(async_await)] /// # use tokio_io::*; /// # use h2::client::*; /// # use std::time::Duration; + /// # use bytes::Bytes; /// # - /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) - /// # -> Handshake<'a, T> + /// # async fn doc(my_io: T) + /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. /// let client_fut = Builder::new() /// .enable_push(false) /// .handshake(my_io); - /// # client_fut + /// # client_fut.await /// # } /// # /// # pub fn main() {} @@ -1021,7 +1021,11 @@ impl Builder { /// Creates a new configured HTTP/2.0 client backed by `io`. /// /// It is expected that `io` already be in an appropriate state to commence - /// the [HTTP/2.0 handshake]. See [Handshake] for more details. + /// the [HTTP/2.0 handshake]. The handshake is completed once both the connection + /// preface and the initial settings frame is sent by the client. + /// + /// The handshake future does not wait for the initial settings frame from the + /// server. /// /// Returns a future which resolves to the [`Connection`] / [`SendRequest`] /// tuple once the HTTP/2.0 handshake has been completed. @@ -1030,7 +1034,6 @@ impl Builder { /// type. See [Outbound data type] for more details. /// /// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader - /// [Handshake]: ../index.html#handshake /// [`Connection`]: struct.Connection.html /// [`SendRequest`]: struct.SendRequest.html /// [Outbound data type]: ../index.html#outbound-data-type. @@ -1040,17 +1043,19 @@ impl Builder { /// Basic usage: /// /// ``` + /// # #![feature(async_await)] /// # use tokio_io::*; /// # use h2::client::*; + /// # use bytes::Bytes; /// # - /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) - /// # -> Handshake<'a, T> + /// # async fn doc(my_io: T) + /// -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. /// let client_fut = Builder::new() /// .handshake(my_io); - /// # client_fut + /// # client_fut.await /// # } /// # /// # pub fn main() {} @@ -1060,24 +1065,28 @@ impl Builder { /// type will be `&'static [u8]`. /// /// ``` + /// # #![feature(async_await)] /// # use tokio_io::*; /// # use h2::client::*; /// # - /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) - /// # -> Handshake<'a, T, &'static [u8]> + /// # async fn doc(my_io: T) + /// # -> Result<((SendRequest<&'static [u8]>, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. - /// let client_fut: Handshake<_, &'static [u8]> = Builder::new() - /// .handshake(my_io); - /// # client_fut + /// let client_fut = Builder::new() + /// .handshake::<_, &'static [u8]>(my_io); + /// # client_fut.await /// # } /// # /// # pub fn main() {} /// ``` - pub fn handshake<'a, T, B>(&self, io: T) -> Handshake<'a, T, B> + pub fn handshake( + &self, + io: T, + ) -> impl Future, Connection), crate::Error>> where - T: AsyncRead + AsyncWrite + Unpin + 'a, + T: AsyncRead + AsyncWrite + Unpin, B: IntoBuf + Unpin, B::Buf: Unpin + 'static, { @@ -1111,47 +1120,86 @@ impl Default for Builder { /// # Examples /// /// ``` -/// #![feature(async_await)] +/// # #![feature(async_await)] /// # use tokio_io::*; /// # use h2::client; /// # use h2::client::*; /// # -/// # async fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) +/// # async fn doc(my_io: T) -> Result<(), h2::Error> /// # { -/// let (send_request, connection) = client::handshake(my_io).await.unwrap(); +/// let (send_request, connection) = client::handshake(my_io).await?; /// // The HTTP/2.0 handshake has completed, now start polling /// // `connection` and use `send_request` to send requests to the /// // server. +/// # Ok(()) /// # } /// # /// # pub fn main() {} /// ``` -pub fn handshake<'a, T>(io: T) -> Handshake<'a, T, Bytes> +pub async fn handshake(io: T) -> Result<(SendRequest, Connection), crate::Error> where - T: AsyncRead + AsyncWrite + Unpin + 'a, + T: AsyncRead + AsyncWrite + Unpin, { - Builder::new().handshake(io) + let builder = Builder::new(); + builder.handshake(io).await } // ===== impl Connection ===== -impl<'a, T, B> Connection +impl Connection where - T: AsyncRead + AsyncWrite + Unpin + 'a, + T: AsyncRead + AsyncWrite + Unpin, B: IntoBuf + Unpin, B::Buf: Unpin, { - fn handshake2(mut io: T, builder: Builder) -> Handshake<'a, T, B> { + async fn handshake2( + mut io: T, + builder: Builder, + ) -> Result<(SendRequest, Connection), crate::Error> { log::debug!("binding client connection"); let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; - let handshake = Box::pin(async move { io.write_all(msg).await.map(|_| io) }); + io.write_all(msg).await?; - Handshake { - builder, - inner: handshake, - _marker: PhantomData, + log::debug!("client connection bound"); + + // Create the codec + let mut codec = Codec::new(io); + + if let Some(max) = builder.settings.max_frame_size() { + codec.set_max_recv_frame_size(max as usize); } + + if let Some(max) = builder.settings.max_header_list_size() { + codec.set_max_recv_header_list_size(max as usize); + } + + // Send initial settings frame + codec + .buffer(builder.settings.clone().into()) + .expect("invalid SETTINGS frame"); + + let inner = proto::Connection::new( + codec, + proto::Config { + next_stream_id: builder.stream_id, + initial_max_send_streams: builder.initial_max_send_streams, + reset_stream_duration: builder.reset_stream_duration, + reset_stream_max: builder.reset_stream_max, + settings: builder.settings.clone(), + }, + ); + let send_request = SendRequest { + inner: inner.streams().clone(), + pending: None, + }; + + let mut connection = Connection { inner }; + if let Some(sz) = builder.initial_target_connection_window_size { + connection.set_target_window_size(sz); + } + + Ok((send_request, connection)) } /// Sets the target window size for the whole connection. @@ -1212,73 +1260,6 @@ where } } -// ===== impl Handshake ===== - -impl<'a, T, B> Future for Handshake<'_, T, B> -where - T: AsyncRead + AsyncWrite + Unpin + 'a, - B: IntoBuf + Unpin, - B::Buf: Unpin + 'static, -{ - type Output = Result<(SendRequest, Connection), crate::Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let io = ready!(self.inner.poll_unpin(cx))?; - - log::debug!("client connection bound"); - - // Create the codec - let mut codec = Codec::new(io); - - if let Some(max) = self.builder.settings.max_frame_size() { - codec.set_max_recv_frame_size(max as usize); - } - - if let Some(max) = self.builder.settings.max_header_list_size() { - codec.set_max_recv_header_list_size(max as usize); - } - - // Send initial settings frame - codec - .buffer(self.builder.settings.clone().into()) - .expect("invalid SETTINGS frame"); - - let inner = proto::Connection::new( - codec, - proto::Config { - next_stream_id: self.builder.stream_id, - initial_max_send_streams: self.builder.initial_max_send_streams, - reset_stream_duration: self.builder.reset_stream_duration, - reset_stream_max: self.builder.reset_stream_max, - settings: self.builder.settings.clone(), - }, - ); - let send_request = SendRequest { - inner: inner.streams().clone(), - pending: None, - }; - - let mut connection = Connection { inner }; - if let Some(sz) = self.builder.initial_target_connection_window_size { - connection.set_target_window_size(sz); - } - - Poll::Ready(Ok((send_request, connection))) - } -} - -impl fmt::Debug for Handshake<'_, T, B> -where - T: AsyncRead + AsyncWrite, - T: fmt::Debug, - B: fmt::Debug + IntoBuf, - B::Buf: fmt::Debug + IntoBuf, -{ - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "client::Handshake") - } -} - // ===== impl ResponseFuture ===== impl Future for ResponseFuture { diff --git a/src/server.rs b/src/server.rs index 8a1c6fc..dc9d257 100644 --- a/src/server.rs +++ b/src/server.rs @@ -177,7 +177,7 @@ pub struct Handshake { /// # Examples /// /// ``` -/// #![feature(async_await)] +/// # #![feature(async_await)] /// # use futures::StreamExt; /// # use tokio_io::*; /// # use h2::server; @@ -314,7 +314,7 @@ const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; /// # Examples /// /// ``` -/// #![feature(async_await)] +/// # #![feature(async_await)] /// # use tokio_io::*; /// # use h2::server; /// # use h2::server::*; diff --git a/src/share.rs b/src/share.rs index 9f28186..4f6bc39 100644 --- a/src/share.rs +++ b/src/share.rs @@ -267,7 +267,7 @@ impl SendStream { /// is sent. For example: /// /// ```rust - /// #![feature(async_await)] + /// # #![feature(async_await)] /// # use h2::*; /// # async fn doc(mut send_stream: SendStream<&'static [u8]>) { /// send_stream.reserve_capacity(100); @@ -557,7 +557,7 @@ impl PingPong { /// # Example /// /// ``` - /// #![feature(async_await)] + /// # #![feature(async_await)] /// # async fn doc(mut ping_pong: h2::PingPong) { /// // let mut ping_pong = ... /// diff --git a/tests/h2-fuzz/src/main.rs b/tests/h2-fuzz/src/main.rs index 4f65e37..6f4e802 100644 --- a/tests/h2-fuzz/src/main.rs +++ b/tests/h2-fuzz/src/main.rs @@ -1,13 +1,13 @@ #![feature(async_await)] -use std::future::Future; -use futures::Stream; -use std::task::{Context, Poll}; -use std::pin::Pin; use futures::future; -use http::{Method, Request}; -use std::io; -use tokio_io::{AsyncRead, AsyncWrite}; use futures::stream::FuturesUnordered; +use futures::Stream; +use http::{Method, Request}; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncWrite}; struct MockIo<'a> { input: &'a [u8], @@ -33,7 +33,11 @@ impl<'a> AsyncRead for MockIo<'a> { false } - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { let mut len = self.next_u32() as usize; if self.input.is_empty() { Poll::Ready(Ok(0)) @@ -56,7 +60,11 @@ impl<'a> AsyncRead for MockIo<'a> { } impl<'a> AsyncWrite for MockIo<'a> { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { let len = std::cmp::min(self.next_u32() as usize, buf.len()); if len == 0 { if self.input.is_empty() { @@ -73,7 +81,6 @@ impl<'a> AsyncWrite for MockIo<'a> { fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } @@ -111,7 +118,6 @@ async fn run(script: &[u8]) -> Result<(), h2::Error> { } Poll::Pending }); - future.await?; Ok(()) }