diff --git a/src/body/body.rs b/src/body/body.rs
index 0c75f719..355f0d87 100644
--- a/src/body/body.rs
+++ b/src/body/body.rs
@@ -11,6 +11,7 @@ use futures_util::TryStreamExt;
use http::HeaderMap;
use http_body::{Body as HttpBody, SizeHint};
+use crate::common::sync_wrapper::SyncWrapper;
use crate::common::{task, watch, Future, Never, Pin, Poll};
use crate::proto::h2::ping;
use crate::proto::DecodedLength;
@@ -42,13 +43,11 @@ enum Kind {
content_length: DecodedLength,
recv: h2::RecvStream,
},
- // NOTE: This requires `Sync` because of how easy it is to use `await`
- // while a borrow of a `Request
` exists.
- //
- // See https://github.com/rust-lang/rust/issues/57017
#[cfg(feature = "stream")]
Wrapped(
- Pin>> + Send + Sync>>,
+ SyncWrapper<
+ Pin>> + Send>>,
+ >,
),
}
@@ -156,12 +155,12 @@ impl Body {
#[cfg(feature = "stream")]
pub fn wrap_stream(stream: S) -> Body
where
- S: Stream- > + Send + Sync + 'static,
+ S: Stream
- > + Send + 'static,
O: Into + 'static,
E: Into> + 'static,
{
let mapped = stream.map_ok(Into::into).map_err(Into::into);
- Body::new(Kind::Wrapped(Box::pin(mapped)))
+ Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
}
/// Converts this `Body` into a `Future` of a pending HTTP upgrade.
@@ -280,7 +279,7 @@ impl Body {
},
#[cfg(feature = "stream")]
- Kind::Wrapped(ref mut s) => match ready!(s.as_mut().poll_next(cx)) {
+ Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
None => Poll::Ready(None),
},
@@ -402,16 +401,12 @@ impl Stream for Body {
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
-impl From>> + Send + Sync>>
- for Body
-{
+impl From>> + Send>> for Body {
#[inline]
fn from(
- stream: Box<
- dyn Stream
- >> + Send + Sync,
- >,
+ stream: Box>> + Send>,
) -> Body {
- Body::new(Kind::Wrapped(stream.into()))
+ Body::new(Kind::Wrapped(SyncWrapper::new(stream.into())))
}
}
diff --git a/src/common/mod.rs b/src/common/mod.rs
index e436fe5e..d9d62bc2 100644
--- a/src/common/mod.rs
+++ b/src/common/mod.rs
@@ -13,6 +13,7 @@ pub(crate) mod exec;
pub(crate) mod io;
mod lazy;
mod never;
+pub(crate) mod sync_wrapper;
pub(crate) mod task;
pub(crate) mod watch;
diff --git a/src/common/sync_wrapper.rs b/src/common/sync_wrapper.rs
new file mode 100644
index 00000000..1e4aa403
--- /dev/null
+++ b/src/common/sync_wrapper.rs
@@ -0,0 +1,115 @@
+/*
+ * This is a copy of the sync_wrapper crate.
+ */
+//! A mutual exclusion primitive that relies on static type information only
+//!
+//! This library is inspired by [this discussion](https://internals.rust-lang.org/t/what-shall-sync-mean-across-an-await/12020/2).
+#![doc(html_logo_url = "https://developer.actyx.com/img/logo.svg")]
+#![doc(html_favicon_url = "https://developer.actyx.com/img/favicon.ico")]
+
+/// A mutual exclusion primitive that relies on static type information only
+///
+/// In some cases synchronization can be proven statically: whenever you hold an exclusive `&mut`
+/// reference, the Rust type system ensures that no other part of the program can hold another
+/// reference to the data. Therefore it is safe to access it even if the current thread obtained
+/// this reference via a channel. Whenever this is the case, the overhead of allocating and locking
+/// a [`Mutex`] can be avoided by using this static version.
+///
+/// One example where this is often applicable is [`Future`], which requires an exclusive reference
+/// for its [`poll`] method: While a given `Future` implementation may not be safe to access by
+/// multiple threads concurrently, the executor can only run the `Future` on one thread at any
+/// given time, making it [`Sync`] in practice as long as the implementation is `Send`. You can
+/// therefore use the sync wrapper to prove that your data structure is `Sync` even though it
+/// contains such a `Future`.
+///
+/// # Example
+///
+/// ```ignore
+/// use hyper::common::sync_wrapper::SyncWrapper;
+/// use std::future::Future;
+///
+/// struct MyThing {
+/// future: SyncWrapper + Send>>,
+/// }
+///
+/// impl MyThing {
+/// // all accesses to `self.future` now require an exclusive reference or ownership
+/// }
+///
+/// fn assert_sync() {}
+///
+/// assert_sync::();
+/// ```
+///
+/// [`Mutex`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html
+/// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html
+/// [`poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#method.poll
+/// [`Sync`]: https://doc.rust-lang.org/std/marker/trait.Sync.html
+#[repr(transparent)]
+pub struct SyncWrapper(T);
+
+impl SyncWrapper {
+ /// Creates a new SyncWrapper containing the given value.
+ ///
+ /// # Examples
+ ///
+ /// ```ignore
+ /// use hyper::common::sync_wrapper::SyncWrapper;
+ ///
+ /// let wrapped = SyncWrapper::new(42);
+ /// ```
+ pub fn new(value: T) -> Self {
+ Self(value)
+ }
+
+ /// Acquires a reference to the protected value.
+ ///
+ /// This is safe because it requires an exclusive reference to the wrapper. Therefore this method
+ /// neither panics nor does it return an error. This is in contrast to [`Mutex::get_mut`] which
+ /// returns an error if another thread panicked while holding the lock. It is not recommended
+ /// to send an exclusive reference to a potentially damaged value to another thread for further
+ /// processing.
+ ///
+ /// [`Mutex::get_mut`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html#method.get_mut
+ ///
+ /// # Examples
+ ///
+ /// ```ignore
+ /// use hyper::common::sync_wrapper::SyncWrapper;
+ ///
+ /// let mut wrapped = SyncWrapper::new(42);
+ /// let value = wrapped.get_mut();
+ /// *value = 0;
+ /// assert_eq!(*wrapped.get_mut(), 0);
+ /// ```
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.0
+ }
+
+ /// Consumes this wrapper, returning the underlying data.
+ ///
+ /// This is safe because it requires ownership of the wrapper, aherefore this method will neither
+ /// panic nor does it return an error. This is in contrast to [`Mutex::into_inner`] which
+ /// returns an error if another thread panicked while holding the lock. It is not recommended
+ /// to send an exclusive reference to a potentially damaged value to another thread for further
+ /// processing.
+ ///
+ /// [`Mutex::into_inner`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html#method.into_inner
+ ///
+ /// # Examples
+ ///
+ /// ```ignore
+ /// use hyper::common::sync_wrapper::SyncWrapper;
+ ///
+ /// let mut wrapped = SyncWrapper::new(42);
+ /// assert_eq!(wrapped.into_inner(), 42);
+ /// ```
+ #[allow(dead_code)]
+ pub fn into_inner(self) -> T {
+ self.0
+ }
+}
+
+// this is safe because the only operations permitted on this data structure require exclusive
+// access or ownership
+unsafe impl Sync for SyncWrapper {}