feat(body): remove Sync bound for Body::wrap_stream

A stream wrapped into a Body previously needed to implement `Sync` so
that the Body type implements this autotrait as well (which is needed
due to limitations in async/await). Since a stream only offers one
method that is called with an exclusive reference, this type is
statically proven to be Sync already. In theory it should be fine to add
an `unsafe impl Sync`, but this commit instead adds a SyncWrapper to
enlist the compiler’s help in proving that this is (and remains) correct.

This makes it easier to construct response bodies for client code.
This commit is contained in:
Roland Kuhn
2020-04-19 18:32:17 +02:00
committed by Sean McArthur
parent d5b0ee5672
commit 042c770603
3 changed files with 126 additions and 15 deletions

View File

@@ -11,6 +11,7 @@ use futures_util::TryStreamExt;
use http::HeaderMap;
use http_body::{Body as HttpBody, SizeHint};
use crate::common::sync_wrapper::SyncWrapper;
use crate::common::{task, watch, Future, Never, Pin, Poll};
use crate::proto::h2::ping;
use crate::proto::DecodedLength;
@@ -42,13 +43,11 @@ enum Kind {
content_length: DecodedLength,
recv: h2::RecvStream,
},
// NOTE: This requires `Sync` because of how easy it is to use `await`
// while a borrow of a `Request<Body>` exists.
//
// See https://github.com/rust-lang/rust/issues/57017
#[cfg(feature = "stream")]
Wrapped(
Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync>>,
SyncWrapper<
Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
>,
),
}
@@ -156,12 +155,12 @@ impl Body {
#[cfg(feature = "stream")]
pub fn wrap_stream<S, O, E>(stream: S) -> Body
where
S: Stream<Item = Result<O, E>> + Send + Sync + 'static,
S: Stream<Item = Result<O, E>> + Send + 'static,
O: Into<Bytes> + 'static,
E: Into<Box<dyn StdError + Send + Sync>> + 'static,
{
let mapped = stream.map_ok(Into::into).map_err(Into::into);
Body::new(Kind::Wrapped(Box::pin(mapped)))
Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
}
/// Converts this `Body` into a `Future` of a pending HTTP upgrade.
@@ -280,7 +279,7 @@ impl Body {
},
#[cfg(feature = "stream")]
Kind::Wrapped(ref mut s) => match ready!(s.as_mut().poll_next(cx)) {
Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
None => Poll::Ready(None),
},
@@ -402,16 +401,12 @@ impl Stream for Body {
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync>>
for Body
{
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
#[inline]
fn from(
stream: Box<
dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync,
>,
stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
) -> Body {
Body::new(Kind::Wrapped(stream.into()))
Body::new(Kind::Wrapped(SyncWrapper::new(stream.into())))
}
}