refactor(server): remove Unpin requirement from the Body generic

This commit is contained in:
Sean McArthur
2019-12-06 13:15:39 -08:00
parent 71101e701f
commit 245fa9c44c
6 changed files with 55 additions and 60 deletions

View File

@@ -27,7 +27,7 @@ futures-util = { version = "0.3", default-features = false }
http = "0.2" http = "0.2"
http-body = "0.3" http-body = "0.3"
httparse = "1.0" httparse = "1.0"
h2 = "0.2" h2 = "0.2.1"
itoa = "0.4.1" itoa = "0.4.1"
log = "0.4" log = "0.4"
pin-project = "0.4" pin-project = "0.4"

View File

@@ -5,6 +5,7 @@ use http::header::{
TRANSFER_ENCODING, UPGRADE, TRANSFER_ENCODING, UPGRADE,
}; };
use http::HeaderMap; use http::HeaderMap;
use pin_project::pin_project;
use crate::body::Payload; use crate::body::Payload;
use crate::common::{task, Future, Pin, Poll}; use crate::common::{task, Future, Pin, Poll};
@@ -74,12 +75,14 @@ fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
// body adapters used by both Client and Server // body adapters used by both Client and Server
#[pin_project]
struct PipeToSendStream<S> struct PipeToSendStream<S>
where where
S: Payload, S: Payload,
{ {
body_tx: SendStream<SendBuf<S::Data>>, body_tx: SendStream<SendBuf<S::Data>>,
data_done: bool, data_done: bool,
#[pin]
stream: S, stream: S,
} }
@@ -94,39 +97,26 @@ where
stream: stream, stream: stream,
} }
} }
fn on_user_err(&mut self, err: S::Error) -> crate::Error {
let err = crate::Error::new_user_body(err);
debug!("send body user stream error: {}", err);
self.body_tx.send_reset(err.h2_reason());
err
}
fn send_eos_frame(&mut self) -> crate::Result<()> {
trace!("send body eos");
self.body_tx
.send_data(SendBuf(None), true)
.map_err(crate::Error::new_body_write)
}
} }
impl<S> Future for PipeToSendStream<S> impl<S> Future for PipeToSendStream<S>
where where
S: Payload + Unpin, S: Payload,
{ {
type Output = crate::Result<()>; type Output = crate::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let mut me = self.project();
loop { loop {
if !self.data_done { if !*me.data_done {
// we don't have the next chunk of data yet, so just reserve 1 byte to make // we don't have the next chunk of data yet, so just reserve 1 byte to make
// sure there's some capacity available. h2 will handle the capacity management // sure there's some capacity available. h2 will handle the capacity management
// for the actual body chunk. // for the actual body chunk.
self.body_tx.reserve_capacity(1); me.body_tx.reserve_capacity(1);
if self.body_tx.capacity() == 0 { if me.body_tx.capacity() == 0 {
loop { loop {
match ready!(self.body_tx.poll_capacity(cx)) { match ready!(me.body_tx.poll_capacity(cx)) {
Some(Ok(0)) => {} Some(Ok(0)) => {}
Some(Ok(_)) => break, Some(Ok(_)) => break,
Some(Err(e)) => { Some(Err(e)) => {
@@ -136,7 +126,7 @@ where
} }
} }
} else { } else {
if let Poll::Ready(reason) = self if let Poll::Ready(reason) = me
.body_tx .body_tx
.poll_reset(cx) .poll_reset(cx)
.map_err(crate::Error::new_body_write)? .map_err(crate::Error::new_body_write)?
@@ -148,9 +138,9 @@ where
} }
} }
match ready!(Pin::new(&mut self.stream).poll_data(cx)) { match ready!(me.stream.as_mut().poll_data(cx)) {
Some(Ok(chunk)) => { Some(Ok(chunk)) => {
let is_eos = self.stream.is_end_stream(); let is_eos = me.stream.is_end_stream();
trace!( trace!(
"send body chunk: {} bytes, eos={}", "send body chunk: {} bytes, eos={}",
chunk.remaining(), chunk.remaining(),
@@ -158,7 +148,7 @@ where
); );
let buf = SendBuf(Some(chunk)); let buf = SendBuf(Some(chunk));
self.body_tx me.body_tx
.send_data(buf, is_eos) .send_data(buf, is_eos)
.map_err(crate::Error::new_body_write)?; .map_err(crate::Error::new_body_write)?;
@@ -166,20 +156,20 @@ where
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
} }
} }
Some(Err(e)) => return Poll::Ready(Err(self.on_user_err(e))), Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
None => { None => {
self.body_tx.reserve_capacity(0); me.body_tx.reserve_capacity(0);
let is_eos = self.stream.is_end_stream(); let is_eos = me.stream.is_end_stream();
if is_eos { if is_eos {
return Poll::Ready(self.send_eos_frame()); return Poll::Ready(me.body_tx.send_eos_frame());
} else { } else {
self.data_done = true; *me.data_done = true;
// loop again to poll_trailers // loop again to poll_trailers
} }
} }
} }
} else { } else {
if let Poll::Ready(reason) = self if let Poll::Ready(reason) = me
.body_tx .body_tx
.poll_reset(cx) .poll_reset(cx)
.map_err(|e| crate::Error::new_body_write(e))? .map_err(|e| crate::Error::new_body_write(e))?
@@ -190,24 +180,49 @@ where
)))); ))));
} }
match ready!(Pin::new(&mut self.stream).poll_trailers(cx)) { match ready!(me.stream.poll_trailers(cx)) {
Ok(Some(trailers)) => { Ok(Some(trailers)) => {
self.body_tx me.body_tx
.send_trailers(trailers) .send_trailers(trailers)
.map_err(crate::Error::new_body_write)?; .map_err(crate::Error::new_body_write)?;
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
} }
Ok(None) => { Ok(None) => {
// There were no trailers, so send an empty DATA frame... // There were no trailers, so send an empty DATA frame...
return Poll::Ready(self.send_eos_frame()); return Poll::Ready(me.body_tx.send_eos_frame());
} }
Err(e) => return Poll::Ready(Err(self.on_user_err(e))), Err(e) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
} }
} }
} }
} }
} }
trait SendStreamExt {
fn on_user_err<E>(&mut self, err: E) -> crate::Error
where
E: Into<Box<dyn std::error::Error + Send + Sync>>;
fn send_eos_frame(&mut self) -> crate::Result<()>;
}
impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> {
fn on_user_err<E>(&mut self, err: E) -> crate::Error
where
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let err = crate::Error::new_user_body(err);
debug!("send body user stream error: {}", err);
self.send_reset(err.h2_reason());
err
}
fn send_eos_frame(&mut self) -> crate::Result<()> {
trace!("send body eos");
self.send_data(SendBuf(None), true)
.map_err(crate::Error::new_body_write)
}
}
struct SendBuf<B>(Option<B>); struct SendBuf<B>(Option<B>);
impl<B: Buf> Buf for SendBuf<B> { impl<B: Buf> Buf for SendBuf<B> {

View File

@@ -51,7 +51,6 @@ where
S: HttpService<Body, ResBody = B>, S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>, S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload, B: Payload,
B::Data: Unpin,
E: H2Exec<S::Future, B>, E: H2Exec<S::Future, B>,
{ {
pub(crate) fn new(io: T, service: S, builder: &Builder, exec: E) -> Server<T, S, B, E> { pub(crate) fn new(io: T, service: S, builder: &Builder, exec: E) -> Server<T, S, B, E> {
@@ -89,7 +88,6 @@ where
S: HttpService<Body, ResBody = B>, S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>, S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload, B: Payload,
B::Data: Unpin,
E: H2Exec<S::Future, B>, E: H2Exec<S::Future, B>,
{ {
type Output = crate::Result<Dispatched>; type Output = crate::Result<Dispatched>;
@@ -124,7 +122,6 @@ impl<T, B> Serving<T, B>
where where
T: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin,
B: Payload, B: Payload,
B::Data: Unpin,
{ {
fn poll_server<S, E>( fn poll_server<S, E>(
&mut self, &mut self,
@@ -216,7 +213,7 @@ where
B: Payload, B: Payload,
{ {
Service(#[pin] F), Service(#[pin] F),
Body(PipeToSendStream<B>), Body(#[pin] PipeToSendStream<B>),
} }
impl<F, B> H2Stream<F, B> impl<F, B> H2Stream<F, B>
@@ -247,8 +244,7 @@ macro_rules! reply {
impl<F, B, E> H2Stream<F, B> impl<F, B, E> H2Stream<F, B>
where where
F: Future<Output = Result<Response<B>, E>>, F: Future<Output = Result<Response<B>, E>>,
B: Payload + Unpin, B: Payload,
B::Data: Unpin,
E: Into<Box<dyn StdError + Send + Sync>>, E: Into<Box<dyn StdError + Send + Sync>>,
{ {
#[project] #[project]
@@ -303,8 +299,8 @@ where
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
} }
} }
H2StreamState::Body(ref mut pipe) => { H2StreamState::Body(pipe) => {
return Pin::new(pipe).poll(cx); return pipe.poll(cx);
} }
}; };
me.state.set(next); me.state.set(next);
@@ -315,8 +311,7 @@ where
impl<F, B, E> Future for H2Stream<F, B> impl<F, B, E> Future for H2Stream<F, B>
where where
F: Future<Output = Result<Response<B>, E>>, F: Future<Output = Result<Response<B>, E>>,
B: Payload + Unpin, B: Payload,
B::Data: Unpin,
E: Into<Box<dyn StdError + Send + Sync>>, E: Into<Box<dyn StdError + Send + Sync>>,
{ {
type Output = (); type Output = ();

View File

@@ -382,7 +382,6 @@ impl<E> Http<E> {
S: HttpService<Body, ResBody = Bd>, S: HttpService<Body, ResBody = Bd>,
S::Error: Into<Box<dyn StdError + Send + Sync>>, S::Error: Into<Box<dyn StdError + Send + Sync>>,
Bd: Payload, Bd: Payload,
Bd::Data: Unpin,
I: AsyncRead + AsyncWrite + Unpin, I: AsyncRead + AsyncWrite + Unpin,
E: H2Exec<S::Future, Bd>, E: H2Exec<S::Future, Bd>,
{ {
@@ -449,7 +448,6 @@ where
S::Error: Into<Box<dyn StdError + Send + Sync>>, S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin, I: AsyncRead + AsyncWrite + Unpin,
B: Payload + 'static, B: Payload + 'static,
B::Data: Unpin,
E: H2Exec<S::Future, B>, E: H2Exec<S::Future, B>,
{ {
/// Start a graceful shutdown process for this connection. /// Start a graceful shutdown process for this connection.
@@ -588,7 +586,6 @@ where
S::Error: Into<Box<dyn StdError + Send + Sync>>, S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin + 'static, I: AsyncRead + AsyncWrite + Unpin + 'static,
B: Payload + 'static, B: Payload + 'static,
B::Data: Unpin,
E: H2Exec<S::Future, B>, E: H2Exec<S::Future, B>,
{ {
type Output = crate::Result<()>; type Output = crate::Result<()>;
@@ -708,7 +705,6 @@ where
F: Future<Output = Result<S, FE>>, F: Future<Output = Result<S, FE>>,
S: HttpService<Body, ResBody = B>, S: HttpService<Body, ResBody = B>,
B: Payload, B: Payload,
B::Data: Unpin,
E: H2Exec<S::Future, B>, E: H2Exec<S::Future, B>,
{ {
type Output = Result<Connection<I, S, E>, FE>; type Output = Result<Connection<I, S, E>, FE>;
@@ -779,7 +775,6 @@ where
S: HttpService<Body, ResBody = B>, S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>, S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload, B: Payload,
B::Data: Unpin,
E: H2Exec<S::Future, B>, E: H2Exec<S::Future, B>,
{ {
type Output = crate::Result<proto::Dispatched>; type Output = crate::Result<proto::Dispatched>;
@@ -827,7 +822,6 @@ pub(crate) mod spawn_all {
where where
I: AsyncRead + AsyncWrite + Unpin + Send + 'static, I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: HttpService<Body>, S: HttpService<Body>,
<S::ResBody as Payload>::Data: Unpin,
E: H2Exec<S::Future, S::ResBody>, E: H2Exec<S::Future, S::ResBody>,
{ {
type Future = UpgradeableConnection<I, S, E>; type Future = UpgradeableConnection<I, S, E>;
@@ -875,7 +869,6 @@ pub(crate) mod spawn_all {
NE: Into<Box<dyn StdError + Send + Sync>>, NE: Into<Box<dyn StdError + Send + Sync>>,
S: HttpService<Body, ResBody = B>, S: HttpService<Body, ResBody = B>,
B: Payload, B: Payload,
B::Data: Unpin,
E: H2Exec<S::Future, B>, E: H2Exec<S::Future, B>,
W: Watcher<I, S, E>, W: Watcher<I, S, E>,
{ {
@@ -943,7 +936,6 @@ mod upgrades {
S::Error: Into<Box<dyn StdError + Send + Sync>>, S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin, I: AsyncRead + AsyncWrite + Unpin,
B: Payload + 'static, B: Payload + 'static,
B::Data: Unpin,
E: H2Exec<S::Future, B>, E: H2Exec<S::Future, B>,
{ {
/// Start a graceful shutdown process for this connection. /// Start a graceful shutdown process for this connection.
@@ -961,7 +953,6 @@ mod upgrades {
S::Error: Into<Box<dyn StdError + Send + Sync>>, S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin + Send + 'static, I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Payload + 'static, B: Payload + 'static,
B::Data: Unpin,
E: super::H2Exec<S::Future, B>, E: super::H2Exec<S::Future, B>,
{ {
type Output = crate::Result<()>; type Output = crate::Result<()>;

View File

@@ -150,7 +150,6 @@ where
S: MakeServiceRef<IO, Body, ResBody = B>, S: MakeServiceRef<IO, Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>, S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload, B: Payload,
B::Data: Unpin,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>, E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>, E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
{ {
@@ -206,7 +205,6 @@ where
S: MakeServiceRef<IO, Body, ResBody = B>, S: MakeServiceRef<IO, Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>, S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload, B: Payload,
B::Data: Unpin,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>, E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>, E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
{ {
@@ -388,7 +386,6 @@ impl<I, E> Builder<I, E> {
S: MakeServiceRef<I::Conn, Body, ResBody = B>, S: MakeServiceRef<I::Conn, Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>, S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload, B: Payload,
B::Data: Unpin,
E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>, E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>, E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
{ {

View File

@@ -51,7 +51,6 @@ where
S: MakeServiceRef<IO, Body, ResBody = B>, S: MakeServiceRef<IO, Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>, S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload, B: Payload,
B::Data: Unpin,
F: Future<Output = ()>, F: Future<Output = ()>,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>, E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>, E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
@@ -98,7 +97,6 @@ impl<I, S, E> Watcher<I, S, E> for GracefulWatcher
where where
I: AsyncRead + AsyncWrite + Unpin + Send + 'static, I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: HttpService<Body>, S: HttpService<Body>,
<S::ResBody as Payload>::Data: Unpin,
E: H2Exec<S::Future, S::ResBody>, E: H2Exec<S::Future, S::ResBody>,
{ {
type Future = type Future =
@@ -115,7 +113,6 @@ where
S::Error: Into<Box<dyn StdError + Send + Sync>>, S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin, I: AsyncRead + AsyncWrite + Unpin,
S::ResBody: Payload + 'static, S::ResBody: Payload + 'static,
<S::ResBody as Payload>::Data: Unpin,
E: H2Exec<S::Future, S::ResBody>, E: H2Exec<S::Future, S::ResBody>,
{ {
conn.graceful_shutdown() conn.graceful_shutdown()