## Motivation
Currently, the `tracing` spans for the client and server handshakes
contain the name of the I/O type. In some cases, where nested I/O types
are in use, these names can be quite long; for example, in Linkerd, we
see log lines like this:
```
2022-03-07T23:38:15.322506670Z [ 10533.916262s] DEBUG ThreadId(01) inbound:accept{client.addr=192.168.1.9:1227}:server{port=4143}:direct:gateway{dst=server.echo.svc.cluster.local:8080}:server_handshake{io=hyper::common::io::rewind::Rewind<linkerd_io::either::EitherIo<linkerd_io::sensor::SensorIo<linkerd_io::prefixed::PrefixedIo<linkerd_io::either::EitherIo<tokio_rustls::server::TlsStream<linkerd_io::either::EitherIo<linkerd_io::scoped::ScopedIo<tokio::net::tcp::stream::TcpStream>, linkerd_io::prefixed::PrefixedIo<linkerd_io::scoped::ScopedIo<tokio::net::tcp::stream::TcpStream>>>>, linkerd_io::either::EitherIo<linkerd_io::scoped::ScopedIo<tokio::net::tcp::stream::TcpStream>, linkerd_io::prefixed::PrefixedIo<linkerd_io::scoped::ScopedIo<tokio::net::tcp::stream::TcpStream>>>>>, linkerd_transport_metrics::sensor::Sensor>, linkerd_io::sensor::SensorIo<linkerd_io::either::EitherIo<tokio_rustls::server::TlsStream<linkerd_io::either::EitherIo<linkerd_io::scoped::ScopedIo<tokio::net::tcp::stream::TcpStream>, linkerd_io::prefixed::PrefixedIo<linkerd_io::scoped::ScopedIo<tokio::net::tcp::stream::TcpStream>>>>, linkerd_io::either::EitherIo<linkerd_io::scoped::ScopedIo<tokio::net::tcp::stream::TcpStream>, linkerd_io::prefixed::PrefixedIo<linkerd_io::scoped::ScopedIo<tokio::net::tcp::stream::TcpStream>>>>, linkerd_transport_metrics::sensor::Sensor>>>}:FramedWrite::buffer{frame=Settings { flags: (0x0), initial_window_size: 65535, max_frame_size: 16384 }}: h2::codec::framed_write: send frame=Settings { flags: (0x0), initial_window_size: 65535, max_frame_size: 16384 }
```
which is kinda not great: the span fields dwarf the actual log message.
## Solution
This branch removes the I/O type's type name from the spans for the
server and client handshakes. In practice, these names are not
particularly useful: a given server or client instance is parameterized
over a single I/O type and will only ever serve connections of that
type.
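For illustration (this is not code from the branch), the length of these span fields comes from `std::any::type_name`, which spells out the full path of every nested generic parameter of the I/O type. A minimal sketch, using hypothetical wrapper types standing in for the nested I/O stack:

```rust
// Hypothetical wrapper types standing in for a nested I/O stack like
// Linkerd's (`SensorIo`, `PrefixedIo`, etc. here are illustrative only).
struct SensorIo<T>(T);
struct PrefixedIo<T>(T);
struct TcpStream;

fn main() {
    // `type_name` prints the fully-qualified path of every nested type
    // parameter, so each wrapper layer multiplies the length of the
    // `io = ...` span field.
    let name = std::any::type_name::<SensorIo<PrefixedIo<SensorIo<TcpStream>>>>();
    println!("{}", name);
    assert!(name.len() > "TcpStream".len());
}
```

Even three layers of nesting produce a long fully-qualified name; a real proxy stack nests many more, which is how the log line above ends up that long.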
//! Client implementation of the HTTP/2 protocol.
|
|
//!
|
|
//! # Getting started
|
|
//!
|
|
//! Running an HTTP/2 client requires the caller to establish the underlying
|
|
//! connection as well as get the connection to a state that is ready to begin
|
|
//! the HTTP/2 handshake. See [here](../index.html#handshake) for more
|
|
//! details.
|
|
//!
|
|
//! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote
|
|
//! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
|
|
//!
|
|
//! Once a connection is obtained, it is passed to [`handshake`], which will
|
|
//! begin the [HTTP/2 handshake]. This returns a future that completes once
|
|
//! the handshake process is performed and HTTP/2 streams may be initialized.
|
|
//!
|
|
//! [`handshake`] uses default configuration values. There are a number of
|
|
//! settings that can be changed by using [`Builder`] instead.
|
|
//!
|
|
//! Once the handshake future completes, the caller is provided with a
|
|
//! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
|
|
//! instance is used to drive the connection (see [Managing the connection]).
|
|
//! The [`SendRequest`] instance is used to initialize new streams (see [Making
|
|
//! requests]).
|
|
//!
|
|
//! # Making requests
|
|
//!
|
|
//! Requests are made using the [`SendRequest`] handle provided by the handshake
|
|
//! future. Once a request is submitted, an HTTP/2 stream is initialized and
|
|
//! the request is sent to the server.
|
|
//!
|
|
//! A request body and request trailers are sent using [`SendRequest`] and the
|
|
//! server's response is returned once the [`ResponseFuture`] future completes.
|
|
//! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
|
|
//! [`SendRequest::send_request`] and are tied to the HTTP/2 stream
|
|
//! initialized by the sent request.
|
|
//!
|
|
//! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2
|
|
//! stream can be created, i.e. as long as the current number of active streams
|
|
//! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
|
|
//! caller will be notified once an existing stream closes, freeing capacity for
|
|
//! the caller. The caller should use [`SendRequest::poll_ready`] to check for
|
|
//! capacity before sending a request to the server.
|
|
//!
|
|
//! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
|
|
//! must not send a request if `poll_ready` does not return `Ready`. Attempting
|
|
//! to do so will result in an [`Error`] being returned.
|
|
//!
|
|
//! # Managing the connection
|
|
//!
|
|
//! The [`Connection`] instance is used to manage connection state. The caller
|
|
//! is required to call [`Connection::poll`] in order to advance state.
|
|
//! [`SendRequest::send_request`] and other functions have no effect unless
|
|
//! [`Connection::poll`] is called.
|
|
//!
|
|
//! The [`Connection`] instance should only be dropped once [`Connection::poll`]
|
|
//! returns `Ready`. At this point, the underlying socket has been closed and no
|
|
//! further work needs to be done.
|
|
//!
|
|
//! The easiest way to ensure that the [`Connection`] instance gets polled is to
|
|
//! submit the [`Connection`] instance to an [executor]. The executor will then
|
|
//! manage polling the connection until the connection is complete.
|
|
//! Alternatively, the caller can call `poll` manually.
|
|
//!
|
|
//! # Example
|
|
//!
|
|
//! ```rust, no_run
|
|
//!
|
|
//! use h2::client;
|
|
//!
|
|
//! use http::{Request, Method};
|
|
//! use std::error::Error;
|
|
//! use tokio::net::TcpStream;
|
|
//!
|
|
//! #[tokio::main]
|
|
//! pub async fn main() -> Result<(), Box<dyn Error>> {
|
|
//! // Establish TCP connection to the server.
|
|
//! let tcp = TcpStream::connect("127.0.0.1:5928").await?;
|
|
//! let (h2, connection) = client::handshake(tcp).await?;
|
|
//! tokio::spawn(async move {
|
|
//! connection.await.unwrap();
|
|
//! });
|
|
//!
|
|
//! let mut h2 = h2.ready().await?;
|
|
//! // Prepare the HTTP request to send to the server.
|
|
//! let request = Request::builder()
|
|
//! .method(Method::GET)
|
|
//! .uri("https://www.example.com/")
|
|
//! .body(())
|
|
//! .unwrap();
|
|
//!
|
|
//! // Send the request. The second tuple item allows the caller
|
|
//! // to stream a request body.
|
|
//! let (response, _) = h2.send_request(request, true).unwrap();
|
|
//!
|
|
//! let (head, mut body) = response.await?.into_parts();
|
|
//!
|
|
//! println!("Received response: {:?}", head);
|
|
//!
|
|
//! // The `flow_control` handle allows the caller to manage
|
|
//! // flow control.
|
|
//! //
|
|
//! // Whenever data is received, the caller is responsible for
|
|
//! // releasing capacity back to the server once it has freed
|
|
//! // the data from memory.
|
|
//! let mut flow_control = body.flow_control().clone();
|
|
//!
|
|
//! while let Some(chunk) = body.data().await {
|
|
//! let chunk = chunk?;
|
|
//! println!("RX: {:?}", chunk);
|
|
//!
|
|
//! // Let the server send more data.
|
|
//! let _ = flow_control.release_capacity(chunk.len());
|
|
//! }
|
|
//!
|
|
//! Ok(())
|
|
//! }
|
|
//! ```
|
|
//!
|
|
//! [`TcpStream`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpStream.html
|
|
//! [`handshake`]: fn.handshake.html
|
|
//! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
|
|
//! [`SendRequest`]: struct.SendRequest.html
|
|
//! [`SendStream`]: ../struct.SendStream.html
|
|
//! [Making requests]: #making-requests
|
|
//! [Managing the connection]: #managing-the-connection
|
|
//! [`Connection`]: struct.Connection.html
|
|
//! [`Connection::poll`]: struct.Connection.html#method.poll
|
|
//! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
|
|
//! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
|
|
//! [`SendRequest`]: struct.SendRequest.html
|
|
//! [`ResponseFuture`]: struct.ResponseFuture.html
|
|
//! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
|
|
//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
|
|
//! [`Builder`]: struct.Builder.html
|
|
//! [`Error`]: ../struct.Error.html
|
|
|
|
use crate::codec::{Codec, SendError, UserError};
|
|
use crate::ext::Protocol;
|
|
use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId};
|
|
use crate::proto::{self, Error};
|
|
use crate::{FlowControl, PingPong, RecvStream, SendStream};
|
|
|
|
use bytes::{Buf, Bytes};
|
|
use http::{uri, HeaderMap, Method, Request, Response, Version};
|
|
use std::fmt;
|
|
use std::future::Future;
|
|
use std::pin::Pin;
|
|
use std::task::{Context, Poll};
|
|
use std::time::Duration;
|
|
use std::usize;
|
|
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
|
use tracing::Instrument;
|
|
|
|
/// Initializes new HTTP/2 streams on a connection by sending a request.
|
|
///
|
|
/// This type does no work itself. Instead, it is a handle to the inner
|
|
/// connection state held by [`Connection`]. If the associated connection
|
|
/// instance is dropped, all `SendRequest` functions will return [`Error`].
|
|
///
|
|
/// [`SendRequest`] instances are able to move to and operate on separate tasks
|
|
/// / threads than their associated [`Connection`] instance. Internally, there
|
|
/// is a buffer used to stage requests before they get written to the
|
|
/// connection. There is no guarantee that requests get written to the
|
|
/// connection in FIFO order as HTTP/2 prioritization logic can play a role.
|
|
///
|
|
/// [`SendRequest`] implements [`Clone`], enabling the creation of many
|
|
/// instances that are backed by a single connection.
|
|
///
|
|
/// See [module] level documentation for more details.
|
|
///
|
|
/// [module]: index.html
|
|
/// [`Connection`]: struct.Connection.html
|
|
/// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
|
|
/// [`Error`]: ../struct.Error.html
|
|
pub struct SendRequest<B: Buf> {
|
|
inner: proto::Streams<B, Peer>,
|
|
pending: Option<proto::OpaqueStreamRef>,
|
|
}
|
|
|
|
/// Returns a `SendRequest` instance once it is ready to send at least one
|
|
/// request.
|
|
#[derive(Debug)]
|
|
pub struct ReadySendRequest<B: Buf> {
|
|
inner: Option<SendRequest<B>>,
|
|
}
|
|
|
|
/// Manages all state associated with an HTTP/2 client connection.
|
|
///
|
|
/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
|
|
/// implements the HTTP/2 client logic for that connection. It is responsible
|
|
/// for driving the internal state forward, performing the work requested of the
|
|
/// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
|
|
/// [`RecvStream`]).
|
|
///
|
|
/// `Connection` values are created by calling [`handshake`]. Once a
|
|
/// `Connection` value is obtained, the caller must repeatedly call [`poll`]
|
|
/// until `Ready` is returned. The easiest way to do this is to submit the
|
|
/// `Connection` instance to an [executor].
|
|
///
|
|
/// [module]: index.html
|
|
/// [`handshake`]: fn.handshake.html
|
|
/// [`SendRequest`]: struct.SendRequest.html
|
|
/// [`ResponseFuture`]: struct.ResponseFuture.html
|
|
/// [`SendStream`]: ../struct.SendStream.html
|
|
/// [`RecvStream`]: ../struct.RecvStream.html
|
|
/// [`poll`]: #method.poll
|
|
/// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # use tokio::io::{AsyncRead, AsyncWrite};
|
|
/// # use h2::client;
|
|
/// # use h2::client::*;
|
|
/// #
|
|
/// # async fn doc<T>(my_io: T) -> Result<(), h2::Error>
|
|
/// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
|
|
/// # {
|
|
/// let (send_request, connection) = client::handshake(my_io).await?;
|
|
/// // Submit the connection handle to an executor.
|
|
/// tokio::spawn(async { connection.await.expect("connection failed"); });
|
|
///
|
|
/// // Now, use `send_request` to initialize HTTP/2 streams.
|
|
/// // ...
|
|
/// # Ok(())
|
|
/// # }
|
|
/// #
|
|
/// # pub fn main() {}
|
|
/// ```
|
|
#[must_use = "futures do nothing unless polled"]
|
|
pub struct Connection<T, B: Buf = Bytes> {
|
|
inner: proto::Connection<T, Peer, B>,
|
|
}
|
|
|
|
/// A future of an HTTP response.
|
|
#[derive(Debug)]
|
|
#[must_use = "futures do nothing unless polled"]
|
|
pub struct ResponseFuture {
|
|
inner: proto::OpaqueStreamRef,
|
|
push_promise_consumed: bool,
|
|
}
|
|
|
|
/// A future of a pushed HTTP response.
|
|
///
|
|
/// We have to differentiate between pushed and non pushed because of the spec
|
|
/// <https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE>
|
|
/// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream
|
|
/// > that is in either the "open" or "half-closed (remote)" state.
|
|
#[derive(Debug)]
|
|
#[must_use = "futures do nothing unless polled"]
|
|
pub struct PushedResponseFuture {
|
|
inner: ResponseFuture,
|
|
}
|
|
|
|
/// A pushed response and corresponding request headers
|
|
#[derive(Debug)]
|
|
pub struct PushPromise {
|
|
/// The request headers
|
|
request: Request<()>,
|
|
|
|
/// The pushed response
|
|
response: PushedResponseFuture,
|
|
}
|
|
|
|
/// A stream of pushed responses and corresponding promised requests
|
|
#[derive(Debug)]
|
|
#[must_use = "streams do nothing unless polled"]
|
|
pub struct PushPromises {
|
|
inner: proto::OpaqueStreamRef,
|
|
}
|
|
|
|
/// Builds client connections with custom configuration values.
|
|
///
|
|
/// Methods can be chained in order to set the configuration values.
|
|
///
|
|
/// The client is constructed by calling [`handshake`] and passing the I/O
|
|
/// handle that will back the HTTP/2 server.
|
|
///
|
|
/// New instances of `Builder` are obtained via [`Builder::new`].
|
|
///
|
|
/// See function level documentation for details on the various client
|
|
/// configuration settings.
|
|
///
|
|
/// [`Builder::new`]: struct.Builder.html#method.new
|
|
/// [`handshake`]: struct.Builder.html#method.handshake
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # use tokio::io::{AsyncRead, AsyncWrite};
|
|
/// # use h2::client::*;
|
|
/// # use bytes::Bytes;
|
|
/// #
|
|
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
|
|
/// -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
|
|
/// # {
|
|
/// // `client_fut` is a future representing the completion of the HTTP/2
|
|
/// // handshake.
|
|
/// let client_fut = Builder::new()
|
|
/// .initial_window_size(1_000_000)
|
|
/// .max_concurrent_streams(1000)
|
|
/// .handshake(my_io);
|
|
/// # client_fut.await
|
|
/// # }
|
|
/// #
|
|
/// # pub fn main() {}
|
|
/// ```
|
|
#[derive(Clone, Debug)]
|
|
pub struct Builder {
|
|
/// Time to keep locally reset streams around before reaping.
|
|
reset_stream_duration: Duration,
|
|
|
|
/// Initial maximum number of locally initiated (send) streams.
|
|
/// After receiving a Settings frame from the remote peer,
|
|
/// the connection will overwrite this value with the
|
|
/// MAX_CONCURRENT_STREAMS specified in the frame.
|
|
initial_max_send_streams: usize,
|
|
|
|
/// Initial target window size for new connections.
|
|
initial_target_connection_window_size: Option<u32>,
|
|
|
|
/// Maximum amount of bytes to "buffer" for writing per stream.
|
|
max_send_buffer_size: usize,
|
|
|
|
/// Maximum number of locally reset streams to keep at a time.
|
|
reset_stream_max: usize,
|
|
|
|
/// Initial `Settings` frame to send as part of the handshake.
|
|
settings: Settings,
|
|
|
|
/// The stream ID of the first (lowest) stream. Subsequent streams will use
|
|
/// monotonically increasing stream IDs.
|
|
stream_id: StreamId,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub(crate) struct Peer;
|
|
|
|
// ===== impl SendRequest =====
|
|
|
|
impl<B> SendRequest<B>
|
|
where
|
|
B: Buf + 'static,
|
|
{
|
|
/// Returns `Ready` when the connection can initialize a new HTTP/2
|
|
/// stream.
|
|
///
|
|
/// This function must return `Ready` before `send_request` is called. When
|
|
/// `Poll::Pending` is returned, the task will be notified once the readiness
|
|
/// state changes.
|
|
///
|
|
/// See [module] level docs for more details.
|
|
///
|
|
/// [module]: index.html
|
|
pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
|
|
ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?;
|
|
self.pending = None;
|
|
Poll::Ready(Ok(()))
|
|
}
|
|
|
|
/// Consumes `self`, returning a future that returns `self` back once it is
|
|
/// ready to send a request.
|
|
///
|
|
/// This function should be called before calling `send_request`.
|
|
///
|
|
/// This is a functional combinator for [`poll_ready`]. The returned future
|
|
/// will call `SendStream::poll_ready` until `Ready`, then returns `self` to
|
|
/// the caller.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```rust
|
|
/// # use h2::client::*;
|
|
/// # use http::*;
|
|
/// # async fn doc(send_request: SendRequest<&'static [u8]>)
|
|
/// # {
|
|
/// // First, wait until the `send_request` handle is ready to send a new
|
|
/// // request
|
|
/// let mut send_request = send_request.ready().await.unwrap();
|
|
/// // Use `send_request` here.
|
|
/// # }
|
|
/// # pub fn main() {}
|
|
/// ```
|
|
///
|
|
/// See [module] level docs for more details.
|
|
///
|
|
/// [`poll_ready`]: #method.poll_ready
|
|
/// [module]: index.html
|
|
pub fn ready(self) -> ReadySendRequest<B> {
|
|
ReadySendRequest { inner: Some(self) }
|
|
}
|
|
|
|
/// Sends a HTTP/2 request to the server.
|
|
///
|
|
/// `send_request` initializes a new HTTP/2 stream on the associated
|
|
/// connection, then sends the given request using this new stream. Only the
|
|
/// request head is sent.
|
|
///
|
|
/// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
|
|
/// are returned. The [`ResponseFuture`] instance is used to get the
|
|
/// server's response and the [`SendStream`] instance is used to send a
|
|
/// request body or trailers to the server over the same HTTP/2 stream.
|
|
///
|
|
/// To send a request body or trailers, set `end_of_stream` to `false`.
|
|
/// Then, use the returned [`SendStream`] instance to stream request body
|
|
/// chunks or send trailers. If `end_of_stream` is **not** set to `false`
|
|
/// then attempting to call [`SendStream::send_data`] or
|
|
/// [`SendStream::send_trailers`] will result in an error.
|
|
///
|
|
/// If no request body or trailers are to be sent, set `end_of_stream` to
|
|
/// `true` and drop the returned [`SendStream`] instance.
|
|
///
|
|
/// # A note on HTTP versions
|
|
///
|
|
/// The provided `Request` will be encoded differently depending on the
|
|
/// value of its version field. If the version is set to 2.0, then the
|
|
/// request is encoded as per the specification recommends.
|
|
///
|
|
/// If the version is set to a lower value, then the request is encoded to
|
|
/// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
|
|
/// headers are permitted and the `:authority` pseudo header is not
|
|
/// included.
|
|
///
|
|
/// The caller should always set the request's version field to 2.0 unless
|
|
/// specifically transmitting an HTTP 1.1 request over 2.0.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// Sending a request with no body
|
|
///
|
|
/// ```rust
|
|
/// # use h2::client::*;
|
|
/// # use http::*;
|
|
/// # async fn doc(send_request: SendRequest<&'static [u8]>)
|
|
/// # {
|
|
/// // First, wait until the `send_request` handle is ready to send a new
|
|
/// // request
|
|
/// let mut send_request = send_request.ready().await.unwrap();
|
|
/// // Prepare the HTTP request to send to the server.
|
|
/// let request = Request::get("https://www.example.com/")
|
|
/// .body(())
|
|
/// .unwrap();
|
|
///
|
|
/// // Send the request to the server. Since we are not sending a
|
|
/// // body or trailers, we can drop the `SendStream` instance.
|
|
/// let (response, _) = send_request.send_request(request, true).unwrap();
|
|
/// let response = response.await.unwrap();
|
|
/// // Process the response
|
|
/// # }
|
|
/// # pub fn main() {}
|
|
/// ```
|
|
///
|
|
/// Sending a request with a body and trailers
|
|
///
|
|
/// ```rust
|
|
/// # use h2::client::*;
|
|
/// # use http::*;
|
|
/// # async fn doc(send_request: SendRequest<&'static [u8]>)
|
|
/// # {
|
|
/// // First, wait until the `send_request` handle is ready to send a new
|
|
/// // request
|
|
/// let mut send_request = send_request.ready().await.unwrap();
|
|
///
|
|
/// // Prepare the HTTP request to send to the server.
|
|
/// let request = Request::get("https://www.example.com/")
|
|
/// .body(())
|
|
/// .unwrap();
|
|
///
|
|
/// // Send the request to the server. If we are not sending a
|
|
/// // body or trailers, we can drop the `SendStream` instance.
|
|
/// let (response, mut send_stream) = send_request
|
|
/// .send_request(request, false).unwrap();
|
|
///
|
|
/// // At this point, one option would be to wait for send capacity.
|
|
/// // Doing so would allow us to not hold data in memory that
|
|
/// // cannot be sent. However, this is not a requirement, so this
|
|
/// // example will skip that step. See `SendStream` documentation
|
|
/// // for more details.
|
|
/// send_stream.send_data(b"hello", false).unwrap();
|
|
/// send_stream.send_data(b"world", false).unwrap();
|
|
///
|
|
/// // Send the trailers.
|
|
/// let mut trailers = HeaderMap::new();
|
|
/// trailers.insert(
|
|
/// header::HeaderName::from_bytes(b"my-trailer").unwrap(),
|
|
/// header::HeaderValue::from_bytes(b"hello").unwrap());
|
|
///
|
|
/// send_stream.send_trailers(trailers).unwrap();
|
|
///
|
|
/// let response = response.await.unwrap();
|
|
/// // Process the response
|
|
/// # }
|
|
/// # pub fn main() {}
|
|
/// ```
|
|
///
|
|
/// [`ResponseFuture`]: struct.ResponseFuture.html
|
|
/// [`SendStream`]: ../struct.SendStream.html
|
|
/// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
|
|
/// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
|
|
pub fn send_request(
|
|
&mut self,
|
|
request: Request<()>,
|
|
end_of_stream: bool,
|
|
) -> Result<(ResponseFuture, SendStream<B>), crate::Error> {
|
|
self.inner
|
|
.send_request(request, end_of_stream, self.pending.as_ref())
|
|
.map_err(Into::into)
|
|
.map(|stream| {
|
|
if stream.is_pending_open() {
|
|
self.pending = Some(stream.clone_to_opaque());
|
|
}
|
|
|
|
let response = ResponseFuture {
|
|
inner: stream.clone_to_opaque(),
|
|
push_promise_consumed: false,
|
|
};
|
|
|
|
let stream = SendStream::new(stream);
|
|
|
|
(response, stream)
|
|
})
|
|
}
|
|
|
|
/// Returns whether the [extended CONNECT protocol][1] is enabled or not.
|
|
///
|
|
/// This setting is configured by the server peer by sending the
|
|
/// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
|
|
/// This method returns the currently acknowledged value recieved from the
|
|
/// remote.
|
|
///
|
|
/// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
|
|
/// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
|
|
pub fn is_extended_connect_protocol_enabled(&self) -> bool {
|
|
self.inner.is_extended_connect_protocol_enabled()
|
|
}
|
|
}
|
|
|
|
impl<B> fmt::Debug for SendRequest<B>
|
|
where
|
|
B: Buf,
|
|
{
|
|
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
|
fmt.debug_struct("SendRequest").finish()
|
|
}
|
|
}
|
|
|
|
impl<B> Clone for SendRequest<B>
|
|
where
|
|
B: Buf,
|
|
{
|
|
fn clone(&self) -> Self {
|
|
SendRequest {
|
|
inner: self.inner.clone(),
|
|
pending: None,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(feature = "unstable")]
|
|
impl<B> SendRequest<B>
|
|
where
|
|
B: Buf,
|
|
{
|
|
/// Returns the number of active streams.
|
|
///
|
|
/// An active stream is a stream that has not yet transitioned to a closed
|
|
/// state.
|
|
pub fn num_active_streams(&self) -> usize {
|
|
self.inner.num_active_streams()
|
|
}
|
|
|
|
/// Returns the number of streams that are held in memory.
|
|
///
|
|
/// A wired stream is a stream that is either active or is closed but must
|
|
/// stay in memory for some reason. For example, there are still outstanding
|
|
/// userspace handles pointing to the slot.
|
|
pub fn num_wired_streams(&self) -> usize {
|
|
self.inner.num_wired_streams()
|
|
}
|
|
}
|
|
|
|
// ===== impl ReadySendRequest =====
|
|
|
|
impl<B> Future for ReadySendRequest<B>
|
|
where
|
|
B: Buf + 'static,
|
|
{
|
|
type Output = Result<SendRequest<B>, crate::Error>;
|
|
|
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
match &mut self.inner {
|
|
Some(send_request) => {
|
|
ready!(send_request.poll_ready(cx))?;
|
|
}
|
|
None => panic!("called `poll` after future completed"),
|
|
}
|
|
|
|
Poll::Ready(Ok(self.inner.take().unwrap()))
|
|
}
|
|
}
|
|
|
|
// ===== impl Builder =====
|
|
|
|
impl Builder {
|
|
/// Returns a new client builder instance initialized with default
|
|
/// configuration values.
|
|
///
|
|
/// Configuration methods can be chained on the return value.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # use tokio::io::{AsyncRead, AsyncWrite};
|
|
/// # use h2::client::*;
|
|
/// # use bytes::Bytes;
|
|
/// #
|
|
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
|
|
/// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
|
|
/// # {
|
|
/// // `client_fut` is a future representing the completion of the HTTP/2
|
|
/// // handshake.
|
|
/// let client_fut = Builder::new()
|
|
/// .initial_window_size(1_000_000)
|
|
/// .max_concurrent_streams(1000)
|
|
/// .handshake(my_io);
|
|
/// # client_fut.await
|
|
/// # }
|
|
/// #
|
|
/// # pub fn main() {}
|
|
/// ```
|
|
pub fn new() -> Builder {
|
|
Builder {
|
|
max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
|
|
reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
|
|
reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
|
|
initial_target_connection_window_size: None,
|
|
initial_max_send_streams: usize::MAX,
|
|
settings: Default::default(),
|
|
stream_id: 1.into(),
|
|
}
|
|
}
|
|
|
|
/// Indicates the initial window size (in octets) for stream-level
|
|
/// flow control for received data.
|
|
///
|
|
/// The initial window of a stream is used as part of flow control. For more
|
|
/// details, see [`FlowControl`].
|
|
///
|
|
/// The default value is 65,535.
|
|
///
|
|
/// [`FlowControl`]: ../struct.FlowControl.html
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # use tokio::io::{AsyncRead, AsyncWrite};
|
|
/// # use h2::client::*;
|
|
/// # use bytes::Bytes;
|
|
/// #
|
|
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
|
|
/// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
|
|
/// # {
|
|
/// // `client_fut` is a future representing the completion of the HTTP/2
|
|
/// // handshake.
|
|
/// let client_fut = Builder::new()
|
|
/// .initial_window_size(1_000_000)
|
|
/// .handshake(my_io);
|
|
/// # client_fut.await
|
|
/// # }
|
|
/// #
|
|
/// # pub fn main() {}
|
|
/// ```
|
|
pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
|
|
self.settings.set_initial_window_size(Some(size));
|
|
self
|
|
}
|
|
|
|
/// Indicates the initial window size (in octets) for connection-level flow control
|
|
/// for received data.
|
|
///
|
|
/// The initial window of a connection is used as part of flow control. For more details,
|
|
/// see [`FlowControl`].
|
|
///
|
|
/// The default value is 65,535.
|
|
///
|
|
/// [`FlowControl`]: ../struct.FlowControl.html
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # use tokio::io::{AsyncRead, AsyncWrite};
|
|
/// # use h2::client::*;
|
|
/// # use bytes::Bytes;
|
|
/// #
|
|
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
|
|
/// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
|
|
/// # {
|
|
/// // `client_fut` is a future representing the completion of the HTTP/2
|
|
/// // handshake.
|
|
/// let client_fut = Builder::new()
|
|
/// .initial_connection_window_size(1_000_000)
|
|
/// .handshake(my_io);
|
|
/// # client_fut.await
|
|
/// # }
|
|
/// #
|
|
/// # pub fn main() {}
|
|
/// ```
|
|
pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
|
|
self.initial_target_connection_window_size = Some(size);
|
|
self
|
|
}
|
|
|
|
/// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
|
|
/// configured client is able to accept.
|
|
///
|
|
/// The sender may send data frames that are **smaller** than this value,
|
|
/// but any data larger than `max` will be broken up into multiple `DATA`
|
|
/// frames.
|
|
///
|
|
/// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # use tokio::io::{AsyncRead, AsyncWrite};
|
|
/// # use h2::client::*;
|
|
/// # use bytes::Bytes;
|
|
/// #
|
|
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
|
|
/// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
|
|
/// # {
|
|
/// // `client_fut` is a future representing the completion of the HTTP/2
|
|
/// // handshake.
|
|
/// let client_fut = Builder::new()
|
|
/// .max_frame_size(1_000_000)
|
|
/// .handshake(my_io);
|
|
/// # client_fut.await
|
|
/// # }
|
|
/// #
|
|
/// # pub fn main() {}
|
|
/// ```
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// This function panics if `max` is not within the legal range specified
|
|
/// above.
|
|
pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
|
|
self.settings.set_max_frame_size(Some(max));
|
|
self
|
|
}
|
|
|
|
/// Sets the max size of received header frames.
|
|
///
|
|
/// This advisory setting informs a peer of the maximum size of header list
|
|
/// that the sender is prepared to accept, in octets. The value is based on
|
|
/// the uncompressed size of header fields, including the length of the name
|
|
/// and value in octets plus an overhead of 32 octets for each header field.
|
|
///
|
|
/// This setting is also used to limit the maximum amount of data that is
|
|
/// buffered to decode HEADERS frames.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```
|
|
/// # use tokio::io::{AsyncRead, AsyncWrite};
|
|
/// # use h2::client::*;
|
|
/// # use bytes::Bytes;
|
|
/// #
|
|
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
|
|
/// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
|
|
/// # {
|
|
/// // `client_fut` is a future representing the completion of the HTTP/2
|
|
/// // handshake.
|
|
/// let client_fut = Builder::new()
|
|
/// .max_header_list_size(16 * 1024)
|
|
/// .handshake(my_io);
|
|
/// # client_fut.await
|
|
/// # }
|
|
/// #
|
|
/// # pub fn main() {}
|
|
/// ```
|
|
pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
|
|
self.settings.set_max_header_list_size(Some(max));
|
|
self
|
|
}
|
|
|
|
    /// Sets the maximum number of concurrent streams.
    ///
    /// The maximum concurrent streams setting only controls the maximum number
    /// of streams that can be initiated by the remote peer. In other words,
    /// when this setting is set to 100, this does not limit the number of
    /// concurrent streams that can be created by the caller.
    ///
    /// It is recommended that this value be no smaller than 100, so as to not
    /// unnecessarily limit parallelism. However, any value is legal, including
    /// 0. If `max` is set to 0, then the remote will not be permitted to
    /// initiate streams.
    ///
    /// Note that streams in the reserved state, i.e., push promises that have
    /// been reserved but the stream has not started, do not count against this
    /// setting.
    ///
    /// Also note that if the remote *does* exceed the value set here, it is not
    /// a protocol level error. Instead, the `h2` library will immediately reset
    /// the stream.
    ///
    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
    ///
    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .max_concurrent_streams(1000)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
        self.settings.set_max_concurrent_streams(Some(max));
        self
    }

    /// Sets the initial maximum of locally initiated (send) streams.
    ///
    /// The initial settings will be overwritten by the remote peer when
    /// the Settings frame is received. The new value will be set to the
    /// `max_concurrent_streams()` from the frame.
    ///
    /// This setting prevents the caller from exceeding this number of
    /// streams that are counted towards the concurrency limit.
    ///
    /// Sending streams past the limit returned by the peer will be treated
    /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
    ///
    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
    ///
    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .initial_max_send_streams(1000)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
        self.initial_max_send_streams = initial;
        self
    }

    /// Sets the maximum number of concurrent locally reset streams.
    ///
    /// When a stream is explicitly reset, the HTTP/2 specification requires
    /// that any further frames received for that stream must be ignored for
    /// "some time".
    ///
    /// In order to satisfy the specification, internal state must be maintained
    /// to implement the behavior. This state grows linearly with the number of
    /// streams that are locally reset.
    ///
    /// The `max_concurrent_reset_streams` setting configures an upper bound
    /// on the amount of state that is maintained. When this max value is
    /// reached, the oldest reset stream is purged from memory.
    ///
    /// Once the stream has been fully purged from memory, any additional frames
    /// received for that stream will result in a connection level protocol
    /// error, forcing the connection to terminate.
    ///
    /// The default value is 10.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .max_concurrent_reset_streams(1000)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
        self.reset_stream_max = max;
        self
    }

    /// Sets the duration to remember locally reset streams.
    ///
    /// When a stream is explicitly reset, the HTTP/2 specification requires
    /// that any further frames received for that stream must be ignored for
    /// "some time".
    ///
    /// In order to satisfy the specification, internal state must be maintained
    /// to implement the behavior. This state grows linearly with the number of
    /// streams that are locally reset.
    ///
    /// The `reset_stream_duration` setting configures the max amount of time
    /// this state will be maintained in memory. Once the duration elapses, the
    /// stream state is purged from memory.
    ///
    /// Once the stream has been fully purged from memory, any additional frames
    /// received for that stream will result in a connection level protocol
    /// error, forcing the connection to terminate.
    ///
    /// The default value is 30 seconds.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use std::time::Duration;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .reset_stream_duration(Duration::from_secs(10))
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
        self.reset_stream_duration = dur;
        self
    }

    /// Sets the maximum send buffer size per stream.
    ///
    /// Once a stream has buffered up to (or over) the maximum, the stream's
    /// flow control will not "poll" additional capacity. Once bytes for the
    /// stream have been written to the connection, the send buffer capacity
    /// will be freed up again.
    ///
    /// The default is currently ~400MB, but may change.
    ///
    /// # Panics
    ///
    /// This function panics if `max` is larger than `u32::MAX`.
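    ///
    /// # Examples
    ///
    /// A usage sketch in the style of the other builder examples; the 1 MiB
    /// value is illustrative, not a recommendation:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
    /// # {
    /// // Cap each stream's send buffer at 1 MiB.
    /// let client_fut = Builder::new()
    ///     .max_send_buffer_size(1024 * 1024)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```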
    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
        assert!(max <= std::u32::MAX as usize);
        self.max_send_buffer_size = max;
        self
    }

    /// Enables or disables server push promises.
    ///
    /// This value is included in the initial SETTINGS handshake. When set to
    /// `false`, the server MUST NOT send a push promise. Setting this value to
    /// `false` in the initial SETTINGS handshake guarantees that the remote
    /// server will never send a push promise.
    ///
    /// This setting can be changed during the life of a single HTTP/2
    /// connection by sending another settings frame updating the value.
    ///
    /// Default value: `true`.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use std::time::Duration;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .enable_push(false)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
        self.settings.set_enable_push(enabled);
        self
    }

    /// Sets the first stream ID to something other than 1.
    #[cfg(feature = "unstable")]
    pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
        self.stream_id = stream_id.into();
        assert!(
            self.stream_id.is_client_initiated(),
            "stream id must be odd"
        );
        self
    }

    /// Creates a new configured HTTP/2 client backed by `io`.
    ///
    /// It is expected that `io` already be in an appropriate state to commence
    /// the [HTTP/2 handshake]. The handshake is completed once both the
    /// connection preface and the initial settings frame have been sent by the
    /// client.
    ///
    /// The handshake future does not wait for the initial settings frame from
    /// the server.
    ///
    /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
    /// tuple once the HTTP/2 handshake has been completed.
    ///
    /// This function also allows the caller to configure the send payload data
    /// type. See [Outbound data type] for more details.
    ///
    /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
    /// [`Connection`]: struct.Connection.html
    /// [`SendRequest`]: struct.SendRequest.html
    /// [Outbound data type]: ../index.html#outbound-data-type
    ///
    /// # Examples
    ///
    /// Basic usage:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    ///
    /// Configures the send-payload data type. In this case, the outbound data
    /// type will be `&'static [u8]`.
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .handshake::<_, &'static [u8]>(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn handshake<T, B>(
        &self,
        io: T,
    ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>>
    where
        T: AsyncRead + AsyncWrite + Unpin,
        B: Buf + 'static,
    {
        Connection::handshake2(io, self.clone())
    }
}

impl Default for Builder {
    fn default() -> Builder {
        Builder::new()
    }
}

/// Creates a new configured HTTP/2 client with default configuration
/// values backed by `io`.
///
/// It is expected that `io` already be in an appropriate state to commence
/// the [HTTP/2 handshake]. See [Handshake] for more details.
///
/// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
/// tuple once the HTTP/2 handshake has been completed. The returned
/// [`Connection`] instance will be using default configuration values. Use
/// [`Builder`] to customize the configuration values used by a [`Connection`]
/// instance.
///
/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
/// [Handshake]: ../index.html#handshake
/// [`Connection`]: struct.Connection.html
/// [`SendRequest`]: struct.SendRequest.html
///
/// # Examples
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::client;
/// # use h2::client::*;
/// #
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
/// # {
/// let (send_request, connection) = client::handshake(my_io).await?;
/// // The HTTP/2 handshake has completed, now start polling
/// // `connection` and use `send_request` to send requests to the
/// // server.
/// # Ok(())
/// # }
/// #
/// # pub fn main() {}
/// ```
pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    let builder = Builder::new();
    builder
        .handshake(io)
        .instrument(tracing::trace_span!("client_handshake"))
        .await
}

// ===== impl Connection =====

async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    tracing::debug!("binding client connection");

    let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
    io.write_all(msg).await.map_err(crate::Error::from_io)?;

    tracing::debug!("client connection bound");

    Ok(())
}

impl<T, B> Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf + 'static,
{
    async fn handshake2(
        mut io: T,
        builder: Builder,
    ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
        bind_connection(&mut io).await?;

        // Create the codec
        let mut codec = Codec::new(io);

        if let Some(max) = builder.settings.max_frame_size() {
            codec.set_max_recv_frame_size(max as usize);
        }

        if let Some(max) = builder.settings.max_header_list_size() {
            codec.set_max_recv_header_list_size(max as usize);
        }

        // Send initial settings frame
        codec
            .buffer(builder.settings.clone().into())
            .expect("invalid SETTINGS frame");

        let inner = proto::Connection::new(
            codec,
            proto::Config {
                next_stream_id: builder.stream_id,
                initial_max_send_streams: builder.initial_max_send_streams,
                max_send_buffer_size: builder.max_send_buffer_size,
                reset_stream_duration: builder.reset_stream_duration,
                reset_stream_max: builder.reset_stream_max,
                settings: builder.settings.clone(),
            },
        );
        let send_request = SendRequest {
            inner: inner.streams().clone(),
            pending: None,
        };

        let mut connection = Connection { inner };
        if let Some(sz) = builder.initial_target_connection_window_size {
            connection.set_target_window_size(sz);
        }

        Ok((send_request, connection))
    }

    /// Sets the target window size for the whole connection.
    ///
    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
    /// frame will be immediately sent to the remote, increasing the connection
    /// level window by `size - current_value`.
    ///
    /// If `size` is less than the current value, nothing will happen
    /// immediately. However, as window capacity is released by
    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
    /// out until the number of "in flight" bytes drops below `size`.
    ///
    /// The default value is 65,535.
    ///
    /// See [`FlowControl`] documentation for more details.
    ///
    /// [`FlowControl`]: ../struct.FlowControl.html
    /// [library level]: ../index.html#flow-control
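    ///
    /// # Examples
    ///
    /// A usage sketch; the 1 MiB target is illustrative:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(mut connection: Connection<T, Bytes>)
    /// # {
    /// // Raise the connection-level receive window to 1 MiB.
    /// connection.set_target_window_size(1024 * 1024);
    /// # }
    /// ```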
    pub fn set_target_window_size(&mut self, size: u32) {
        assert!(size <= proto::MAX_WINDOW_SIZE);
        self.inner.set_target_window_size(size);
    }

    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
    /// flow control for received data.
    ///
    /// The `SETTINGS` will be sent to the remote, and only applied once the
    /// remote acknowledges the change.
    ///
    /// This can be used to increase or decrease the window size for existing
    /// streams.
    ///
    /// # Errors
    ///
    /// Returns an error if a previous call is still pending acknowledgement
    /// from the remote endpoint.
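    ///
    /// # Examples
    ///
    /// A usage sketch; the 1 MiB window is illustrative:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(
    /// #     mut connection: Connection<T, Bytes>,
    /// # ) -> Result<(), h2::Error>
    /// # {
    /// // Ask the peer to use a 1 MiB per-stream window; the change takes
    /// // effect once the remote acknowledges the SETTINGS frame.
    /// connection.set_initial_window_size(1024 * 1024)?;
    /// # Ok(())
    /// # }
    /// ```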
    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
        assert!(size <= proto::MAX_WINDOW_SIZE);
        self.inner.set_initial_window_size(size)?;
        Ok(())
    }

    /// Takes a `PingPong` instance from the connection.
    ///
    /// # Note
    ///
    /// This may only be called once. Calling multiple times will return `None`.
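    ///
    /// # Examples
    ///
    /// A sketch showing that the handle can be taken only once:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(mut connection: Connection<T, Bytes>)
    /// # {
    /// let ping_pong = connection.ping_pong().expect("first call returns Some");
    /// // Any further call returns `None`.
    /// assert!(connection.ping_pong().is_none());
    /// # drop(ping_pong);
    /// # }
    /// ```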
    pub fn ping_pong(&mut self) -> Option<PingPong> {
        self.inner.take_user_pings().map(PingPong::new)
    }

    /// Returns the maximum number of concurrent streams that may be initiated
    /// by this client.
    ///
    /// This limit is configured by the server peer by sending the
    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
    /// This method returns the currently acknowledged value received from the
    /// remote.
    ///
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    pub fn max_concurrent_send_streams(&self) -> usize {
        self.inner.max_send_streams()
    }

    /// Returns the maximum number of concurrent streams that may be initiated
    /// by the server on this connection.
    ///
    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
    /// parameter][1] sent in a `SETTINGS` frame that has been
    /// acknowledged by the remote peer. The value to be sent is configured by
    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
    /// with the remote peer.
    ///
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    /// [2]: ../struct.Builder.html#method.max_concurrent_streams
    pub fn max_concurrent_recv_streams(&self) -> usize {
        self.inner.max_recv_streams()
    }
}

impl<T, B> Future for Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf + 'static,
{
    type Output = Result<(), crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.inner.maybe_close_connection_if_no_streams();
        self.inner.poll(cx).map_err(Into::into)
    }
}

impl<T, B> fmt::Debug for Connection<T, B>
where
    T: AsyncRead + AsyncWrite,
    T: fmt::Debug,
    B: fmt::Debug + Buf,
{
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(&self.inner, fmt)
    }
}

// ===== impl ResponseFuture =====

impl Future for ResponseFuture {
    type Output = Result<Response<RecvStream>, crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
        let body = RecvStream::new(FlowControl::new(self.inner.clone()));

        Poll::Ready(Ok(Response::from_parts(parts, body)))
    }
}

impl ResponseFuture {
    /// Returns the stream ID of the response stream.
    ///
    /// # Panics
    ///
    /// If the lock on the stream store has been poisoned.
    pub fn stream_id(&self) -> crate::StreamId {
        crate::StreamId::from_internal(self.inner.stream_id())
    }

    /// Returns a stream of `PushPromise`s.
    ///
    /// # Panics
    ///
    /// If this method has been called before,
    /// or if the stream itself was pushed.
    pub fn push_promises(&mut self) -> PushPromises {
        if self.push_promise_consumed {
            panic!("Reference to push promises stream taken!");
        }
        self.push_promise_consumed = true;
        PushPromises {
            inner: self.inner.clone(),
        }
    }
}

// ===== impl PushPromises =====

impl PushPromises {
    /// Get the next `PushPromise`.
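    ///
    /// # Examples
    ///
    /// A sketch of draining the pushed requests; the `push_promises` argument
    /// is a placeholder for a value obtained from
    /// [`ResponseFuture::push_promises`]:
    ///
    /// ```
    /// # use h2::client::PushPromises;
    /// # async fn doc(mut push_promises: PushPromises) -> Result<(), h2::Error>
    /// # {
    /// while let Some(push) = push_promises.push_promise().await {
    ///     let (request, _response) = push?.into_parts();
    ///     println!("pushed request: {:?}", request);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [`ResponseFuture::push_promises`]: struct.ResponseFuture.html#method.push_promises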
    pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> {
        futures_util::future::poll_fn(move |cx| self.poll_push_promise(cx)).await
    }

    #[doc(hidden)]
    pub fn poll_push_promise(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<PushPromise, crate::Error>>> {
        match self.inner.poll_pushed(cx) {
            Poll::Ready(Some(Ok((request, response)))) => {
                let response = PushedResponseFuture {
                    inner: ResponseFuture {
                        inner: response,
                        push_promise_consumed: false,
                    },
                };
                Poll::Ready(Some(Ok(PushPromise { request, response })))
            }
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

#[cfg(feature = "stream")]
impl futures_core::Stream for PushPromises {
    type Item = Result<PushPromise, crate::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.poll_push_promise(cx)
    }
}

// ===== impl PushPromise =====

impl PushPromise {
    /// Returns a reference to the push promise's request headers.
    pub fn request(&self) -> &Request<()> {
        &self.request
    }

    /// Returns a mutable reference to the push promise's request headers.
    pub fn request_mut(&mut self) -> &mut Request<()> {
        &mut self.request
    }

    /// Consumes `self`, returning the push promise's request headers and
    /// response future.
    pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
        (self.request, self.response)
    }
}

// ===== impl PushedResponseFuture =====

impl Future for PushedResponseFuture {
    type Output = Result<Response<RecvStream>, crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.inner).poll(cx)
    }
}

impl PushedResponseFuture {
    /// Returns the stream ID of the response stream.
    ///
    /// # Panics
    ///
    /// If the lock on the stream store has been poisoned.
    pub fn stream_id(&self) -> crate::StreamId {
        self.inner.stream_id()
    }
}

// ===== impl Peer =====

impl Peer {
    pub fn convert_send_message(
        id: StreamId,
        request: Request<()>,
        protocol: Option<Protocol>,
        end_of_stream: bool,
    ) -> Result<Headers, SendError> {
        use http::request::Parts;

        let (
            Parts {
                method,
                uri,
                headers,
                version,
                ..
            },
            _,
        ) = request.into_parts();

        let is_connect = method == Method::CONNECT;

        // Build the pseudo header set. All requests will include `method`
        // and `path`.
        let mut pseudo = Pseudo::request(method, uri, protocol);

        if pseudo.scheme.is_none() {
            // If the scheme is not set, then there are two options.
            //
            // 1) Authority is not set. In this case, a request was issued with
            //    a relative URI. This is permitted **only** when forwarding
            //    HTTP 1.x requests. If the HTTP version is set to 2.0, then
            //    this is an error.
            //
            // 2) Authority is set, then the HTTP method *must* be CONNECT.
            //
            // It is not possible to have a scheme but not an authority set (the
            // `http` crate does not allow it).
            //
            if pseudo.authority.is_none() {
                if version == Version::HTTP_2 {
                    return Err(UserError::MissingUriSchemeAndAuthority.into());
                } else {
                    // This is acceptable as per the above comment. However,
                    // HTTP/2 requires that a scheme is set. Since we are
                    // forwarding an HTTP 1.1 request, the scheme is set to
                    // "http".
                    pseudo.set_scheme(uri::Scheme::HTTP);
                }
            } else if !is_connect {
                // TODO: Error
            }
        }

        // Create the HEADERS frame
        let mut frame = Headers::new(id, pseudo, headers);

        if end_of_stream {
            frame.set_end_stream()
        }

        Ok(frame)
    }
}

impl proto::Peer for Peer {
    type Poll = Response<()>;

    const NAME: &'static str = "Client";

    fn r#dyn() -> proto::DynPeer {
        proto::DynPeer::Client
    }

    fn is_server() -> bool {
        false
    }

    fn convert_poll_message(
        pseudo: Pseudo,
        fields: HeaderMap,
        stream_id: StreamId,
    ) -> Result<Self::Poll, Error> {
        let mut b = Response::builder();

        b = b.version(Version::HTTP_2);

        if let Some(status) = pseudo.status {
            b = b.status(status);
        }

        let mut response = match b.body(()) {
            Ok(response) => response,
            Err(_) => {
                // TODO: Should there be more specialized handling for different
                // kinds of errors
                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
            }
        };

        *response.headers_mut() = fields;

        Ok(response)
    }
}