feat(http2): Make HTTP/2 support an optional feature

cc #2251

BREAKING CHANGE: This puts all HTTP/2 methods and support behind an
  `http2` cargo feature, which will not be enabled by default. To use
  HTTP/2, add `features = ["http2"]` to the hyper dependency in your
  `Cargo.toml`.
This commit is contained in:
Sean McArthur
2020-11-09 16:11:04 -08:00
parent 5438e9b7bf
commit b819b428d3
19 changed files with 395 additions and 112 deletions

View File

@@ -31,7 +31,7 @@ http = "0.2"
http-body = "0.3.1"
httpdate = "0.3"
httparse = "1.0"
h2 = { git = "https://github.com/hyperium/h2" }
h2 = { git = "https://github.com/hyperium/h2", optional = true }
itoa = "0.4.1"
tracing = { version = "0.1", default-features = false, features = ["log", "std"] }
pin-project = "1.0"
@@ -56,6 +56,7 @@ tokio = { version = "0.3", features = [
"fs",
"macros",
"io-std",
"io-util",
"rt",
"rt-multi-thread", # so examples can use #[tokio::main]
"sync",
@@ -73,7 +74,16 @@ pnet = "0.25.0"
[features]
default = [
"runtime",
"stream"
"stream",
#"http1",
"http2",
]
full = [
#"http1",
"http2",
"stream",
"runtime",
]
runtime = [
"tcp",
@@ -86,6 +96,10 @@ tcp = [
"tokio/time",
]
# HTTP versions
#http1 = []
http2 = ["h2"]
# `impl Stream` for things
stream = []
@@ -94,10 +108,11 @@ nightly = []
__internal_happy_eyeballs_tests = []
[package.metadata.docs.rs]
features = [
"runtime",
"stream",
]
features = ["full"]
rustdoc-args = ["--cfg", "docsrs"]
[package.metadata.playground]
features = ["full"]
[profile.release]
codegen-units = 1

View File

@@ -14,6 +14,7 @@ use http_body::{Body as HttpBody, SizeHint};
#[cfg(feature = "stream")]
use crate::common::sync_wrapper::SyncWrapper;
use crate::common::{task, watch, Future, Never, Pin, Poll};
#[cfg(feature = "http2")]
use crate::proto::h2::ping;
use crate::proto::DecodedLength;
use crate::upgrade::OnUpgrade;
@@ -39,6 +40,7 @@ enum Kind {
want_tx: watch::Sender,
rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
},
#[cfg(feature = "http2")]
H2 {
ping: ping::Recorder,
content_length: DecodedLength,
@@ -186,6 +188,7 @@ impl Body {
Body { kind, extra: None }
}
#[cfg(feature = "http2")]
pub(crate) fn h2(
recv: h2::RecvStream,
content_length: DecodedLength,
@@ -273,6 +276,7 @@ impl Body {
None => Poll::Ready(None),
}
}
#[cfg(feature = "http2")]
Kind::H2 {
ref ping,
recv: ref mut h2,
@@ -325,10 +329,11 @@ impl HttpBody for Body {
}
fn poll_trailers(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
#[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>,
#[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
match self.kind {
#[cfg(feature = "http2")]
Kind::H2 {
recv: ref mut h2,
ref ping,
@@ -348,6 +353,7 @@ impl HttpBody for Body {
match self.kind {
Kind::Once(ref val) => val.is_none(),
Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
#[cfg(feature = "http2")]
Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
#[cfg(feature = "stream")]
Kind::Wrapped(..) => false,
@@ -355,20 +361,26 @@ impl HttpBody for Body {
}
fn size_hint(&self) -> SizeHint {
macro_rules! opt_len {
($content_length:expr) => {{
let mut hint = SizeHint::default();
if let Some(content_length) = $content_length.into_opt() {
hint.set_exact(content_length);
}
hint
}};
}
match self.kind {
Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64),
Kind::Once(None) => SizeHint::with_exact(0),
#[cfg(feature = "stream")]
Kind::Wrapped(..) => SizeHint::default(),
Kind::Chan { content_length, .. } | Kind::H2 { content_length, .. } => {
let mut hint = SizeHint::default();
if let Some(content_length) = content_length.into_opt() {
hint.set_exact(content_length);
}
hint
}
Kind::Chan { content_length, .. } => opt_len!(content_length),
#[cfg(feature = "http2")]
Kind::H2 { content_length, .. } => opt_len!(content_length),
}
}
}

9
src/cfg.rs Normal file
View File

@@ -0,0 +1,9 @@
macro_rules! cfg_http2 {
($($item:item)*) => {
$(
#[cfg(feature = "http2")]
//#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
$item
)*
}
}

View File

@@ -13,6 +13,7 @@ use std::fmt;
use std::mem;
use std::sync::Arc;
#[cfg(feature = "runtime")]
#[cfg(feature = "http2")]
use std::time::Duration;
use bytes::Bytes;
@@ -36,6 +37,7 @@ where
B: HttpBody,
{
H1(#[pin] Http1Dispatcher<T, B, proto::h1::ClientTransaction>),
#[cfg(feature = "http2")]
H2(#[pin] proto::h2::ClientTask<B>),
}
@@ -79,8 +81,16 @@ pub struct Builder {
h1_title_case_headers: bool,
h1_read_buf_exact_size: Option<usize>,
h1_max_buf_size: Option<usize>,
http2: bool,
#[cfg(feature = "http2")]
h2_builder: proto::h2::client::Config,
version: Proto,
}
#[derive(Clone, Debug)]
enum Proto {
Http1,
#[cfg(feature = "http2")]
Http2,
}
/// A future returned by `SendRequest::send_request`.
@@ -122,6 +132,7 @@ pub struct Parts<T> {
// A `SendRequest` that can be cloned to send HTTP2 requests.
// private for now, probably not a great idea of a type...
#[must_use = "futures do nothing unless polled"]
#[cfg(feature = "http2")]
pub(super) struct Http2SendRequest<B> {
dispatch: dispatch::UnboundedSender<Request<B>, Response<Body>>,
}
@@ -152,6 +163,7 @@ impl<B> SendRequest<B> {
self.dispatch.is_closed()
}
#[cfg(feature = "http2")]
pub(super) fn into_http2(self) -> Http2SendRequest<B> {
Http2SendRequest {
dispatch: self.dispatch.unbound(),
@@ -269,6 +281,7 @@ impl<B> fmt::Debug for SendRequest<B> {
// ===== impl Http2SendRequest
#[cfg(feature = "http2")]
impl<B> Http2SendRequest<B> {
pub(super) fn is_ready(&self) -> bool {
self.dispatch.is_ready()
@@ -279,6 +292,7 @@ impl<B> Http2SendRequest<B> {
}
}
#[cfg(feature = "http2")]
impl<B> Http2SendRequest<B>
where
B: HttpBody + 'static,
@@ -310,12 +324,14 @@ where
}
}
#[cfg(feature = "http2")]
impl<B> fmt::Debug for Http2SendRequest<B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Http2SendRequest").finish()
}
}
#[cfg(feature = "http2")]
impl<B> Clone for Http2SendRequest<B> {
fn clone(&self) -> Self {
Http2SendRequest {
@@ -339,6 +355,7 @@ where
pub fn into_parts(self) -> Parts<T> {
let (io, read_buf, _) = match self.inner.expect("already upgraded") {
ProtoClient::H1(h1) => h1.into_inner(),
#[cfg(feature = "http2")]
ProtoClient::H2(_h2) => {
panic!("http2 cannot into_inner");
}
@@ -365,6 +382,7 @@ where
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
match *self.inner.as_mut().expect("already upgraded") {
ProtoClient::H1(ref mut h1) => h1.poll_without_shutdown(cx),
#[cfg(feature = "http2")]
ProtoClient::H2(ref mut h2) => Pin::new(h2).poll(cx).map_ok(|_| ()),
}
}
@@ -428,8 +446,9 @@ impl Builder {
h1_read_buf_exact_size: None,
h1_title_case_headers: false,
h1_max_buf_size: None,
http2: false,
#[cfg(feature = "http2")]
h2_builder: Default::default(),
version: Proto::Http1,
}
}
@@ -472,8 +491,10 @@ impl Builder {
/// Sets whether HTTP2 is required.
///
/// Default is false.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_only(&mut self, enabled: bool) -> &mut Builder {
self.http2 = enabled;
self.version = if enabled { Proto::Http2 } else { Proto::Http1 };
self
}
@@ -485,6 +506,8 @@ impl Builder {
/// If not set, hyper will use a default.
///
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
if let Some(sz) = sz.into() {
self.h2_builder.adaptive_window = false;
@@ -498,6 +521,8 @@ impl Builder {
/// Passing `None` will do nothing.
///
/// If not set, hyper will use a default.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_initial_connection_window_size(
&mut self,
sz: impl Into<Option<u32>>,
@@ -514,6 +539,8 @@ impl Builder {
/// Enabling this will override the limits set in
/// `http2_initial_stream_window_size` and
/// `http2_initial_connection_window_size`.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
use proto::h2::SPEC_WINDOW_SIZE;
@@ -530,6 +557,8 @@ impl Builder {
/// Passing `None` will do nothing.
///
/// If not set, hyper will use a default.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
if let Some(sz) = sz.into() {
self.h2_builder.max_frame_size = sz;
@@ -548,6 +577,8 @@ impl Builder {
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_keep_alive_interval(
&mut self,
interval: impl Into<Option<Duration>>,
@@ -567,6 +598,8 @@ impl Builder {
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
self.h2_builder.keep_alive_timeout = timeout;
self
@@ -585,6 +618,8 @@ impl Builder {
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
self.h2_builder.keep_alive_while_idle = enabled;
self
@@ -604,10 +639,11 @@ impl Builder {
let opts = self.clone();
async move {
trace!("client handshake HTTP/{}", if opts.http2 { 2 } else { 1 });
trace!("client handshake {:?}", opts.version);
let (tx, rx) = dispatch::channel();
let proto = if !opts.http2 {
let proto = match opts.version {
Proto::Http1 => {
let mut conn = proto::Conn::new(io);
if let Some(writev) = opts.h1_writev {
if writev {
@@ -628,10 +664,14 @@ impl Builder {
let cd = proto::h1::dispatch::Client::new(rx);
let dispatch = proto::h1::Dispatcher::new(cd, conn);
ProtoClient::H1(dispatch)
} else {
let h2 = proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone())
}
#[cfg(feature = "http2")]
Proto::Http2 => {
let h2 =
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone())
.await?;
ProtoClient::H2(h2)
}
};
Ok((
@@ -684,6 +724,7 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match self.project() {
ProtoClientProj::H1(c) => c.poll(cx),
#[cfg(feature = "http2")]
ProtoClientProj::H2(c) => c.poll(cx),
}
}

View File

@@ -184,6 +184,7 @@ impl Connected {
// Don't public expose that `Connected` is `Clone`, unsure if we want to
// keep that contract...
#[cfg(feature = "http2")]
pub(super) fn clone(&self) -> Connected {
Connected {
alpn: self.alpn.clone(),

View File

@@ -1,8 +1,10 @@
use futures_util::future;
#[cfg(feature = "http2")]
use std::future::Future;
use tokio::stream::Stream;
use tokio::sync::{mpsc, oneshot};
use crate::common::{task, Future, Pin, Poll};
use crate::common::{task, Pin, Poll};
pub type RetryPromise<T, U> = oneshot::Receiver<Result<U, (crate::Error, Option<T>)>>;
pub type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>;
@@ -41,6 +43,7 @@ pub struct Sender<T, U> {
///
/// Cannot poll the Giver, but can still use it to determine if the Receiver
/// has been dropped. However, this version can be cloned.
#[cfg(feature = "http2")]
pub struct UnboundedSender<T, U> {
/// Only used for `is_closed`, since mpsc::UnboundedSender cannot be checked.
giver: want::SharedGiver,
@@ -97,6 +100,7 @@ impl<T, U> Sender<T, U> {
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
}
#[cfg(feature = "http2")]
pub fn unbound(self) -> UnboundedSender<T, U> {
UnboundedSender {
giver: self.giver.shared(),
@@ -105,6 +109,7 @@ impl<T, U> Sender<T, U> {
}
}
#[cfg(feature = "http2")]
impl<T, U> UnboundedSender<T, U> {
pub fn is_ready(&self) -> bool {
!self.giver.is_canceled()
@@ -123,6 +128,7 @@ impl<T, U> UnboundedSender<T, U> {
}
}
#[cfg(feature = "http2")]
impl<T, U> Clone for UnboundedSender<T, U> {
fn clone(&self) -> Self {
UnboundedSender {
@@ -197,6 +203,7 @@ pub enum Callback<T, U> {
}
impl<T, U> Callback<T, U> {
#[cfg(feature = "http2")]
pub(crate) fn is_canceled(&self) -> bool {
match *self {
Callback::Retry(ref tx) => tx.is_closed(),
@@ -222,10 +229,13 @@ impl<T, U> Callback<T, U> {
}
}
#[cfg(feature = "http2")]
pub(crate) fn send_when(
self,
mut when: impl Future<Output = Result<U, (crate::Error, Option<T>)>> + Unpin,
) -> impl Future<Output = ()> {
use futures_util::future;
let mut cb = Some(self);
// "select" on this callback being canceled, and the future completing
@@ -330,6 +340,7 @@ mod tests {
let _ = tx.try_send(Custom(2)).expect("2 ready");
}
#[cfg(feature = "http2")]
#[test]
fn unbounded_sender_doesnt_bound_on_want() {
let (tx, rx) = channel::<Custom, ()>();

View File

@@ -460,6 +460,9 @@ where
) -> impl Lazy<Output = crate::Result<Pooled<PoolClient<B>>>> + Unpin {
let executor = self.conn_builder.exec.clone();
let pool = self.pool.clone();
#[cfg(not(feature = "http2"))]
let conn_builder = self.conn_builder.clone();
#[cfg(feature = "http2")]
let mut conn_builder = self.conn_builder.clone();
let ver = self.config.ver;
let is_ver_h2 = ver == Ver::Http2;
@@ -505,10 +508,16 @@ where
} else {
connecting
};
#[cfg_attr(not(feature = "http2"), allow(unused))]
let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
#[cfg(feature = "http2")]
{
conn_builder.http2_only(is_h2);
}
Either::Left(Box::pin(
conn_builder
.http2_only(is_h2)
.handshake(io)
.and_then(move |(tx, conn)| {
trace!(
@@ -524,15 +533,23 @@ where
tx.when_ready()
})
.map_ok(move |tx| {
let tx = {
#[cfg(feature = "http2")]
{
if is_h2 {
PoolTx::Http2(tx.into_http2())
} else {
PoolTx::Http1(tx)
}
}
#[cfg(not(feature = "http2"))]
PoolTx::Http1(tx)
};
pool.pooled(
connecting,
PoolClient {
conn_info: connected,
tx: if is_h2 {
PoolTx::Http2(tx.into_http2())
} else {
PoolTx::Http1(tx)
},
tx,
},
)
}),
@@ -640,6 +657,7 @@ struct PoolClient<B> {
enum PoolTx<B> {
Http1(conn::SendRequest<B>),
#[cfg(feature = "http2")]
Http2(conn::Http2SendRequest<B>),
}
@@ -647,6 +665,7 @@ impl<B> PoolClient<B> {
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
match self.tx {
PoolTx::Http1(ref mut tx) => tx.poll_ready(cx),
#[cfg(feature = "http2")]
PoolTx::Http2(_) => Poll::Ready(Ok(())),
}
}
@@ -658,6 +677,7 @@ impl<B> PoolClient<B> {
fn is_http2(&self) -> bool {
match self.tx {
PoolTx::Http1(_) => false,
#[cfg(feature = "http2")]
PoolTx::Http2(_) => true,
}
}
@@ -665,6 +685,7 @@ impl<B> PoolClient<B> {
fn is_ready(&self) -> bool {
match self.tx {
PoolTx::Http1(ref tx) => tx.is_ready(),
#[cfg(feature = "http2")]
PoolTx::Http2(ref tx) => tx.is_ready(),
}
}
@@ -672,6 +693,7 @@ impl<B> PoolClient<B> {
fn is_closed(&self) -> bool {
match self.tx {
PoolTx::Http1(ref tx) => tx.is_closed(),
#[cfg(feature = "http2")]
PoolTx::Http2(ref tx) => tx.is_closed(),
}
}
@@ -686,7 +708,11 @@ impl<B: HttpBody + 'static> PoolClient<B> {
B: Send,
{
match self.tx {
#[cfg(not(feature = "http2"))]
PoolTx::Http1(ref mut tx) => tx.send_request_retryable(req),
#[cfg(feature = "http2")]
PoolTx::Http1(ref mut tx) => Either::Left(tx.send_request_retryable(req)),
#[cfg(feature = "http2")]
PoolTx::Http2(ref mut tx) => Either::Right(tx.send_request_retryable(req)),
}
}
@@ -699,6 +725,7 @@ where
fn is_open(&self) -> bool {
match self.tx {
PoolTx::Http1(ref tx) => tx.is_ready(),
#[cfg(feature = "http2")]
PoolTx::Http2(ref tx) => tx.is_ready(),
}
}
@@ -709,6 +736,7 @@ where
conn_info: self.conn_info,
tx: PoolTx::Http1(tx),
}),
#[cfg(feature = "http2")]
PoolTx::Http2(tx) => {
let b = PoolClient {
conn_info: self.conn_info.clone(),
@@ -1020,6 +1048,8 @@ impl Builder {
/// Note that setting this to true prevents HTTP/1 from being allowed.
///
/// Default is false.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_only(&mut self, val: bool) -> &mut Self {
self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto };
self
@@ -1033,6 +1063,8 @@ impl Builder {
/// If not set, hyper will use a default.
///
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
self.conn_builder
.http2_initial_stream_window_size(sz.into());
@@ -1044,6 +1076,8 @@ impl Builder {
/// Passing `None` will do nothing.
///
/// If not set, hyper will use a default.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_initial_connection_window_size(
&mut self,
sz: impl Into<Option<u32>>,
@@ -1058,6 +1092,8 @@ impl Builder {
/// Enabling this will override the limits set in
/// `http2_initial_stream_window_size` and
/// `http2_initial_connection_window_size`.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
self.conn_builder.http2_adaptive_window(enabled);
self
@@ -1068,6 +1104,8 @@ impl Builder {
/// Passing `None` will do nothing.
///
/// If not set, hyper will use a default.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
self.conn_builder.http2_max_frame_size(sz);
self
@@ -1084,6 +1122,8 @@ impl Builder {
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_keep_alive_interval(
&mut self,
interval: impl Into<Option<Duration>>,
@@ -1103,6 +1143,8 @@ impl Builder {
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
self.conn_builder.http2_keep_alive_timeout(timeout);
self
@@ -1121,6 +1163,8 @@ impl Builder {
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
self.conn_builder.http2_keep_alive_while_idle(enabled);
self

View File

@@ -45,6 +45,7 @@ pub(super) enum Reservation<T> {
/// This connection could be used multiple times, the first one will be
/// reinserted into the `idle` pool, and the second will be given to
/// the `Checkout`.
#[cfg(feature = "http2")]
Shared(T, T),
/// This connection requires unique access. It will be returned after
/// use is complete.
@@ -199,9 +200,14 @@ impl<T: Poolable> Pool<T> {
}
*/
pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> {
pub(super) fn pooled(
&self,
#[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>,
value: T,
) -> Pooled<T> {
let (value, pool_ref) = if let Some(ref enabled) = self.inner {
match value.reserve() {
#[cfg(feature = "http2")]
Reservation::Shared(to_insert, to_return) => {
let mut inner = enabled.lock().unwrap();
inner.put(connecting.key.clone(), to_insert, enabled);
@@ -291,6 +297,7 @@ impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
}
let value = match entry.value.reserve() {
#[cfg(feature = "http2")]
Reservation::Shared(to_reinsert, to_checkout) => {
self.list.push(Idle {
idle_at: Instant::now(),
@@ -325,6 +332,7 @@ impl<T: Poolable> PoolInner<T> {
if !tx.is_canceled() {
let reserved = value.take().expect("value already sent");
let reserved = match reserved.reserve() {
#[cfg(feature = "http2")]
Reservation::Shared(to_keep, to_send) => {
value = Some(to_keep);
to_send

View File

@@ -4,6 +4,7 @@ use std::pin::Pin;
use std::sync::Arc;
use crate::body::{Body, HttpBody};
#[cfg(feature = "http2")]
use crate::proto::h2::server::H2Stream;
use crate::server::conn::spawn_all::{NewSvcTask, Watcher};
use crate::service::HttpService;
@@ -14,7 +15,7 @@ pub trait Executor<Fut> {
fn execute(&self, fut: Fut);
}
pub trait H2Exec<F, B: HttpBody>: Clone {
pub trait ConnStreamExec<F, B: HttpBody>: Clone {
fn execute_h2stream(&mut self, fut: H2Stream<F, B>);
}
@@ -64,7 +65,7 @@ impl fmt::Debug for Exec {
}
}
impl<F, B> H2Exec<F, B> for Exec
impl<F, B> ConnStreamExec<F, B> for Exec
where
H2Stream<F, B>: Future<Output = ()> + Send + 'static,
B: HttpBody,
@@ -87,7 +88,7 @@ where
// ==== impl Executor =====
impl<E, F, B> H2Exec<F, B> for E
impl<E, F, B> ConnStreamExec<F, B> for E
where
E: Executor<H2Stream<F, B>> + Clone,
H2Stream<F, B>: Future<Output = ()>,
@@ -109,3 +110,30 @@ where
self.execute(fut)
}
}
// If http2 is not enable, we just have a stub here, so that the trait bounds
// that *would* have been needed are still checked. Why?
//
// Because enabling `http2` shouldn't suddenly add new trait bounds that cause
// a compilation error.
#[cfg(not(feature = "http2"))]
#[allow(missing_debug_implementations)]
pub struct H2Stream<F, B>(std::marker::PhantomData<(F, B)>);
#[cfg(not(feature = "http2"))]
impl<F, B, E> Future for H2Stream<F, B>
where
F: Future<Output = Result<http::Response<B>, E>>,
B: HttpBody,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
type Output = ();
fn poll(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
unreachable!()
}
}

View File

@@ -14,6 +14,7 @@ pub(crate) struct Rewind<T> {
}
impl<T> Rewind<T> {
#[cfg(any(feature = "http2", test))]
pub(crate) fn new(io: T) -> Self {
Rewind {
pre: None,
@@ -28,6 +29,7 @@ impl<T> Rewind<T> {
}
}
#[cfg(any(feature = "http2", test))]
pub(crate) fn rewind(&mut self, bs: Bytes) {
debug_assert!(self.pre.is_none());
self.pre = Some(bs);

View File

@@ -49,6 +49,7 @@ pub(crate) enum Kind {
Shutdown,
/// A general error from h2.
#[cfg(feature = "http2")]
Http2,
}
@@ -175,6 +176,7 @@ impl Error {
None
}
#[cfg(feature = "http2")]
pub(crate) fn h2_reason(&self) -> h2::Reason {
// Find an h2::Reason somewhere in the cause stack, if it exists,
// otherwise assume an INTERNAL_ERROR.
@@ -284,6 +286,7 @@ impl Error {
Error::new(Kind::Shutdown).with(cause)
}
#[cfg(feature = "http2")]
pub(crate) fn new_h2(cause: ::h2::Error) -> Error {
if cause.is_io() {
Error::new_io(cause.into_io().expect("h2::Error::is_io"))
@@ -313,6 +316,7 @@ impl Error {
Kind::BodyWrite => "error writing a body to connection",
Kind::BodyWriteAborted => "body write aborted",
Kind::Shutdown => "error shutting down connection",
#[cfg(feature = "http2")]
Kind::Http2 => "http2 error",
Kind::Io => "connection error",
@@ -432,18 +436,21 @@ mod tests {
assert_eq!(mem::size_of::<Error>(), mem::size_of::<usize>());
}
#[cfg(feature = "http2")]
#[test]
fn h2_reason_unknown() {
let closed = Error::new_closed();
assert_eq!(closed.h2_reason(), h2::Reason::INTERNAL_ERROR);
}
#[cfg(feature = "http2")]
#[test]
fn h2_reason_one_level() {
let body_err = Error::new_user_body(h2::Error::from(h2::Reason::ENHANCE_YOUR_CALM));
assert_eq!(body_err.h2_reason(), h2::Reason::ENHANCE_YOUR_CALM);
}
#[cfg(feature = "http2")]
#[test]
fn h2_reason_nested() {
let recvd = Error::new_h2(h2::Error::from(h2::Reason::HTTP_1_1_REQUIRED));

View File

@@ -1,6 +1,7 @@
use bytes::BytesMut;
use http::header::{HeaderValue, OccupiedEntry, ValueIter};
use http::header::{CONTENT_LENGTH, TRANSFER_ENCODING};
#[cfg(feature = "http2")]
use http::method::Method;
use http::HeaderMap;
@@ -58,6 +59,7 @@ pub fn content_length_parse_all_values(values: ValueIter<'_, HeaderValue>) -> Op
}
}
#[cfg(feature = "http2")]
pub fn method_has_defined_payload_semantics(method: &Method) -> bool {
match *method {
Method::GET | Method::HEAD | Method::DELETE | Method::CONNECT => false,
@@ -65,6 +67,7 @@ pub fn method_has_defined_payload_semantics(method: &Method) -> bool {
}
}
#[cfg(feature = "http2")]
pub fn set_content_length_if_missing(headers: &mut HeaderMap, len: u64) {
headers
.entry(CONTENT_LENGTH)

View File

@@ -55,6 +55,8 @@ pub use crate::client::Client;
pub use crate::error::{Error, Result};
pub use crate::server::Server;
#[macro_use]
mod cfg;
#[macro_use]
mod common;
pub mod body;

View File

@@ -3,6 +3,7 @@ use std::fmt::{self, Write};
use std::str;
use std::time::{Duration, SystemTime};
#[cfg(feature = "http2")]
use http::header::HeaderValue;
use httpdate::HttpDate;
@@ -21,6 +22,7 @@ pub fn update() {
})
}
#[cfg(feature = "http2")]
pub(crate) fn update_and_header_value() -> HeaderValue {
CACHED.with(|cache| {
let mut cache = cache.borrow_mut();

View File

@@ -10,7 +10,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use super::{decode_content_length, ping, PipeToSendStream, SendBuf};
use crate::body::HttpBody;
use crate::common::exec::H2Exec;
use crate::common::exec::ConnStreamExec;
use crate::common::{task, Future, Pin, Poll};
use crate::headers;
use crate::proto::Dispatched;
@@ -95,7 +95,7 @@ where
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: HttpBody + 'static,
E: H2Exec<S::Future, B>,
E: ConnStreamExec<S::Future, B>,
{
pub(crate) fn new(io: T, service: S, config: &Config, exec: E) -> Server<T, S, B, E> {
let mut builder = h2::server::Builder::default();
@@ -162,7 +162,7 @@ where
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: HttpBody + 'static,
E: H2Exec<S::Future, B>,
E: ConnStreamExec<S::Future, B>,
{
type Output = crate::Result<Dispatched>;
@@ -216,7 +216,7 @@ where
where
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<S::Future, B>,
E: ConnStreamExec<S::Future, B>,
{
if self.closing.is_none() {
loop {

View File

@@ -5,7 +5,9 @@ pub(crate) use self::body_length::DecodedLength;
pub(crate) use self::h1::{dispatch, Conn, ServerTransaction};
pub(crate) mod h1;
cfg_http2! {
pub(crate) mod h2;
}
/// An Incoming Message head. Includes request/status line, and headers.
#[derive(Clone, Debug, Default, PartialEq)]

View File

@@ -45,10 +45,12 @@
use std::error::Error as StdError;
use std::fmt;
use std::marker::PhantomData;
use std::mem;
#[cfg(feature = "tcp")]
use std::net::SocketAddr;
#[cfg(feature = "runtime")]
#[cfg(feature = "http2")]
use std::time::Duration;
use bytes::Bytes;
@@ -57,9 +59,11 @@ use tokio::io::{AsyncRead, AsyncWrite};
use super::Accept;
use crate::body::{Body, HttpBody};
use crate::common::exec::{Exec, H2Exec, NewSvcExec};
use crate::common::exec::{ConnStreamExec, Exec, NewSvcExec};
#[cfg(feature = "http2")]
use crate::common::io::Rewind;
use crate::common::{task, Future, Pin, Poll, Unpin};
#[cfg(feature = "http2")]
use crate::error::{Kind, Parse};
use crate::proto;
use crate::service::{HttpService, MakeServiceRef};
@@ -85,6 +89,7 @@ pub struct Http<E = Exec> {
h1_half_close: bool,
h1_keep_alive: bool,
h1_writev: Option<bool>,
#[cfg(feature = "http2")]
h2_builder: proto::h2::server::Config,
mode: ConnectionMode,
max_buf_size: Option<usize>,
@@ -97,8 +102,10 @@ enum ConnectionMode {
/// Always use HTTP/1 and do not upgrade when a parse error occurs.
H1Only,
/// Always use HTTP/2.
#[cfg(feature = "http2")]
H2Only,
/// Use HTTP/1 and try to upgrade to h2 when a parse error occurs.
#[cfg(feature = "http2")]
Fallback,
}
@@ -150,6 +157,7 @@ where
S: HttpService<Body>,
{
pub(super) conn: Option<ProtoServer<T, S::ResBody, S, E>>,
#[cfg(feature = "http2")]
fallback: Fallback<E>,
}
@@ -167,16 +175,20 @@ where
T,
proto::ServerTransaction,
>,
PhantomData<E>,
),
#[cfg(feature = "http2")]
H2(#[pin] proto::h2::Server<Rewind<T>, S, B, E>),
}
#[cfg(feature = "http2")]
#[derive(Clone, Debug)]
enum Fallback<E> {
ToHttp2(proto::h2::server::Config, E),
Http1Only,
}
#[cfg(feature = "http2")]
impl<E> Fallback<E> {
fn to_h2(&self) -> bool {
match *self {
@@ -186,6 +198,7 @@ impl<E> Fallback<E> {
}
}
#[cfg(feature = "http2")]
impl<E> Unpin for Fallback<E> {}
/// Deconstructed parts of a `Connection`.
@@ -221,8 +234,9 @@ impl Http {
h1_half_close: false,
h1_keep_alive: true,
h1_writev: None,
#[cfg(feature = "http2")]
h2_builder: Default::default(),
mode: ConnectionMode::Fallback,
mode: ConnectionMode::default(),
max_buf_size: None,
pipeline_flush: false,
}
@@ -237,8 +251,11 @@ impl<E> Http<E> {
if val {
self.mode = ConnectionMode::H1Only;
} else {
#[cfg(feature = "http2")]
{
self.mode = ConnectionMode::Fallback;
}
}
self
}
@@ -291,6 +308,8 @@ impl<E> Http<E> {
/// Sets whether HTTP2 is required.
///
/// Default is false
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_only(&mut self, val: bool) -> &mut Self {
if val {
self.mode = ConnectionMode::H2Only;
@@ -308,6 +327,8 @@ impl<E> Http<E> {
/// If not set, hyper will use a default.
///
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
if let Some(sz) = sz.into() {
self.h2_builder.adaptive_window = false;
@@ -321,6 +342,8 @@ impl<E> Http<E> {
/// Passing `None` will do nothing.
///
/// If not set, hyper will use a default.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_initial_connection_window_size(
&mut self,
sz: impl Into<Option<u32>>,
@@ -337,6 +360,8 @@ impl<E> Http<E> {
/// Enabling this will override the limits set in
/// `http2_initial_stream_window_size` and
/// `http2_initial_connection_window_size`.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
use proto::h2::SPEC_WINDOW_SIZE;
@@ -353,6 +378,8 @@ impl<E> Http<E> {
/// Passing `None` will do nothing.
///
/// If not set, hyper will use a default.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
if let Some(sz) = sz.into() {
self.h2_builder.max_frame_size = sz;
@@ -366,6 +393,8 @@ impl<E> Http<E> {
/// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing.
///
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
self.h2_builder.max_concurrent_streams = max.into();
self
@@ -382,6 +411,8 @@ impl<E> Http<E> {
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_keep_alive_interval(
&mut self,
interval: impl Into<Option<Duration>>,
@@ -401,6 +432,8 @@ impl<E> Http<E> {
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
self.h2_builder.keep_alive_timeout = timeout;
self
@@ -441,6 +474,7 @@ impl<E> Http<E> {
h1_half_close: self.h1_half_close,
h1_keep_alive: self.h1_keep_alive,
h1_writev: self.h1_writev,
#[cfg(feature = "http2")]
h2_builder: self.h2_builder,
mode: self.mode,
max_buf_size: self.max_buf_size,
@@ -483,10 +517,10 @@ impl<E> Http<E> {
Bd: HttpBody + 'static,
Bd::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin,
E: H2Exec<S::Future, Bd>,
E: ConnStreamExec<S::Future, Bd>,
{
let proto = match self.mode {
ConnectionMode::H1Only | ConnectionMode::Fallback => {
macro_rules! h1 {
() => {{
let mut conn = proto::Conn::new(io);
if !self.h1_keep_alive {
conn.disable_keep_alive();
@@ -506,8 +540,16 @@ impl<E> Http<E> {
conn.set_max_buf_size(max);
}
let sd = proto::h1::dispatch::Server::new(service);
ProtoServer::H1(proto::h1::Dispatcher::new(sd, conn))
ProtoServer::H1(proto::h1::Dispatcher::new(sd, conn), PhantomData)
}};
}
let proto = match self.mode {
#[cfg(not(feature = "http2"))]
ConnectionMode::H1Only => h1!(),
#[cfg(feature = "http2")]
ConnectionMode::H1Only | ConnectionMode::Fallback => h1!(),
#[cfg(feature = "http2")]
ConnectionMode::H2Only => {
let rewind_io = Rewind::new(io);
let h2 =
@@ -518,6 +560,7 @@ impl<E> Http<E> {
Connection {
conn: Some(proto),
#[cfg(feature = "http2")]
fallback: if self.mode == ConnectionMode::Fallback {
Fallback::ToHttp2(self.h2_builder.clone(), self.exec.clone())
} else {
@@ -534,7 +577,7 @@ impl<E> Http<E> {
S: MakeServiceRef<IO, Body, ResBody = Bd>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
Bd: HttpBody,
E: H2Exec<<S::Service as HttpService<Body>>::Future, Bd>,
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, Bd>,
{
Serve {
incoming,
@@ -553,7 +596,7 @@ where
I: AsyncRead + AsyncWrite + Unpin,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<S::Future, B>,
E: ConnStreamExec<S::Future, B>,
{
/// Start a graceful shutdown process for this connection.
///
@@ -567,9 +610,10 @@ where
/// nothing.
pub fn graceful_shutdown(self: Pin<&mut Self>) {
match self.project().conn {
Some(ProtoServer::H1(ref mut h1)) => {
Some(ProtoServer::H1(ref mut h1, _)) => {
h1.disable_keep_alive();
}
#[cfg(feature = "http2")]
Some(ProtoServer::H2(ref mut h2)) => {
h2.graceful_shutdown();
}
@@ -596,7 +640,7 @@ where
/// This method will return a `None` if this connection is using an h2 protocol.
pub fn try_into_parts(self) -> Option<Parts<I, S>> {
match self.conn.unwrap() {
ProtoServer::H1(h1) => {
ProtoServer::H1(h1, _) => {
let (io, read_buf, dispatch) = h1.into_inner();
Some(Parts {
io,
@@ -605,6 +649,7 @@ where
_inner: (),
})
}
#[cfg(feature = "http2")]
ProtoServer::H2(_h2) => None,
}
}
@@ -628,18 +673,24 @@ where
{
loop {
let polled = match *self.conn.as_mut().unwrap() {
ProtoServer::H1(ref mut h1) => h1.poll_without_shutdown(cx),
ProtoServer::H1(ref mut h1, _) => h1.poll_without_shutdown(cx),
#[cfg(feature = "http2")]
ProtoServer::H2(ref mut h2) => return Pin::new(h2).poll(cx).map_ok(|_| ()),
};
match ready!(polled) {
Ok(()) => return Poll::Ready(Ok(())),
Err(e) => match *e.kind() {
Err(e) => {
#[cfg(feature = "http2")]
match *e.kind() {
Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
self.upgrade_h2();
continue;
}
_ => return Poll::Ready(Err(e)),
},
_ => (),
}
return Poll::Ready(Err(e));
}
}
}
}
@@ -659,12 +710,13 @@ where
})
}
#[cfg(feature = "http2")]
fn upgrade_h2(&mut self) {
trace!("Trying to upgrade connection to h2");
let conn = self.conn.take();
let (io, read_buf, dispatch) = match conn.unwrap() {
ProtoServer::H1(h1) => h1.into_inner(),
ProtoServer::H1(h1, _) => h1.into_inner(),
ProtoServer::H2(_h2) => {
panic!("h2 cannot into_inner");
}
@@ -699,7 +751,7 @@ where
I: AsyncRead + AsyncWrite + Unpin + 'static,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<S::Future, B>,
E: ConnStreamExec<S::Future, B>,
{
type Output = crate::Result<()>;
@@ -716,13 +768,18 @@ where
}
return Poll::Ready(Ok(()));
}
Err(e) => match *e.kind() {
Err(e) => {
#[cfg(feature = "http2")]
match *e.kind() {
Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
self.upgrade_h2();
continue;
}
_ => return Poll::Ready(Err(e)),
},
_ => (),
}
return Poll::Ready(Err(e));
}
}
}
}
@@ -736,6 +793,23 @@ where
f.debug_struct("Connection").finish()
}
}
// ===== impl ConnectionMode =====
impl ConnectionMode {}
impl Default for ConnectionMode {
#[cfg(feature = "http2")]
fn default() -> ConnectionMode {
ConnectionMode::Fallback
}
#[cfg(not(feature = "http2"))]
fn default() -> ConnectionMode {
ConnectionMode::H1Only
}
}
// ===== impl Serve =====
impl<I, S, E> Serve<I, S, E> {
@@ -766,7 +840,7 @@ where
IE: Into<Box<dyn StdError + Send + Sync>>,
S: MakeServiceRef<IO, Body, ResBody = B>,
B: HttpBody,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
{
fn poll_next_(
self: Pin<&mut Self>,
@@ -804,7 +878,7 @@ where
S: HttpService<Body, ResBody = B>,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<S::Future, B>,
E: ConnStreamExec<S::Future, B>,
{
type Output = Result<Connection<I, S, E>, FE>;
@@ -838,7 +912,7 @@ where
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<IO, Body, ResBody = B>,
B: HttpBody,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
{
pub(super) fn poll_watch<W>(
self: Pin<&mut Self>,
@@ -875,13 +949,14 @@ where
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<S::Future, B>,
E: ConnStreamExec<S::Future, B>,
{
type Output = crate::Result<proto::Dispatched>;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match self.project() {
ProtoServerProj::H1(s) => s.poll(cx),
ProtoServerProj::H1(s, _) => s.poll(cx),
#[cfg(feature = "http2")]
ProtoServerProj::H2(s) => s.poll(cx),
}
}
@@ -893,7 +968,7 @@ pub(crate) mod spawn_all {
use super::{Connecting, UpgradeableConnection};
use crate::body::{Body, HttpBody};
use crate::common::exec::H2Exec;
use crate::common::exec::ConnStreamExec;
use crate::common::{task, Future, Pin, Poll, Unpin};
use crate::service::HttpService;
use pin_project::pin_project;
@@ -920,7 +995,7 @@ pub(crate) mod spawn_all {
where
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: HttpService<Body>,
E: H2Exec<S::Future, S::ResBody>,
E: ConnStreamExec<S::Future, S::ResBody>,
S::ResBody: 'static,
<S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
{
@@ -970,7 +1045,7 @@ pub(crate) mod spawn_all {
S: HttpService<Body, ResBody = B>,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<S::Future, B>,
E: ConnStreamExec<S::Future, B>,
W: Watcher<I, S, E>,
{
type Output = ();
@@ -1036,7 +1111,7 @@ mod upgrades {
I: AsyncRead + AsyncWrite + Unpin,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<S::Future, B>,
E: ConnStreamExec<S::Future, B>,
{
/// Start a graceful shutdown process for this connection.
///
@@ -1054,7 +1129,7 @@ mod upgrades {
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: super::H2Exec<S::Future, B>,
E: ConnStreamExec<S::Future, B>,
{
type Output = crate::Result<()>;
@@ -1064,7 +1139,7 @@ mod upgrades {
Ok(proto::Dispatched::Shutdown) => return Poll::Ready(Ok(())),
Ok(proto::Dispatched::Upgrade(pending)) => {
let h1 = match mem::replace(&mut self.inner.conn, None) {
Some(ProtoServer::H1(h1)) => h1,
Some(ProtoServer::H1(h1, _)) => h1,
_ => unreachable!("Upgrade expects h1"),
};
@@ -1072,13 +1147,18 @@ mod upgrades {
pending.fulfill(Upgraded::new(io, buf));
return Poll::Ready(Ok(()));
}
Err(e) => match *e.kind() {
Err(e) => {
#[cfg(feature = "http2")]
match *e.kind() {
Kind::Parse(Parse::VersionH2) if self.inner.fallback.to_h2() => {
self.inner.upgrade_h2();
continue;
}
_ => return Poll::Ready(Err(e)),
},
_ => (),
}
return Poll::Ready(Err(e));
}
}
}
}

View File

@@ -70,7 +70,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use self::accept::Accept;
use crate::body::{Body, HttpBody};
use crate::common::exec::{Exec, H2Exec, NewSvcExec};
use crate::common::exec::{ConnStreamExec, Exec, NewSvcExec};
use crate::common::{task, Future, Pin, Poll, Unpin};
use crate::service::{HttpService, MakeServiceRef};
// Renamed `Http` as `Http_` for now so that people upgrading don't see an
@@ -154,7 +154,7 @@ where
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: HttpBody + Send + Sync + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
{
/// Prepares a server to handle graceful shutdown when the provided future
@@ -210,7 +210,7 @@ where
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
{
type Output = crate::Result<()>;
@@ -307,6 +307,8 @@ impl<I, E> Builder<I, E> {
/// Sets whether HTTP/2 is required.
///
/// Default is `false`.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_only(mut self, val: bool) -> Self {
self.protocol.http2_only(val);
self
@@ -320,6 +322,8 @@ impl<I, E> Builder<I, E> {
/// If not set, hyper will use a default.
///
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_initial_stream_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
self.protocol.http2_initial_stream_window_size(sz.into());
self
@@ -330,6 +334,8 @@ impl<I, E> Builder<I, E> {
/// Passing `None` will do nothing.
///
/// If not set, hyper will use a default.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_initial_connection_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
self.protocol
.http2_initial_connection_window_size(sz.into());
@@ -341,6 +347,8 @@ impl<I, E> Builder<I, E> {
/// Enabling this will override the limits set in
/// `http2_initial_stream_window_size` and
/// `http2_initial_connection_window_size`.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_adaptive_window(mut self, enabled: bool) -> Self {
self.protocol.http2_adaptive_window(enabled);
self
@@ -351,6 +359,8 @@ impl<I, E> Builder<I, E> {
/// Passing `None` will do nothing.
///
/// If not set, hyper will use a default.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_max_frame_size(mut self, sz: impl Into<Option<u32>>) -> Self {
self.protocol.http2_max_frame_size(sz);
self
@@ -362,6 +372,8 @@ impl<I, E> Builder<I, E> {
/// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing.
///
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_max_concurrent_streams(mut self, max: impl Into<Option<u32>>) -> Self {
self.protocol.http2_max_concurrent_streams(max.into());
self
@@ -378,6 +390,8 @@ impl<I, E> Builder<I, E> {
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_keep_alive_interval(mut self, interval: impl Into<Option<Duration>>) -> Self {
self.protocol.http2_keep_alive_interval(interval);
self
@@ -394,6 +408,8 @@ impl<I, E> Builder<I, E> {
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_keep_alive_timeout(mut self, timeout: Duration) -> Self {
self.protocol.http2_keep_alive_timeout(timeout);
self
@@ -449,7 +465,7 @@ impl<I, E> Builder<I, E> {
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
{
let serve = self.protocol.serve(self.incoming, new_service);
let spawn_all = serve.spawn_all();

View File

@@ -7,7 +7,7 @@ use super::conn::{SpawnAll, UpgradeableConnection, Watcher};
use super::Accept;
use crate::body::{Body, HttpBody};
use crate::common::drain::{self, Draining, Signal, Watch, Watching};
use crate::common::exec::{H2Exec, NewSvcExec};
use crate::common::exec::{ConnStreamExec, NewSvcExec};
use crate::common::{task, Future, Pin, Poll, Unpin};
use crate::service::{HttpService, MakeServiceRef};
@@ -53,7 +53,7 @@ where
B: HttpBody + Send + Sync + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
F: Future<Output = ()>,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
{
type Output = crate::Result<()>;
@@ -96,7 +96,7 @@ impl<I, S, E> Watcher<I, S, E> for GracefulWatcher
where
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: HttpService<Body>,
E: H2Exec<S::Future, S::ResBody>,
E: ConnStreamExec<S::Future, S::ResBody>,
S::ResBody: Send + Sync + 'static,
<S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
{
@@ -115,7 +115,7 @@ where
I: AsyncRead + AsyncWrite + Unpin,
S::ResBody: HttpBody + Send + 'static,
<S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<S::Future, S::ResBody>,
E: ConnStreamExec<S::Future, S::ResBody>,
{
conn.graceful_shutdown()
}