From 245fa9c44c2a5c6d4e8c43614a438f83a9cc2be1 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 6 Dec 2019 13:15:39 -0800 Subject: [PATCH] refactor(server): remove Unpin requirement from the Body generic --- Cargo.toml | 2 +- src/proto/h2/mod.rs | 83 +++++++++++++++++++++++++----------------- src/proto/h2/server.rs | 15 +++----- src/server/conn.rs | 9 ----- src/server/mod.rs | 3 -- src/server/shutdown.rs | 3 -- 6 files changed, 55 insertions(+), 60 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2b314931..a6fb5244 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ futures-util = { version = "0.3", default-features = false } http = "0.2" http-body = "0.3" httparse = "1.0" -h2 = "0.2" +h2 = "0.2.1" itoa = "0.4.1" log = "0.4" pin-project = "0.4" diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index 4a81f3c7..356b6715 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -5,6 +5,7 @@ use http::header::{ TRANSFER_ENCODING, UPGRADE, }; use http::HeaderMap; +use pin_project::pin_project; use crate::body::Payload; use crate::common::{task, Future, Pin, Poll}; @@ -74,12 +75,14 @@ fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) { // body adapters used by both Client and Server +#[pin_project] struct PipeToSendStream where S: Payload, { body_tx: SendStream>, data_done: bool, + #[pin] stream: S, } @@ -94,39 +97,26 @@ where stream: stream, } } - - fn on_user_err(&mut self, err: S::Error) -> crate::Error { - let err = crate::Error::new_user_body(err); - debug!("send body user stream error: {}", err); - self.body_tx.send_reset(err.h2_reason()); - err - } - - fn send_eos_frame(&mut self) -> crate::Result<()> { - trace!("send body eos"); - self.body_tx - .send_data(SendBuf(None), true) - .map_err(crate::Error::new_body_write) - } } impl Future for PipeToSendStream where - S: Payload + Unpin, + S: Payload, { type Output = crate::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let mut me = self.project(); loop { - if !self.data_done { + if !*me.data_done { // we don't have the next chunk of data yet, so just reserve 1 byte to make // sure there's some capacity available. h2 will handle the capacity management // for the actual body chunk. - self.body_tx.reserve_capacity(1); + me.body_tx.reserve_capacity(1); - if self.body_tx.capacity() == 0 { + if me.body_tx.capacity() == 0 { loop { - match ready!(self.body_tx.poll_capacity(cx)) { + match ready!(me.body_tx.poll_capacity(cx)) { Some(Ok(0)) => {} Some(Ok(_)) => break, Some(Err(e)) => { @@ -136,7 +126,7 @@ where } } } else { - if let Poll::Ready(reason) = self + if let Poll::Ready(reason) = me .body_tx .poll_reset(cx) .map_err(crate::Error::new_body_write)? @@ -148,9 +138,9 @@ where } } - match ready!(Pin::new(&mut self.stream).poll_data(cx)) { + match ready!(me.stream.as_mut().poll_data(cx)) { Some(Ok(chunk)) => { - let is_eos = self.stream.is_end_stream(); + let is_eos = me.stream.is_end_stream(); trace!( "send body chunk: {} bytes, eos={}", chunk.remaining(), @@ -158,7 +148,7 @@ where ); let buf = SendBuf(Some(chunk)); - self.body_tx + me.body_tx .send_data(buf, is_eos) .map_err(crate::Error::new_body_write)?; @@ -166,20 +156,20 @@ where return Poll::Ready(Ok(())); } } - Some(Err(e)) => return Poll::Ready(Err(self.on_user_err(e))), + Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))), None => { - self.body_tx.reserve_capacity(0); - let is_eos = self.stream.is_end_stream(); + me.body_tx.reserve_capacity(0); + let is_eos = me.stream.is_end_stream(); if is_eos { - return Poll::Ready(self.send_eos_frame()); + return Poll::Ready(me.body_tx.send_eos_frame()); } else { - self.data_done = true; + *me.data_done = true; // loop again to poll_trailers } } } } else { - if let Poll::Ready(reason) = self + if let Poll::Ready(reason) = me .body_tx .poll_reset(cx) .map_err(|e| crate::Error::new_body_write(e))? @@ -190,24 +180,49 @@ where )))); } - match ready!(Pin::new(&mut self.stream).poll_trailers(cx)) { + match ready!(me.stream.poll_trailers(cx)) { Ok(Some(trailers)) => { - self.body_tx + me.body_tx .send_trailers(trailers) .map_err(crate::Error::new_body_write)?; return Poll::Ready(Ok(())); } Ok(None) => { // There were no trailers, so send an empty DATA frame... - return Poll::Ready(self.send_eos_frame()); + return Poll::Ready(me.body_tx.send_eos_frame()); } - Err(e) => return Poll::Ready(Err(self.on_user_err(e))), + Err(e) => return Poll::Ready(Err(me.body_tx.on_user_err(e))), } } } } } +trait SendStreamExt { + fn on_user_err(&mut self, err: E) -> crate::Error + where + E: Into>; + fn send_eos_frame(&mut self) -> crate::Result<()>; +} + +impl SendStreamExt for SendStream> { + fn on_user_err(&mut self, err: E) -> crate::Error + where + E: Into>, + { + let err = crate::Error::new_user_body(err); + debug!("send body user stream error: {}", err); + self.send_reset(err.h2_reason()); + err + } + + fn send_eos_frame(&mut self) -> crate::Result<()> { + trace!("send body eos"); + self.send_data(SendBuf(None), true) + .map_err(crate::Error::new_body_write) + } +} + struct SendBuf(Option); impl Buf for SendBuf { diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 094748ae..daa5fa2d 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -51,7 +51,6 @@ where S: HttpService, S::Error: Into>, B: Payload, - B::Data: Unpin, E: H2Exec, { pub(crate) fn new(io: T, service: S, builder: &Builder, exec: E) -> Server { @@ -89,7 +88,6 @@ where S: HttpService, S::Error: Into>, B: Payload, - B::Data: Unpin, E: H2Exec, { type Output = crate::Result; @@ -124,7 +122,6 @@ impl Serving where T: AsyncRead + AsyncWrite + Unpin, B: Payload, - B::Data: Unpin, { fn poll_server( &mut self, @@ -216,7 +213,7 @@ where B: Payload, { Service(#[pin] F), - Body(PipeToSendStream), + Body(#[pin] PipeToSendStream), } impl H2Stream @@ -247,8 +244,7 @@ macro_rules! reply { impl H2Stream where F: Future, E>>, - B: Payload + Unpin, - B::Data: Unpin, + B: Payload, E: Into>, { #[project] @@ -303,8 +299,8 @@ where return Poll::Ready(Ok(())); } } - H2StreamState::Body(ref mut pipe) => { - return Pin::new(pipe).poll(cx); + H2StreamState::Body(pipe) => { + return pipe.poll(cx); } }; me.state.set(next); @@ -315,8 +311,7 @@ where impl Future for H2Stream where F: Future, E>>, - B: Payload + Unpin, - B::Data: Unpin, + B: Payload, E: Into>, { type Output = (); diff --git a/src/server/conn.rs b/src/server/conn.rs index 881d6901..689b2308 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -382,7 +382,6 @@ impl Http { S: HttpService, S::Error: Into>, Bd: Payload, - Bd::Data: Unpin, I: AsyncRead + AsyncWrite + Unpin, E: H2Exec, { @@ -449,7 +448,6 @@ where S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, B: Payload + 'static, - B::Data: Unpin, E: H2Exec, { /// Start a graceful shutdown process for this connection. @@ -588,7 +586,6 @@ where S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + 'static, B: Payload + 'static, - B::Data: Unpin, E: H2Exec, { type Output = crate::Result<()>; @@ -708,7 +705,6 @@ where F: Future>, S: HttpService, B: Payload, - B::Data: Unpin, E: H2Exec, { type Output = Result, FE>; @@ -779,7 +775,6 @@ where S: HttpService, S::Error: Into>, B: Payload, - B::Data: Unpin, E: H2Exec, { type Output = crate::Result; @@ -827,7 +822,6 @@ pub(crate) mod spawn_all { where I: AsyncRead + AsyncWrite + Unpin + Send + 'static, S: HttpService, - ::Data: Unpin, E: H2Exec, { type Future = UpgradeableConnection; @@ -875,7 +869,6 @@ pub(crate) mod spawn_all { NE: Into>, S: HttpService, B: Payload, - B::Data: Unpin, E: H2Exec, W: Watcher, { @@ -943,7 +936,6 @@ mod upgrades { S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, B: Payload + 'static, - B::Data: Unpin, E: H2Exec, { /// Start a graceful shutdown process for this connection. @@ -961,7 +953,6 @@ mod upgrades { S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Payload + 'static, - B::Data: Unpin, E: super::H2Exec, { type Output = crate::Result<()>; diff --git a/src/server/mod.rs b/src/server/mod.rs index c28dcfd9..ec767b8e 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -150,7 +150,6 @@ where S: MakeServiceRef, S::Error: Into>, B: Payload, - B::Data: Unpin, E: H2Exec<>::Future, B>, E: NewSvcExec, { @@ -206,7 +205,6 @@ where S: MakeServiceRef, S::Error: Into>, B: Payload, - B::Data: Unpin, E: H2Exec<>::Future, B>, E: NewSvcExec, { @@ -388,7 +386,6 @@ impl Builder { S: MakeServiceRef, S::Error: Into>, B: Payload, - B::Data: Unpin, E: NewSvcExec, E: H2Exec<>::Future, B>, { diff --git a/src/server/shutdown.rs b/src/server/shutdown.rs index 67d29011..1dc668ce 100644 --- a/src/server/shutdown.rs +++ b/src/server/shutdown.rs @@ -51,7 +51,6 @@ where S: MakeServiceRef, S::Error: Into>, B: Payload, - B::Data: Unpin, F: Future, E: H2Exec<>::Future, B>, E: NewSvcExec, @@ -98,7 +97,6 @@ impl Watcher for GracefulWatcher where I: AsyncRead + AsyncWrite + Unpin + Send + 'static, S: HttpService, - ::Data: Unpin, E: H2Exec, { type Future = @@ -115,7 +113,6 @@ where S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, S::ResBody: Payload + 'static, - ::Data: Unpin, E: H2Exec, { conn.graceful_shutdown()