Make handshake an async fn; other cleanup

This commit is contained in:
Gurwinder Singh
2019-08-17 06:39:25 +05:30
committed by Sean McArthur
parent ad7ffa795f
commit b039ef25bc
4 changed files with 157 additions and 170 deletions

View File

@@ -149,35 +149,12 @@ use futures::{ready, FutureExt, Stream};
use http::{uri, HeaderMap, Method, Request, Response, Version}; use http::{uri, HeaderMap, Method, Request, Response, Version};
use std::fmt; use std::fmt;
use std::future::Future; use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use std::usize; use std::usize;
use tokio_io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio_io::{AsyncRead, AsyncWrite, AsyncWriteExt};
/// Performs the HTTP/2.0 connection handshake.
///
/// This type implements `Future`, yielding a `(SendRequest, Connection)`
/// instance once the handshake has completed.
///
/// The handshake is completed once both the connection preface and the initial
/// settings frame is sent by the client.
///
/// The handshake future does not wait for the initial settings frame from the
/// server.
///
/// See [module] level documentation for more details.
///
/// [module]: index.html
#[must_use = "futures do nothing unless polled"]
pub struct Handshake<'a, T, B = Bytes> {
builder: Builder,
inner: Pin<Box<dyn Future<Output = io::Result<T>> + 'a>>,
_marker: PhantomData<fn(B)>,
}
/// Initializes new HTTP/2.0 streams on a connection by sending a request. /// Initializes new HTTP/2.0 streams on a connection by sending a request.
/// ///
/// This type does no work itself. Instead, it is a handle to the inner /// This type does no work itself. Instead, it is a handle to the inner
@@ -236,20 +213,21 @@ pub struct ReadySendRequest<B: IntoBuf> {
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// #![feature(async_await)] /// # #![feature(async_await)]
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::client; /// # use h2::client;
/// # use h2::client::*; /// # use h2::client::*;
/// # /// #
/// # async fn doc<T>(my_io: T) /// # async fn doc<T>(my_io: T) -> Result<(), h2::Error>
/// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static, /// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
/// # { /// # {
/// let (send_request, connection) = client::handshake(my_io).await.unwrap(); /// let (send_request, connection) = client::handshake(my_io).await?;
/// // Submit the connection handle to an executor. /// // Submit the connection handle to an executor.
/// tokio::spawn(async { connection.await.expect("connection failed"); }); /// tokio::spawn(async { connection.await.expect("connection failed"); });
/// ///
/// // Now, use `send_request` to initialize HTTP/2.0 streams. /// // Now, use `send_request` to initialize HTTP/2.0 streams.
/// // ... /// // ...
/// # Ok(())
/// # } /// # }
/// # /// #
/// # pub fn main() {} /// # pub fn main() {}
@@ -314,11 +292,13 @@ pub struct PushPromises {
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// # #![feature(async_await)]
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::client::*; /// # use h2::client::*;
/// # use bytes::Bytes;
/// # /// #
/// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<'a, T> /// -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
/// # { /// # {
/// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // `client_fut` is a future representing the completion of the HTTP/2.0
/// // handshake. /// // handshake.
@@ -326,7 +306,7 @@ pub struct PushPromises {
/// .initial_window_size(1_000_000) /// .initial_window_size(1_000_000)
/// .max_concurrent_streams(1000) /// .max_concurrent_streams(1000)
/// .handshake(my_io); /// .handshake(my_io);
/// # client_fut /// # client_fut.await
/// # } /// # }
/// # /// #
/// # pub fn main() {} /// # pub fn main() {}
@@ -394,7 +374,7 @@ where
/// # Examples /// # Examples
/// ///
/// ```rust /// ```rust
/// #![feature(async_await)] /// # #![feature(async_await)]
/// # use h2::client::*; /// # use h2::client::*;
/// # use http::*; /// # use http::*;
/// # async fn doc(send_request: SendRequest<&'static [u8]>) /// # async fn doc(send_request: SendRequest<&'static [u8]>)
@@ -454,7 +434,7 @@ where
/// Sending a request with no body /// Sending a request with no body
/// ///
/// ```rust /// ```rust
/// #![feature(async_await)] /// # #![feature(async_await)]
/// # use h2::client::*; /// # use h2::client::*;
/// # use http::*; /// # use http::*;
/// # async fn doc(send_request: SendRequest<&'static [u8]>) /// # async fn doc(send_request: SendRequest<&'static [u8]>)
@@ -479,7 +459,7 @@ where
/// Sending a request with a body and trailers /// Sending a request with a body and trailers
/// ///
/// ```rust /// ```rust
/// #![feature(async_await)] /// # #![feature(async_await)]
/// # use h2::client::*; /// # use h2::client::*;
/// # use http::*; /// # use http::*;
/// # async fn doc(send_request: SendRequest<&'static [u8]>) /// # async fn doc(send_request: SendRequest<&'static [u8]>)
@@ -625,11 +605,13 @@ impl Builder {
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// # #![feature(async_await)]
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::client::*; /// # use h2::client::*;
/// # use bytes::Bytes;
/// # /// #
/// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<'a, T> /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
/// # { /// # {
/// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // `client_fut` is a future representing the completion of the HTTP/2.0
/// // handshake. /// // handshake.
@@ -637,7 +619,7 @@ impl Builder {
/// .initial_window_size(1_000_000) /// .initial_window_size(1_000_000)
/// .max_concurrent_streams(1000) /// .max_concurrent_streams(1000)
/// .handshake(my_io); /// .handshake(my_io);
/// # client_fut /// # client_fut.await
/// # } /// # }
/// # /// #
/// # pub fn main() {} /// # pub fn main() {}
@@ -666,18 +648,20 @@ impl Builder {
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// # #![feature(async_await)]
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::client::*; /// # use h2::client::*;
/// # use bytes::Bytes;
/// # /// #
/// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<'a, T> /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
/// # { /// # {
/// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // `client_fut` is a future representing the completion of the HTTP/2.0
/// // handshake. /// // handshake.
/// let client_fut = Builder::new() /// let client_fut = Builder::new()
/// .initial_window_size(1_000_000) /// .initial_window_size(1_000_000)
/// .handshake(my_io); /// .handshake(my_io);
/// # client_fut /// # client_fut.await
/// # } /// # }
/// # /// #
/// # pub fn main() {} /// # pub fn main() {}
@@ -700,18 +684,20 @@ impl Builder {
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// # #![feature(async_await)]
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::client::*; /// # use h2::client::*;
/// # use bytes::Bytes;
/// # /// #
/// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<'a, T> /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
/// # { /// # {
/// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // `client_fut` is a future representing the completion of the HTTP/2.0
/// // handshake. /// // handshake.
/// let client_fut = Builder::new() /// let client_fut = Builder::new()
/// .initial_connection_window_size(1_000_000) /// .initial_connection_window_size(1_000_000)
/// .handshake(my_io); /// .handshake(my_io);
/// # client_fut /// # client_fut.await
/// # } /// # }
/// # /// #
/// # pub fn main() {} /// # pub fn main() {}
@@ -733,18 +719,20 @@ impl Builder {
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// # #![feature(async_await)]
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::client::*; /// # use h2::client::*;
/// # use bytes::Bytes;
/// # /// #
/// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<'a, T> /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
/// # { /// # {
/// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // `client_fut` is a future representing the completion of the HTTP/2.0
/// // handshake. /// // handshake.
/// let client_fut = Builder::new() /// let client_fut = Builder::new()
/// .max_frame_size(1_000_000) /// .max_frame_size(1_000_000)
/// .handshake(my_io); /// .handshake(my_io);
/// # client_fut /// # client_fut.await
/// # } /// # }
/// # /// #
/// # pub fn main() {} /// # pub fn main() {}
@@ -772,18 +760,20 @@ impl Builder {
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// # #![feature(async_await)]
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::client::*; /// # use h2::client::*;
/// # use bytes::Bytes;
/// # /// #
/// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<'a, T> /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
/// # { /// # {
/// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // `client_fut` is a future representing the completion of the HTTP/2.0
/// // handshake. /// // handshake.
/// let client_fut = Builder::new() /// let client_fut = Builder::new()
/// .max_header_list_size(16 * 1024) /// .max_header_list_size(16 * 1024)
/// .handshake(my_io); /// .handshake(my_io);
/// # client_fut /// # client_fut.await
/// # } /// # }
/// # /// #
/// # pub fn main() {} /// # pub fn main() {}
@@ -820,18 +810,20 @@ impl Builder {
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// # #![feature(async_await)]
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::client::*; /// # use h2::client::*;
/// # use bytes::Bytes;
/// # /// #
/// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<'a, T> /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
/// # { /// # {
/// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // `client_fut` is a future representing the completion of the HTTP/2.0
/// // handshake. /// // handshake.
/// let client_fut = Builder::new() /// let client_fut = Builder::new()
/// .max_concurrent_streams(1000) /// .max_concurrent_streams(1000)
/// .handshake(my_io); /// .handshake(my_io);
/// # client_fut /// # client_fut.await
/// # } /// # }
/// # /// #
/// # pub fn main() {} /// # pub fn main() {}
@@ -860,18 +852,20 @@ impl Builder {
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// # #![feature(async_await)]
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::client::*; /// # use h2::client::*;
/// # use bytes::Bytes;
/// # /// #
/// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<'a, T> /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
/// # { /// # {
/// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // `client_fut` is a future representing the completion of the HTTP/2.0
/// // handshake. /// // handshake.
/// let client_fut = Builder::new() /// let client_fut = Builder::new()
/// .initial_max_send_streams(1000) /// .initial_max_send_streams(1000)
/// .handshake(my_io); /// .handshake(my_io);
/// # client_fut /// # client_fut.await
/// # } /// # }
/// # /// #
/// # pub fn main() {} /// # pub fn main() {}
@@ -904,18 +898,20 @@ impl Builder {
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// # #![feature(async_await)]
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::client::*; /// # use h2::client::*;
/// # use bytes::Bytes;
/// # /// #
/// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<'a, T> /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
/// # { /// # {
/// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // `client_fut` is a future representing the completion of the HTTP/2.0
/// // handshake. /// // handshake.
/// let client_fut = Builder::new() /// let client_fut = Builder::new()
/// .max_concurrent_reset_streams(1000) /// .max_concurrent_reset_streams(1000)
/// .handshake(my_io); /// .handshake(my_io);
/// # client_fut /// # client_fut.await
/// # } /// # }
/// # /// #
/// # pub fn main() {} /// # pub fn main() {}
@@ -948,19 +944,21 @@ impl Builder {
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// # #![feature(async_await)]
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::client::*; /// # use h2::client::*;
/// # use std::time::Duration; /// # use std::time::Duration;
/// # use bytes::Bytes;
/// # /// #
/// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<'a, T> /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
/// # { /// # {
/// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // `client_fut` is a future representing the completion of the HTTP/2.0
/// // handshake. /// // handshake.
/// let client_fut = Builder::new() /// let client_fut = Builder::new()
/// .reset_stream_duration(Duration::from_secs(10)) /// .reset_stream_duration(Duration::from_secs(10))
/// .handshake(my_io); /// .handshake(my_io);
/// # client_fut /// # client_fut.await
/// # } /// # }
/// # /// #
/// # pub fn main() {} /// # pub fn main() {}
@@ -985,19 +983,21 @@ impl Builder {
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// # #![feature(async_await)]
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::client::*; /// # use h2::client::*;
/// # use std::time::Duration; /// # use std::time::Duration;
/// # use bytes::Bytes;
/// # /// #
/// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<'a, T> /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
/// # { /// # {
/// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // `client_fut` is a future representing the completion of the HTTP/2.0
/// // handshake. /// // handshake.
/// let client_fut = Builder::new() /// let client_fut = Builder::new()
/// .enable_push(false) /// .enable_push(false)
/// .handshake(my_io); /// .handshake(my_io);
/// # client_fut /// # client_fut.await
/// # } /// # }
/// # /// #
/// # pub fn main() {} /// # pub fn main() {}
@@ -1021,7 +1021,11 @@ impl Builder {
/// Creates a new configured HTTP/2.0 client backed by `io`. /// Creates a new configured HTTP/2.0 client backed by `io`.
/// ///
/// It is expected that `io` already be in an appropriate state to commence /// It is expected that `io` already be in an appropriate state to commence
/// the [HTTP/2.0 handshake]. See [Handshake] for more details. /// the [HTTP/2.0 handshake]. The handshake is completed once both the connection
/// preface and the initial settings frame is sent by the client.
///
/// The handshake future does not wait for the initial settings frame from the
/// server.
/// ///
/// Returns a future which resolves to the [`Connection`] / [`SendRequest`] /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
/// tuple once the HTTP/2.0 handshake has been completed. /// tuple once the HTTP/2.0 handshake has been completed.
@@ -1030,7 +1034,6 @@ impl Builder {
/// type. See [Outbound data type] for more details. /// type. See [Outbound data type] for more details.
/// ///
/// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader /// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
/// [Handshake]: ../index.html#handshake
/// [`Connection`]: struct.Connection.html /// [`Connection`]: struct.Connection.html
/// [`SendRequest`]: struct.SendRequest.html /// [`SendRequest`]: struct.SendRequest.html
/// [Outbound data type]: ../index.html#outbound-data-type. /// [Outbound data type]: ../index.html#outbound-data-type.
@@ -1040,17 +1043,19 @@ impl Builder {
/// Basic usage: /// Basic usage:
/// ///
/// ``` /// ```
/// # #![feature(async_await)]
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::client::*; /// # use h2::client::*;
/// # use bytes::Bytes;
/// # /// #
/// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<'a, T> /// -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
/// # { /// # {
/// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // `client_fut` is a future representing the completion of the HTTP/2.0
/// // handshake. /// // handshake.
/// let client_fut = Builder::new() /// let client_fut = Builder::new()
/// .handshake(my_io); /// .handshake(my_io);
/// # client_fut /// # client_fut.await
/// # } /// # }
/// # /// #
/// # pub fn main() {} /// # pub fn main() {}
@@ -1060,24 +1065,28 @@ impl Builder {
/// type will be `&'static [u8]`. /// type will be `&'static [u8]`.
/// ///
/// ``` /// ```
/// # #![feature(async_await)]
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::client::*; /// # use h2::client::*;
/// # /// #
/// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<'a, T, &'static [u8]> /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error>
/// # { /// # {
/// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // `client_fut` is a future representing the completion of the HTTP/2.0
/// // handshake. /// // handshake.
/// let client_fut: Handshake<_, &'static [u8]> = Builder::new() /// let client_fut = Builder::new()
/// .handshake(my_io); /// .handshake::<_, &'static [u8]>(my_io);
/// # client_fut /// # client_fut.await
/// # } /// # }
/// # /// #
/// # pub fn main() {} /// # pub fn main() {}
/// ``` /// ```
pub fn handshake<'a, T, B>(&self, io: T) -> Handshake<'a, T, B> pub fn handshake<T, B>(
&self,
io: T,
) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>>
where where
T: AsyncRead + AsyncWrite + Unpin + 'a, T: AsyncRead + AsyncWrite + Unpin,
B: IntoBuf + Unpin, B: IntoBuf + Unpin,
B::Buf: Unpin + 'static, B::Buf: Unpin + 'static,
{ {
@@ -1111,47 +1120,86 @@ impl Default for Builder {
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// #![feature(async_await)] /// # #![feature(async_await)]
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::client; /// # use h2::client;
/// # use h2::client::*; /// # use h2::client::*;
/// # /// #
/// # async fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
/// # { /// # {
/// let (send_request, connection) = client::handshake(my_io).await.unwrap(); /// let (send_request, connection) = client::handshake(my_io).await?;
/// // The HTTP/2.0 handshake has completed, now start polling /// // The HTTP/2.0 handshake has completed, now start polling
/// // `connection` and use `send_request` to send requests to the /// // `connection` and use `send_request` to send requests to the
/// // server. /// // server.
/// # Ok(())
/// # } /// # }
/// # /// #
/// # pub fn main() {} /// # pub fn main() {}
/// ``` /// ```
pub fn handshake<'a, T>(io: T) -> Handshake<'a, T, Bytes> pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error>
where where
T: AsyncRead + AsyncWrite + Unpin + 'a, T: AsyncRead + AsyncWrite + Unpin,
{ {
Builder::new().handshake(io) let builder = Builder::new();
builder.handshake(io).await
} }
// ===== impl Connection ===== // ===== impl Connection =====
impl<'a, T, B> Connection<T, B> impl<T, B> Connection<T, B>
where where
T: AsyncRead + AsyncWrite + Unpin + 'a, T: AsyncRead + AsyncWrite + Unpin,
B: IntoBuf + Unpin, B: IntoBuf + Unpin,
B::Buf: Unpin, B::Buf: Unpin,
{ {
fn handshake2(mut io: T, builder: Builder) -> Handshake<'a, T, B> { async fn handshake2(
mut io: T,
builder: Builder,
) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
log::debug!("binding client connection"); log::debug!("binding client connection");
let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
let handshake = Box::pin(async move { io.write_all(msg).await.map(|_| io) }); io.write_all(msg).await?;
Handshake { log::debug!("client connection bound");
builder,
inner: handshake, // Create the codec
_marker: PhantomData, let mut codec = Codec::new(io);
if let Some(max) = builder.settings.max_frame_size() {
codec.set_max_recv_frame_size(max as usize);
} }
if let Some(max) = builder.settings.max_header_list_size() {
codec.set_max_recv_header_list_size(max as usize);
}
// Send initial settings frame
codec
.buffer(builder.settings.clone().into())
.expect("invalid SETTINGS frame");
let inner = proto::Connection::new(
codec,
proto::Config {
next_stream_id: builder.stream_id,
initial_max_send_streams: builder.initial_max_send_streams,
reset_stream_duration: builder.reset_stream_duration,
reset_stream_max: builder.reset_stream_max,
settings: builder.settings.clone(),
},
);
let send_request = SendRequest {
inner: inner.streams().clone(),
pending: None,
};
let mut connection = Connection { inner };
if let Some(sz) = builder.initial_target_connection_window_size {
connection.set_target_window_size(sz);
}
Ok((send_request, connection))
} }
/// Sets the target window size for the whole connection. /// Sets the target window size for the whole connection.
@@ -1212,73 +1260,6 @@ where
} }
} }
// ===== impl Handshake =====
impl<'a, T, B> Future for Handshake<'_, T, B>
where
T: AsyncRead + AsyncWrite + Unpin + 'a,
B: IntoBuf + Unpin,
B::Buf: Unpin + 'static,
{
type Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let io = ready!(self.inner.poll_unpin(cx))?;
log::debug!("client connection bound");
// Create the codec
let mut codec = Codec::new(io);
if let Some(max) = self.builder.settings.max_frame_size() {
codec.set_max_recv_frame_size(max as usize);
}
if let Some(max) = self.builder.settings.max_header_list_size() {
codec.set_max_recv_header_list_size(max as usize);
}
// Send initial settings frame
codec
.buffer(self.builder.settings.clone().into())
.expect("invalid SETTINGS frame");
let inner = proto::Connection::new(
codec,
proto::Config {
next_stream_id: self.builder.stream_id,
initial_max_send_streams: self.builder.initial_max_send_streams,
reset_stream_duration: self.builder.reset_stream_duration,
reset_stream_max: self.builder.reset_stream_max,
settings: self.builder.settings.clone(),
},
);
let send_request = SendRequest {
inner: inner.streams().clone(),
pending: None,
};
let mut connection = Connection { inner };
if let Some(sz) = self.builder.initial_target_connection_window_size {
connection.set_target_window_size(sz);
}
Poll::Ready(Ok((send_request, connection)))
}
}
impl<T, B> fmt::Debug for Handshake<'_, T, B>
where
T: AsyncRead + AsyncWrite,
T: fmt::Debug,
B: fmt::Debug + IntoBuf,
B::Buf: fmt::Debug + IntoBuf,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "client::Handshake")
}
}
// ===== impl ResponseFuture ===== // ===== impl ResponseFuture =====
impl Future for ResponseFuture { impl Future for ResponseFuture {

View File

@@ -177,7 +177,7 @@ pub struct Handshake<T, B: IntoBuf = Bytes> {
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// #![feature(async_await)] /// # #![feature(async_await)]
/// # use futures::StreamExt; /// # use futures::StreamExt;
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::server; /// # use h2::server;
@@ -314,7 +314,7 @@ const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// #![feature(async_await)] /// # #![feature(async_await)]
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::server; /// # use h2::server;
/// # use h2::server::*; /// # use h2::server::*;

View File

@@ -267,7 +267,7 @@ impl<B: IntoBuf> SendStream<B> {
/// is sent. For example: /// is sent. For example:
/// ///
/// ```rust /// ```rust
/// #![feature(async_await)] /// # #![feature(async_await)]
/// # use h2::*; /// # use h2::*;
/// # async fn doc(mut send_stream: SendStream<&'static [u8]>) { /// # async fn doc(mut send_stream: SendStream<&'static [u8]>) {
/// send_stream.reserve_capacity(100); /// send_stream.reserve_capacity(100);
@@ -557,7 +557,7 @@ impl PingPong {
/// # Example /// # Example
/// ///
/// ``` /// ```
/// #![feature(async_await)] /// # #![feature(async_await)]
/// # async fn doc(mut ping_pong: h2::PingPong) { /// # async fn doc(mut ping_pong: h2::PingPong) {
/// // let mut ping_pong = ... /// // let mut ping_pong = ...
/// ///

View File

@@ -1,13 +1,13 @@
#![feature(async_await)] #![feature(async_await)]
use std::future::Future;
use futures::Stream;
use std::task::{Context, Poll};
use std::pin::Pin;
use futures::future; use futures::future;
use http::{Method, Request};
use std::io;
use tokio_io::{AsyncRead, AsyncWrite};
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::Stream;
use http::{Method, Request};
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
struct MockIo<'a> { struct MockIo<'a> {
input: &'a [u8], input: &'a [u8],
@@ -33,7 +33,11 @@ impl<'a> AsyncRead for MockIo<'a> {
false false
} }
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> { fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let mut len = self.next_u32() as usize; let mut len = self.next_u32() as usize;
if self.input.is_empty() { if self.input.is_empty() {
Poll::Ready(Ok(0)) Poll::Ready(Ok(0))
@@ -56,7 +60,11 @@ impl<'a> AsyncRead for MockIo<'a> {
} }
impl<'a> AsyncWrite for MockIo<'a> { impl<'a> AsyncWrite for MockIo<'a> {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> { fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let len = std::cmp::min(self.next_u32() as usize, buf.len()); let len = std::cmp::min(self.next_u32() as usize, buf.len());
if len == 0 { if len == 0 {
if self.input.is_empty() { if self.input.is_empty() {
@@ -73,7 +81,6 @@ impl<'a> AsyncWrite for MockIo<'a> {
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
@@ -111,7 +118,6 @@ async fn run(script: &[u8]) -> Result<(), h2::Error> {
} }
Poll::Pending Poll::Pending
}); });
future.await?; future.await?;
Ok(()) Ok(())
} }