diff --git a/Cargo.toml b/Cargo.toml index 53f90799..a262ebda 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ iovec = "0.1" itoa = "0.4.1" log = "0.4" net2 = { version = "0.2.32", optional = true } -pin-project = { version = "=0.4.0-alpha.11", features = ["project_attr"] } +pin-project = "0.4" time = "0.1" tokio = { version = "=0.2.0-alpha.5", optional = true, default-features = false, features = ["rt-full"] } tower-service = "=0.3.0-alpha.1" @@ -193,3 +193,18 @@ required-features = ["runtime", "unstable-stream"] name = "server" path = "tests/server.rs" required-features = ["runtime", "unstable-stream"] + +[patch.crates-io] +tokio = { git = "https://github.com/tokio-rs/tokio" } +tokio-codec = { git = "https://github.com/tokio-rs/tokio" } +tokio-executor = { git = "https://github.com/tokio-rs/tokio" } +tokio-fs = { git = "https://github.com/tokio-rs/tokio" } +tokio-io = { git = "https://github.com/tokio-rs/tokio" } +tokio-macros = { git = "https://github.com/tokio-rs/tokio" } +tokio-net = { git = "https://github.com/tokio-rs/tokio" } +tokio-sync = { git = "https://github.com/tokio-rs/tokio" } +tokio-test = { git = "https://github.com/tokio-rs/tokio" } +tokio-timer = { git = "https://github.com/tokio-rs/tokio" } + +tower-service = { git = "https://github.com/taiki-e/tower", branch = "pin-project" } +tower-make = { git = "https://github.com/taiki-e/tower", branch = "pin-project" } diff --git a/src/common/drain.rs b/src/common/drain.rs index 822b3e61..119308cc 100644 --- a/src/common/drain.rs +++ b/src/common/drain.rs @@ -98,9 +98,9 @@ where { type Output = F::Output; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let mut me = self.project(); loop { - let me = self.project(); match mem::replace(me.state, State::Draining) { State::Watch(on_drain) => { let recv = me.watch.rx.recv_ref(); @@ -109,7 +109,7 @@ where match recv.poll_unpin(cx) { Poll::Ready(None) => { // Drain has been triggered! - on_drain(me.future); + on_drain(me.future.as_mut()); }, Poll::Ready(Some(_/*State::Open*/)) | Poll::Pending => { diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 2e658d48..8f7d23d8 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -254,11 +254,11 @@ where E: Into>, { #[project] - fn poll2(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + fn poll2(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + let mut me = self.project(); loop { - let mut me = self.project(); #[project] - let next = match me.state.project() { + let next = match me.state.as_mut().project() { H2StreamState::Service(h) => { let res = match h.poll(cx) { Poll::Ready(Ok(r)) => r, diff --git a/src/server/conn.rs b/src/server/conn.rs index 5d80cc70..d4c7e9fc 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -509,7 +509,7 @@ where /// /// This `Connection` should continue to be polled until shutdown /// can finish. - pub fn graceful_shutdown(mut self: Pin<&mut Self>) { + pub fn graceful_shutdown(self: Pin<&mut Self>) { match self.project().conn.as_mut().unwrap() { Either::A(ref mut h1) => { h1.disable_keep_alive(); @@ -727,8 +727,9 @@ where B: Payload, E: H2Exec<>::Future, B>, { - fn poll_next_(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll>>> { - match ready!(self.project().make_service.poll_ready_ref(cx)) { + fn poll_next_(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll>>> { + let me = self.project(); + match ready!(me.make_service.poll_ready_ref(cx)) { Ok(()) => (), Err(e) => { trace!("make_service closed"); @@ -736,13 +737,13 @@ where } } - if let Some(item) = ready!(self.project().incoming.poll_accept(cx)) { + if let Some(item) = ready!(me.incoming.poll_accept(cx)) { let io = item.map_err(crate::Error::new_accept)?; - let new_fut = self.project().make_service.make_service_ref(&io); + let new_fut = me.make_service.make_service_ref(&io); Poll::Ready(Some(Ok(Connecting { future: new_fut, io: Some(io), - protocol: self.protocol.clone(), + protocol: me.protocol.clone(), }))) } else { Poll::Ready(None) @@ -781,10 +782,11 @@ where { type Output = Result, FE>; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let service = ready!(self.project().future.poll(cx))?; - let io = self.project().io.take().expect("polled after complete"); - Poll::Ready(Ok(self.protocol.serve_connection(io, service))) + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let me = self.project(); + let service = ready!(me.future.poll(cx))?; + let io = me.io.take().expect("polled after complete"); + Poll::Ready(Ok(me.protocol.serve_connection(io, service))) } } @@ -816,15 +818,16 @@ where B: Payload, E: H2Exec<>::Future, B>, { - pub(super) fn poll_watch(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, watcher: &W) -> Poll> + pub(super) fn poll_watch(self: Pin<&mut Self>, cx: &mut task::Context<'_>, watcher: &W) -> Poll> where E: NewSvcExec, W: Watcher, { + let mut me = self.project(); loop { - if let Some(connecting) = ready!(self.project().serve.poll_next_(cx)?) { + if let Some(connecting) = ready!(me.serve.as_mut().poll_next_(cx)?) { let fut = NewSvcTask::new(connecting, watcher.clone()); - self.project().serve.project().protocol.exec.execute_new_svc(fut)?; + me.serve.as_mut().project().protocol.exec.execute_new_svc(fut)?; } else { return Poll::Ready(Ok(())); } @@ -841,8 +844,7 @@ where type Output = A::Output; #[project] - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - // Just simple pin projection to the inner variants + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { #[project] match self.project() { Either::A(a) => a.poll(cx), @@ -939,16 +941,16 @@ pub(crate) mod spawn_all { type Output = (); #[project] - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { // If it weren't for needing to name this type so the `Send` bounds // could be projected to the `Serve` executor, this could just be // an `async fn`, and much safer. Woe is me. + let mut me = self.project(); loop { - let mut me = self.project(); let next = { #[project] - match me.state.project() { + match me.state.as_mut().project() { State::Connecting(connecting, watcher) => { let res = ready!(connecting.poll(cx)); let conn = match res { @@ -963,7 +965,7 @@ pub(crate) mod spawn_all { State::Connected(connected) }, State::Connected(future) => { - return future + return future .poll(cx) .map(|res| { if let Err(err) = res { diff --git a/src/server/mod.rs b/src/server/mod.rs index da125b38..b891d6f5 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -214,7 +214,7 @@ where { type Output = crate::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { self.project().spawn_all.poll_watch(cx, &NoopWatcher) } } diff --git a/src/server/shutdown.rs b/src/server/shutdown.rs index 4591a1a0..fec48e90 100644 --- a/src/server/shutdown.rs +++ b/src/server/shutdown.rs @@ -61,12 +61,12 @@ where type Output = crate::Result<()>; #[project] - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { let mut me = self.project(); loop { let next = { #[project] - match me.state.project() { + match me.state.as_mut().project() { State::Running { drain, spawn_all,