feat(upgrade): Moved HTTP upgrades off Body to a new API (#2337)

Closes #2086

BREAKING CHANGE: The method `Body::on_upgrade()` is gone. It is
  essentially replaced with `hyper::upgrade::on(msg)`.
This commit is contained in:
Sean McArthur
2020-11-19 16:36:12 -08:00
committed by GitHub
parent 751c122589
commit 121c33132c
6 changed files with 62 additions and 27 deletions

View File

@@ -58,7 +58,7 @@ async fn proxy(client: HttpClient, req: Request<Body>) -> Result<Response<Body>,
// `on_upgrade` future. // `on_upgrade` future.
if let Some(addr) = host_addr(req.uri()) { if let Some(addr) = host_addr(req.uri()) {
tokio::task::spawn(async move { tokio::task::spawn(async move {
match req.into_body().on_upgrade().await { match hyper::upgrade::on(req).await {
Ok(upgraded) => { Ok(upgraded) => {
if let Err(e) = tunnel(upgraded, addr).await { if let Err(e) = tunnel(upgraded, addr).await {
eprintln!("server io error: {}", e); eprintln!("server io error: {}", e);

View File

@@ -34,7 +34,7 @@ async fn server_upgraded_io(mut upgraded: Upgraded) -> Result<()> {
} }
/// Our server HTTP handler to initiate HTTP upgrades. /// Our server HTTP handler to initiate HTTP upgrades.
async fn server_upgrade(req: Request<Body>) -> Result<Response<Body>> { async fn server_upgrade(mut req: Request<Body>) -> Result<Response<Body>> {
let mut res = Response::new(Body::empty()); let mut res = Response::new(Body::empty());
// Send a 400 to any request that doesn't have // Send a 400 to any request that doesn't have
@@ -52,7 +52,7 @@ async fn server_upgrade(req: Request<Body>) -> Result<Response<Body>> {
// is returned below, so it's better to spawn this future instead // is returned below, so it's better to spawn this future instead
// waiting for it to complete to then return a response. // waiting for it to complete to then return a response.
tokio::task::spawn(async move { tokio::task::spawn(async move {
match req.into_body().on_upgrade().await { match hyper::upgrade::on(&mut req).await {
Ok(upgraded) => { Ok(upgraded) => {
if let Err(e) = server_upgraded_io(upgraded).await { if let Err(e) = server_upgraded_io(upgraded).await {
eprintln!("server foobar io error: {}", e) eprintln!("server foobar io error: {}", e)
@@ -97,7 +97,7 @@ async fn client_upgrade_request(addr: SocketAddr) -> Result<()> {
panic!("Our server didn't upgrade: {}", res.status()); panic!("Our server didn't upgrade: {}", res.status());
} }
match res.into_body().on_upgrade().await { match hyper::upgrade::on(res).await {
Ok(upgraded) => { Ok(upgraded) => {
if let Err(e) = client_upgraded_io(upgraded).await { if let Err(e) = client_upgraded_io(upgraded).await {
eprintln!("client foobar io error: {}", e) eprintln!("client foobar io error: {}", e)

View File

@@ -187,13 +187,15 @@ impl Body {
Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped)))) Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
} }
/// Converts this `Body` into a `Future` of a pending HTTP upgrade. // TODO: Eventually the pending upgrade should be stored in the
/// // `Extensions`, and all these pieces can be removed. In v0.14, we made
/// See [the `upgrade` module](crate::upgrade) for more. // the breaking changes, so now this TODO can be done without breakage.
pub fn on_upgrade(self) -> OnUpgrade { pub(crate) fn take_upgrade(&mut self) -> OnUpgrade {
self.extra if let Some(ref mut extra) = self.extra {
.map(|ex| ex.on_upgrade) std::mem::replace(&mut extra.on_upgrade, OnUpgrade::none())
.unwrap_or_else(OnUpgrade::none) } else {
OnUpgrade::none()
}
} }
fn new(kind: Kind) -> Body { fn new(kind: Kind) -> Body {

View File

@@ -57,18 +57,16 @@ pub struct Parts<T> {
_inner: (), _inner: (),
} }
/// Gets a pending HTTP upgrade from this message.
pub fn on<T: sealed::CanUpgrade>(msg: T) -> OnUpgrade {
msg.on_upgrade()
}
#[cfg(feature = "http1")] #[cfg(feature = "http1")]
pub(crate) struct Pending { pub(crate) struct Pending {
tx: oneshot::Sender<crate::Result<Upgraded>>, tx: oneshot::Sender<crate::Result<Upgraded>>,
} }
/// Error cause returned when an upgrade was expected but canceled
/// for whatever reason.
///
/// This likely means the actual `Conn` future wasn't polled and upgraded.
#[derive(Debug)]
struct UpgradeExpected(());
#[cfg(feature = "http1")] #[cfg(feature = "http1")]
pub(crate) fn pending() -> (Pending, OnUpgrade) { pub(crate) fn pending() -> (Pending, OnUpgrade) {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
@@ -162,9 +160,7 @@ impl Future for OnUpgrade {
Some(ref mut rx) => Pin::new(rx).poll(cx).map(|res| match res { Some(ref mut rx) => Pin::new(rx).poll(cx).map(|res| match res {
Ok(Ok(upgraded)) => Ok(upgraded), Ok(Ok(upgraded)) => Ok(upgraded),
Ok(Err(err)) => Err(err), Ok(Err(err)) => Err(err),
Err(_oneshot_canceled) => { Err(_oneshot_canceled) => Err(crate::Error::new_canceled().with(UpgradeExpected)),
Err(crate::Error::new_canceled().with(UpgradeExpected(())))
}
}), }),
None => Poll::Ready(Err(crate::Error::new_user_no_upgrade())), None => Poll::Ready(Err(crate::Error::new_user_no_upgrade())),
} }
@@ -196,9 +192,16 @@ impl Pending {
// ===== impl UpgradeExpected ===== // ===== impl UpgradeExpected =====
/// Error cause returned when an upgrade was expected but canceled
/// for whatever reason.
///
/// This likely means the actual `Conn` future wasn't polled and upgraded.
#[derive(Debug)]
struct UpgradeExpected;
impl fmt::Display for UpgradeExpected { impl fmt::Display for UpgradeExpected {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "upgrade expected but not completed") f.write_str("upgrade expected but not completed")
} }
} }
@@ -277,6 +280,38 @@ impl<T: AsyncRead + AsyncWrite + Unpin + 'static> Io for ForwardsWriteBuf<T> {
} }
} }
mod sealed {
use super::OnUpgrade;
pub trait CanUpgrade {
fn on_upgrade(self) -> OnUpgrade;
}
impl CanUpgrade for http::Request<crate::Body> {
fn on_upgrade(self) -> OnUpgrade {
self.into_body().take_upgrade()
}
}
impl CanUpgrade for &'_ mut http::Request<crate::Body> {
fn on_upgrade(self) -> OnUpgrade {
self.body_mut().take_upgrade()
}
}
impl CanUpgrade for http::Response<crate::Body> {
fn on_upgrade(self) -> OnUpgrade {
self.into_body().take_upgrade()
}
}
impl CanUpgrade for &'_ mut http::Response<crate::Body> {
fn on_upgrade(self) -> OnUpgrade {
self.body_mut().take_upgrade()
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@@ -1790,9 +1790,7 @@ mod dispatch_impl {
let res = rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); let res = rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap();
assert_eq!(res.status(), 101); assert_eq!(res.status(), 101);
let upgraded = rt let upgraded = rt.block_on(hyper::upgrade::on(res)).expect("on_upgrade");
.block_on(res.into_body().on_upgrade())
.expect("on_upgrade");
let parts = upgraded.downcast::<DebugStream>().unwrap(); let parts = upgraded.downcast::<DebugStream>().unwrap();
assert_eq!(s(&parts.read_buf), "foobar=ready"); assert_eq!(s(&parts.read_buf), "foobar=ready");

View File

@@ -1341,7 +1341,7 @@ async fn upgrades_new() {
let (upgrades_tx, upgrades_rx) = mpsc::channel(); let (upgrades_tx, upgrades_rx) = mpsc::channel();
let svc = service_fn(move |req: Request<Body>| { let svc = service_fn(move |req: Request<Body>| {
let on_upgrade = req.into_body().on_upgrade(); let on_upgrade = hyper::upgrade::on(req);
let _ = upgrades_tx.send(on_upgrade); let _ = upgrades_tx.send(on_upgrade);
future::ok::<_, hyper::Error>( future::ok::<_, hyper::Error>(
Response::builder() Response::builder()
@@ -1448,7 +1448,7 @@ async fn http_connect_new() {
let (upgrades_tx, upgrades_rx) = mpsc::channel(); let (upgrades_tx, upgrades_rx) = mpsc::channel();
let svc = service_fn(move |req: Request<Body>| { let svc = service_fn(move |req: Request<Body>| {
let on_upgrade = req.into_body().on_upgrade(); let on_upgrade = hyper::upgrade::on(req);
let _ = upgrades_tx.send(on_upgrade); let _ = upgrades_tx.send(on_upgrade);
future::ok::<_, hyper::Error>( future::ok::<_, hyper::Error>(
Response::builder() Response::builder()