feat(client): implement the HTTP/2 extended CONNECT protocol from RFC 8441 (#2682)
This commit is contained in:
@@ -487,6 +487,23 @@ where
|
||||
Poll::Ready(Ok(conn.take().unwrap().into_parts()))
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns whether the [extended CONNECT protocol][1] is enabled or not.
|
||||
///
|
||||
/// This setting is configured by the server peer by sending the
|
||||
/// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
|
||||
/// This method returns the currently acknowledged value recieved from the
|
||||
/// remote.
|
||||
///
|
||||
/// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
|
||||
/// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
|
||||
#[cfg(feature = "http2")]
|
||||
pub fn http2_is_extended_connect_protocol_enabled(&self) -> bool {
|
||||
match self.inner.as_ref().unwrap() {
|
||||
ProtoClient::H1 { .. } => false,
|
||||
ProtoClient::H2 { h2 } => h2.is_extended_connect_protocol_enabled(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, B> Future for Connection<T, B>
|
||||
|
||||
60
src/ext.rs
60
src/ext.rs
@@ -1,9 +1,67 @@
|
||||
//! HTTP extensions
|
||||
//! HTTP extensions.
|
||||
|
||||
use bytes::Bytes;
|
||||
#[cfg(feature = "http1")]
|
||||
use http::header::{HeaderName, IntoHeaderName, ValueIter};
|
||||
use http::HeaderMap;
|
||||
#[cfg(feature = "http2")]
|
||||
use std::fmt;
|
||||
|
||||
#[cfg(feature = "http2")]
|
||||
/// Represents the `:protocol` pseudo-header used by
|
||||
/// the [Extended CONNECT Protocol].
|
||||
///
|
||||
/// [Extended CONNECT Protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
|
||||
#[derive(Clone, Eq, PartialEq)]
|
||||
pub struct Protocol {
|
||||
inner: h2::ext::Protocol,
|
||||
}
|
||||
|
||||
#[cfg(feature = "http2")]
|
||||
impl Protocol {
|
||||
/// Converts a static string to a protocol name.
|
||||
pub const fn from_static(value: &'static str) -> Self {
|
||||
Self {
|
||||
inner: h2::ext::Protocol::from_static(value),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a str representation of the header.
|
||||
pub fn as_str(&self) -> &str {
|
||||
self.inner.as_str()
|
||||
}
|
||||
|
||||
pub(crate) fn from_inner(inner: h2::ext::Protocol) -> Self {
|
||||
Self { inner }
|
||||
}
|
||||
|
||||
pub(crate) fn into_inner(self) -> h2::ext::Protocol {
|
||||
self.inner
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "http2")]
|
||||
impl<'a> From<&'a str> for Protocol {
|
||||
fn from(value: &'a str) -> Self {
|
||||
Self {
|
||||
inner: h2::ext::Protocol::from(value),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "http2")]
|
||||
impl AsRef<[u8]> for Protocol {
|
||||
fn as_ref(&self) -> &[u8] {
|
||||
self.inner.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "http2")]
|
||||
impl fmt::Debug for Protocol {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.inner.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
/// A map from header names to their original casing as received in an HTTP message.
|
||||
///
|
||||
|
||||
@@ -76,7 +76,7 @@ mod cfg;
|
||||
mod common;
|
||||
pub mod body;
|
||||
mod error;
|
||||
mod ext;
|
||||
pub mod ext;
|
||||
#[cfg(test)]
|
||||
mod mock;
|
||||
pub mod rt;
|
||||
|
||||
@@ -14,6 +14,7 @@ use tracing::{debug, trace, warn};
|
||||
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
|
||||
use crate::body::HttpBody;
|
||||
use crate::common::{exec::Exec, task, Future, Never, Pin, Poll};
|
||||
use crate::ext::Protocol;
|
||||
use crate::headers;
|
||||
use crate::proto::h2::UpgradedSendStream;
|
||||
use crate::proto::Dispatched;
|
||||
@@ -204,6 +205,15 @@ where
|
||||
req_rx: ClientRx<B>,
|
||||
}
|
||||
|
||||
impl<B> ClientTask<B>
|
||||
where
|
||||
B: HttpBody + 'static,
|
||||
{
|
||||
pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
|
||||
self.h2_tx.is_extended_connect_protocol_enabled()
|
||||
}
|
||||
}
|
||||
|
||||
impl<B> Future for ClientTask<B>
|
||||
where
|
||||
B: HttpBody + Send + 'static,
|
||||
@@ -260,6 +270,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
|
||||
req.extensions_mut().insert(protocol.into_inner());
|
||||
}
|
||||
|
||||
let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
|
||||
Ok(ok) => ok,
|
||||
Err(err) => {
|
||||
|
||||
@@ -15,6 +15,7 @@ use super::{ping, PipeToSendStream, SendBuf};
|
||||
use crate::body::HttpBody;
|
||||
use crate::common::exec::ConnStreamExec;
|
||||
use crate::common::{date, task, Future, Pin, Poll};
|
||||
use crate::ext::Protocol;
|
||||
use crate::headers;
|
||||
use crate::proto::h2::ping::Recorder;
|
||||
use crate::proto::h2::{H2Upgraded, UpgradedSendStream};
|
||||
@@ -41,6 +42,7 @@ pub(crate) struct Config {
|
||||
pub(crate) initial_conn_window_size: u32,
|
||||
pub(crate) initial_stream_window_size: u32,
|
||||
pub(crate) max_frame_size: u32,
|
||||
pub(crate) enable_connect_protocol: bool,
|
||||
pub(crate) max_concurrent_streams: Option<u32>,
|
||||
#[cfg(feature = "runtime")]
|
||||
pub(crate) keep_alive_interval: Option<Duration>,
|
||||
@@ -56,6 +58,7 @@ impl Default for Config {
|
||||
initial_conn_window_size: DEFAULT_CONN_WINDOW,
|
||||
initial_stream_window_size: DEFAULT_STREAM_WINDOW,
|
||||
max_frame_size: DEFAULT_MAX_FRAME_SIZE,
|
||||
enable_connect_protocol: false,
|
||||
max_concurrent_streams: None,
|
||||
#[cfg(feature = "runtime")]
|
||||
keep_alive_interval: None,
|
||||
@@ -117,6 +120,9 @@ where
|
||||
if let Some(max) = config.max_concurrent_streams {
|
||||
builder.max_concurrent_streams(max);
|
||||
}
|
||||
if config.enable_connect_protocol {
|
||||
builder.enable_connect_protocol();
|
||||
}
|
||||
let handshake = builder.handshake(io);
|
||||
|
||||
let bdp = if config.adaptive_window {
|
||||
@@ -280,7 +286,7 @@ where
|
||||
|
||||
let is_connect = req.method() == Method::CONNECT;
|
||||
let (mut parts, stream) = req.into_parts();
|
||||
let (req, connect_parts) = if !is_connect {
|
||||
let (mut req, connect_parts) = if !is_connect {
|
||||
(
|
||||
Request::from_parts(
|
||||
parts,
|
||||
@@ -307,6 +313,10 @@ where
|
||||
)
|
||||
};
|
||||
|
||||
if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() {
|
||||
req.extensions_mut().insert(Protocol::from_inner(protocol));
|
||||
}
|
||||
|
||||
let fut = H2Stream::new(service.call(req), connect_parts, respond);
|
||||
exec.execute_h2stream(fut);
|
||||
}
|
||||
|
||||
@@ -558,6 +558,15 @@ impl<E> Http<E> {
|
||||
self
|
||||
}
|
||||
|
||||
/// Enables the [extended CONNECT protocol].
|
||||
///
|
||||
/// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
|
||||
#[cfg(feature = "http2")]
|
||||
pub fn http2_enable_connect_protocol(&mut self) -> &mut Self {
|
||||
self.h2_builder.enable_connect_protocol = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the maximum buffer size for the connection.
|
||||
///
|
||||
/// Default is ~400kb.
|
||||
|
||||
@@ -453,6 +453,15 @@ impl<I, E> Builder<I, E> {
|
||||
self
|
||||
}
|
||||
|
||||
/// Enables the [extended CONNECT protocol].
|
||||
///
|
||||
/// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
|
||||
#[cfg(feature = "http2")]
|
||||
pub fn http2_enable_connect_protocol(mut self) -> Self {
|
||||
self.protocol.http2_enable_connect_protocol();
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the `Executor` to deal with connection tasks.
|
||||
///
|
||||
/// Default is `tokio::spawn`.
|
||||
|
||||
Reference in New Issue
Block a user