refactor(http1): move upgrade state from body to head (#2353)

Move state required for protocol upgrades to head
representations, instead of associating it with the body.

Closes #2340.

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
Arve Knudsen
2020-12-15 16:31:48 +01:00
committed by GitHub
parent 7d9a5806e1
commit ede3a6bd9d
5 changed files with 33 additions and 37 deletions

View File

@@ -23,7 +23,6 @@ use crate::common::{task, watch, Pin, Poll};
use crate::common::{Future, Never}; use crate::common::{Future, Never};
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
use crate::proto::h2::ping; use crate::proto::h2::ping;
use crate::upgrade::OnUpgrade;
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>; type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
@@ -70,7 +69,6 @@ struct Extra {
/// a brand new connection, since the pool didn't know about the idle /// a brand new connection, since the pool didn't know about the idle
/// connection yet. /// connection yet.
delayed_eof: Option<DelayEof>, delayed_eof: Option<DelayEof>,
on_upgrade: OnUpgrade,
} }
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
@@ -187,17 +185,6 @@ impl Body {
Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped)))) Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
} }
// TODO: Eventually the pending upgrade should be stored in the
// `Extensions`, and all these pieces can be removed. In v0.14, we made
// the breaking changes, so now this TODO can be done without breakage.
pub(crate) fn take_upgrade(&mut self) -> OnUpgrade {
if let Some(ref mut extra) = self.extra {
std::mem::replace(&mut extra.on_upgrade, OnUpgrade::none())
} else {
OnUpgrade::none()
}
}
fn new(kind: Kind) -> Body { fn new(kind: Kind) -> Body {
Body { kind, extra: None } Body { kind, extra: None }
} }
@@ -217,14 +204,6 @@ impl Body {
body body
} }
#[cfg(feature = "http1")]
pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) {
debug_assert!(!upgrade.is_none(), "set_on_upgrade with empty upgrade");
let extra = self.extra_mut();
debug_assert!(extra.on_upgrade.is_none(), "set_on_upgrade twice");
extra.on_upgrade = upgrade;
}
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")] #[cfg(feature = "client")]
pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
@@ -239,12 +218,8 @@ impl Body {
#[cfg(any(feature = "http1", feature = "http2"))] #[cfg(any(feature = "http1", feature = "http2"))]
fn extra_mut(&mut self) -> &mut Extra { fn extra_mut(&mut self) -> &mut Extra {
self.extra.get_or_insert_with(|| { self.extra
Box::new(Extra { .get_or_insert_with(|| Box::new(Extra { delayed_eof: None }))
delayed_eof: None,
on_upgrade: OnUpgrade::none(),
})
})
} }
fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {

View File

@@ -10,6 +10,7 @@ use crate::common::{task, Future, Pin, Poll, Unpin};
use crate::proto::{ use crate::proto::{
BodyLength, Conn, Dispatched, MessageHead, RequestHead, BodyLength, Conn, Dispatched, MessageHead, RequestHead,
}; };
use crate::upgrade::OnUpgrade;
pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> { pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> {
conn: Conn<I, Bs::Data, T>, conn: Conn<I, Bs::Data, T>,
@@ -243,8 +244,8 @@ where
} }
// dispatch is ready for a message, try to read one // dispatch is ready for a message, try to read one
match ready!(self.conn.poll_read_head(cx)) { match ready!(self.conn.poll_read_head(cx)) {
Some(Ok((head, body_len, wants))) => { Some(Ok((mut head, body_len, wants))) => {
let mut body = match body_len { let body = match body_len {
DecodedLength::ZERO => Body::empty(), DecodedLength::ZERO => Body::empty(),
other => { other => {
let (tx, rx) = Body::new_channel(other, wants.contains(Wants::EXPECT)); let (tx, rx) = Body::new_channel(other, wants.contains(Wants::EXPECT));
@@ -253,7 +254,10 @@ where
} }
}; };
if wants.contains(Wants::UPGRADE) { if wants.contains(Wants::UPGRADE) {
body.set_on_upgrade(self.conn.on_upgrade()); let upgrade = self.conn.on_upgrade();
debug_assert!(!upgrade.is_none(), "empty upgrade");
debug_assert!(head.extensions.get::<OnUpgrade>().is_none(), "OnUpgrade already set");
head.extensions.insert(upgrade);
} }
self.dispatch.recv_msg(Ok((head, body)))?; self.dispatch.recv_msg(Ok((head, body)))?;
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
@@ -488,6 +492,7 @@ cfg_server! {
version: parts.version, version: parts.version,
subject: parts.status, subject: parts.status,
headers: parts.headers, headers: parts.headers,
extensions: http::Extensions::default(),
}; };
Poll::Ready(Some(Ok((head, body)))) Poll::Ready(Some(Ok((head, body))))
} else { } else {
@@ -506,6 +511,7 @@ cfg_server! {
*req.uri_mut() = msg.subject.1; *req.uri_mut() = msg.subject.1;
*req.headers_mut() = msg.headers; *req.headers_mut() = msg.headers;
*req.version_mut() = msg.version; *req.version_mut() = msg.version;
*req.extensions_mut() = msg.extensions;
let fut = self.service.call(req); let fut = self.service.call(req);
self.in_flight.set(Some(fut)); self.in_flight.set(Some(fut));
Ok(()) Ok(())
@@ -570,6 +576,7 @@ cfg_client! {
version: parts.version, version: parts.version,
subject: crate::proto::RequestLine(parts.method, parts.uri), subject: crate::proto::RequestLine(parts.method, parts.uri),
headers: parts.headers, headers: parts.headers,
extensions: http::Extensions::default(),
}; };
*this.callback = Some(cb); *this.callback = Some(cb);
Poll::Ready(Some(Ok((head, body)))) Poll::Ready(Some(Ok((head, body))))
@@ -594,6 +601,7 @@ cfg_client! {
*res.status_mut() = msg.subject; *res.status_mut() = msg.subject;
*res.headers_mut() = msg.headers; *res.headers_mut() = msg.headers;
*res.version_mut() = msg.version; *res.version_mut() = msg.version;
*res.extensions_mut() = msg.extensions;
cb.send(Ok(res)); cb.send(Ok(res));
Ok(()) Ok(())
} else { } else {

View File

@@ -270,6 +270,7 @@ impl Http1Transaction for Server {
version, version,
subject, subject,
headers, headers,
extensions: http::Extensions::default(),
}, },
decode: decoder, decode: decoder,
expect_continue, expect_continue,
@@ -713,6 +714,7 @@ impl Http1Transaction for Client {
version, version,
subject: status, subject: status,
headers, headers,
extensions: http::Extensions::default(),
}; };
if let Some((decode, is_upgrade)) = Client::decoder(&head, ctx.req_method)? { if let Some((decode, is_upgrade)) = Client::decoder(&head, ctx.req_method)? {
return Ok(Some(ParsedMessage { return Ok(Some(ParsedMessage {

View File

@@ -16,7 +16,7 @@ cfg_http2! {
} }
/// An Incoming Message head. Includes request/status line, and headers. /// An Incoming Message head. Includes request/status line, and headers.
#[derive(Clone, Debug, Default, PartialEq)] #[derive(Debug, Default)]
pub struct MessageHead<S> { pub struct MessageHead<S> {
/// HTTP version of the message. /// HTTP version of the message.
pub version: http::Version, pub version: http::Version,
@@ -24,6 +24,9 @@ pub struct MessageHead<S> {
pub subject: S, pub subject: S,
/// Headers of the Incoming message. /// Headers of the Incoming message.
pub headers: http::HeaderMap, pub headers: http::HeaderMap,
/// Extensions.
extensions: http::Extensions,
} }
/// An incoming request message. /// An incoming request message.

View File

@@ -317,26 +317,34 @@ mod sealed {
} }
impl CanUpgrade for http::Request<crate::Body> { impl CanUpgrade for http::Request<crate::Body> {
fn on_upgrade(self) -> OnUpgrade { fn on_upgrade(mut self) -> OnUpgrade {
self.into_body().take_upgrade() self.extensions_mut()
.remove::<OnUpgrade>()
.unwrap_or_else(OnUpgrade::none)
} }
} }
impl CanUpgrade for &'_ mut http::Request<crate::Body> { impl CanUpgrade for &'_ mut http::Request<crate::Body> {
fn on_upgrade(self) -> OnUpgrade { fn on_upgrade(self) -> OnUpgrade {
self.body_mut().take_upgrade() self.extensions_mut()
.remove::<OnUpgrade>()
.unwrap_or_else(OnUpgrade::none)
} }
} }
impl CanUpgrade for http::Response<crate::Body> { impl CanUpgrade for http::Response<crate::Body> {
fn on_upgrade(self) -> OnUpgrade { fn on_upgrade(mut self) -> OnUpgrade {
self.into_body().take_upgrade() self.extensions_mut()
.remove::<OnUpgrade>()
.unwrap_or_else(OnUpgrade::none)
} }
} }
impl CanUpgrade for &'_ mut http::Response<crate::Body> { impl CanUpgrade for &'_ mut http::Response<crate::Body> {
fn on_upgrade(self) -> OnUpgrade { fn on_upgrade(self) -> OnUpgrade {
self.body_mut().take_upgrade() self.extensions_mut()
.remove::<OnUpgrade>()
.unwrap_or_else(OnUpgrade::none)
} }
} }
} }